Go语言并发模式常见并发范式1. Go并发基础回顾Go语言的并发模型基于CSPCommunicating Sequential Processes理论通过goroutine和channel实现。理解这些基础概念对于掌握并发模式至关重要。GoroutineGoroutine是由Go运行时管理的轻量级线程创建成本极低package main import ( fmt time ) func main() { // 启动一个goroutine go sayHello(World) // 主goroutine继续执行 sayHello(Go) // 等待一段时间让goroutine完成 time.Sleep(time.Second) } func sayHello(name string) { fmt.Printf(Hello, %s!\n, name) }Channel基础Channel是goroutine之间通信的管道package main import fmt func main() { // 创建一个无缓冲channel ch : make(chan int) // 发送数据 go func() { ch - 42 }() // 接收数据 value : -ch fmt.Println(Received:, value) }2. 生产者-消费者模式生产者-消费者是最经典的并发模式之一用于解耦数据生产和数据处理的速率差异。2.1 基础版本package pipeline import ( fmt sync ) // Producer 生产者生成数据并发送到channel func Producer(n int) -chan int { out : make(chan int) go func() { defer close(out) for i : 0; i n; i { out - i } }() return out } // Consumer 消费者从channel接收数据并处理 func Consumer(in -chan int) { for v : range in { fmt.Printf(Processed: %d\n, v*v) // 模拟处理计算平方 } } // ProducerConsumerDemo 演示基本的生产者-消费者 func ProducerConsumerDemo() { ch : Producer(10) Consumer(ch) }2.2 带缓冲的版本package pipeline import ( fmt time ) // BufferedProducer 带缓冲的生产者 func BufferedProducer(n int, bufferSize int) -chan int { out : make(chan int, bufferSize) // 带缓冲的channel go func() { defer close(out) for i : 0; i n; i { // 当缓冲区满时会阻塞 out - i fmt.Printf(Produced: %d (buffer size: %d)\n, i, bufferSize) } }() return out } // ConsumerWithTimeout 带超时的消费者 func ConsumerWithTimeout(in -chan int, timeout time.Duration) { for { select { case v, ok : -in: if !ok { fmt.Println(Channel closed, consumer exiting) return } fmt.Printf(Processed: %d\n, v*v) case -time.After(timeout): fmt.Println(Timeout, consumer exiting) return } } }2.3 多个生产者和消费者package pipeline import ( fmt sync ) // Task 任务结构 type Task struct { ID int Data int } // Result 结果结构 type Result struct { TaskID int Output int } // MultiProducerConsumer 多生产者多消费者模式 type MultiProducerConsumer struct { producers int consumers int taskQueue chan Task resultQueue chan Result wg sync.WaitGroup } // NewMultiProducerConsumer 创建多生产者消费者 func NewMultiProducerConsumer(producers, consumers, queueSize int) *MultiProducerConsumer { return MultiProducerConsumer{ producers: producers, consumers: consumers, taskQueue: make(chan Task, queueSize), resultQueue: make(chan Result, queueSize), } } // Start 启动所有生产者和消费者 func (m *MultiProducerConsumer) Start() { // 启动生产者 for i : 0; i m.producers; i { m.wg.Add(1) go m.producer(i) } // 启动消费者 for i : 0; i m.consumers; i { m.wg.Add(1) go m.consumer(i) } // 等待所有goroutine完成 go func() { m.wg.Wait() close(m.resultQueue) }() } // producer 生产者goroutine func (m *MultiProducerConsumer) producer(id int) { defer m.wg.Done() for i : 0; i 5; i { task : Task{ ID: id*100 i, Data: i * 10, } m.taskQueue - task fmt.Printf(Producer %d produced task %d\n, id, task.ID) } } // consumer 消费者goroutine func (m *MultiProducerConsumer) consumer(id int) { defer m.wg.Done() for task : range m.taskQueue { result : Result{ TaskID: task.ID, Output: task.Data * task.Data, } m.resultQueue - result fmt.Printf(Consumer %d processed task %d\n, id, task.ID) } } // GetResults 返回结果channel func (m *MultiProducerConsumer) GetResults() -chan Result { return m.resultQueue }3. 发布-订阅模式发布-订阅模式用于解耦消息发送方和接收方发送方不需要知道接收方的存在。3.1 基础的Pub-Sub实现package pubsub import ( fmt sync ) // Topic 主题类型 type Topic string // Message 消息结构 type Message struct { Topic Topic Content interface{} } // Subscriber 订阅者接口 type Subscriber interface { ID() string Topics() []Topic Consume(msg Message) } // PubSub 发布-订阅系统 type PubSub struct { mu sync.RWMutex subscribers map[Topic][]Subscriber bufferSize int } // NewPubSub 创建发布-订阅系统 func NewPubSub(bufferSize int) *PubSub { return PubSub{ subscribers: make(map[Topic][]Subscriber), bufferSize: bufferSize, } } // Subscribe 订阅主题 func (p *PubSub) Subscribe(sub Subscriber) { p.mu.Lock() defer p.mu.Unlock() for _, topic : range sub.Topics() { p.subscribers[topic] append(p.subscribers[topic], sub) fmt.Printf(Subscriber %s subscribed to topic %s\n, sub.ID(), topic) } } // Unsubscribe 取消订阅 func (p *PubSub) Unsubscribe(sub Subscriber) { p.mu.Lock() defer p.mu.Unlock() for _, topic : range sub.Topics() { subs : p.subscribers[topic] for i, s : range subs { if s.ID() sub.ID() { // 删除订阅者 p.subscribers[topic] append(subs[:i], subs[i1:]...) fmt.Printf(Subscriber %s unsubscribed from topic %s\n, sub.ID(), topic) break } } } } // Publish 发布消息 func (p *PubSub) Publish(msg Message) { p.mu.RLock() defer p.mu.RUnlock() subscribers : p.subscribers[msg.Topic] for _, sub : range subscribers { go sub.Consume(msg) } } // SimpleSubscriber 简单订阅者实现 type SimpleSubscriber struct { id string topics []Topic msgCh chan Message done chan struct{} } // NewSimpleSubscriber 创建订阅者 func NewSimpleSubscriber(id string, topics []Topic, bufferSize int) *SimpleSubscriber { return SimpleSubscriber{ id: id, topics: topics, msgCh: make(chan Message, bufferSize), done: make(chan struct{}), } } func (s *SimpleSubscriber) ID() string { return s.id } func (s *SimpleSubscriber) Topics() []Topic { return s.topics } func (s *SimpleSubscriber) Consume(msg Message) { select { case s.msgCh - msg: case -s.done: } } // Start 开始处理消息 func (s *SimpleSubscriber) Start(handler func(msg Message)) { go func() { for { select { case msg : -s.msgCh: handler(msg) case -s.done: return } } }() } // Stop 停止订阅者 func (s *SimpleSubscriber) Stop() { close(s.done) }3.2 使用示例package pubsub import ( fmt time ) func ExamplePubSub() { ps : NewPubSub(100) // 创建订阅者 sub1 : NewSimpleSubscriber(sub1, []Topic{news, sports}, 10) sub2 : NewSimpleSubscriber(sub2, []Topic{news}, 10) // 订阅 ps.Subscribe(sub1) ps.Subscribe(sub2) // 启动消息处理 sub1.Start(func(msg Message) { fmt.Printf(Sub1 received on %s: %v\n, msg.Topic, msg.Content) }) sub2.Start(func(msg Message) { fmt.Printf(Sub2 received on %s: %v\n, msg.Topic, msg.Content) }) // 发布消息 ps.Publish(Message{Topic: news, Content: Breaking news!}) ps.Publish(Message{Topic: sports, Content: Game started!}) // 等待消息处理 time.Sleep(time.Second) sub1.Stop() sub2.Stop() }4. 管道模式Pipeline管道模式将多个处理阶段串联起来数据像水流一样从一端流向另一端。4.1 基础管道package pipeline import fmt // Pipeline 管道接口 type Pipeline interface { Process(data interface{}) interface{} } // IntPipeline 整数管道 type IntPipeline struct { stages []func(int) int } // NewIntPipeline 创建整数管道 func NewIntPipeline() *IntPipeline { return IntPipeline{stages: make([]func(int) int, 0)} } // AddStage 添加处理阶段 func (p *IntPipeline) AddStage(fn func(int) int) *IntPipeline { p.stages append(p.stages, fn) return p } // Execute 执行管道 func (p *IntPipeline) Execute(input -chan int) -chan int { out : make(chan int) go func() { defer close(out) for v : range input { result : v for _, stage : range p.stages { result stage(result) } out - result } }() return out } // 示例函数 func double(n int) int { return n * 2 } func square(n int) int { return n * n } func increment(n int) int { return n 1 } func ExamplePipeline() { // 创建管道double - square - increment pipe : NewIntPipeline(). AddStage(double). AddStage(square). AddStage(increment) // 创建输入 input : make(chan int) go func() { defer close(input) for i : 1; i 5; i { input - i } }() // 执行管道 output : pipe.Execute(input) for v : range output { fmt.Printf(Result: %d\n, v) } }4.2 并行管道package pipeline import ( fmt sync ) // ParallelPipeline 并行处理管道 type ParallelPipeline struct { stageFunc func(interface{}) interface{} workers int } // NewParallelPipeline 创建并行管道 func NewParallelPipeline(workers int, stageFunc func(interface{}) interface{}) *ParallelPipeline { return ParallelPipeline{ stageFunc: stageFunc, workers: workers, } } // Process 并行处理数据 func (p *ParallelPipeline) Process(input -chan interface{}) -chan interface{} { out : make(chan interface{}) var wg sync.WaitGroup for i : 0; i p.workers; i { wg.Add(1) go func() { defer wg.Done() for data : range input { out - p.stageFunc(data) } }() } go func() { wg.Wait() close(out) }() return out } // FanIn 合并多个channel func FanIn(channels ...-chan interface{}) -chan interface{} { out : make(chan interface{}) var wg sync.WaitGroup wg.Add(len(channels)) for _, ch : range channels { go func(c -chan interface{}) { defer wg.Done() for v : range c { out - v } }(ch) } go func() { wg.Wait() close(out) }() return out } func ExampleParallelPipeline() { input : make(chan interface{}) go func() { defer close(input) for i : 1; i 100; i { input - i } }() // 使用3个worker并行处理 pipe : NewParallelPipeline(3, func(v interface{}) interface{} { return v.(int) * 2 }) output : pipe.Process(input) count : 0 for v : range output { count if count 5 { fmt.Printf(Processed: %v\n, v) } } fmt.Printf(Total processed: %d\n, count) }5. 扇入-扇出模式扇出Fan-out将一个任务分散到多个goroutine并行处理扇入Fan-in将多个结果合并成一个。5.1 基础扇入-扇出package fanout import ( fmt sync ) // Job 工作单元 type Job struct { ID int Value int } // Result 结果单元 type Result struct { JobID int Output int } // Worker 工作函数 type Worker func(Job) Result // FanOut 扇出将任务分配给多个worker func FanOut(jobs -chan Job, numWorkers int, worker Worker) []-chan Result { resultChans : make([]-chan Result, numWorkers) for i : 0; i numWorkers; i { resultChans[i] fanOutWorker(jobs, worker) } return resultChans } func fanOutWorker(jobs -chan Job, worker Worker) -chan Result { out : make(chan Result) go func() { defer close(out) for job : range jobs { out - worker(job) } }() return out } // FanIn 扇入合并多个结果channel func FanIn(resultChans ...-chan Result) -chan Result { out : make(chan Result) var wg sync.WaitGroup wg.Add(len(resultChans)) for _, ch : range resultChans { go func(c -chan Result) { defer wg.Done() for result : range c { out - result } }(ch) } go func() { wg.Wait() close(out) }() return out } // 实际应用示例 func ExampleFanOutFanIn() { jobs : make(chan Job, 100) // 启动任务生产者 go func() { defer close(jobs) for i : 1; i 20; i { jobs - Job{ID: i, Value: i} } }() // 定义worker函数 worker : func(job Job) Result { // 模拟耗时操作 // time.Sleep(time.Millisecond * 10) return Result{ JobID: job.ID, Output: job.Value * job.Value, } } // 扇出使用5个worker处理 resultChans : FanOut(jobs, 5, worker) // 扇入合并结果 results : FanIn(resultChans...) // 收集结果 count : 0 for result : range results { count if count 5 { fmt.Printf(Job %d - Result %d\n, result.JobID, result.Output) } } fmt.Printf(Total results: %d\n, count) }5.2 带优先级的扇入-扇出package fanout import ( fmt sync ) // PriorityJob 带优先级的任务 type PriorityJob struct { Job Priority int } // PriorityResult 带优先级的结果 type PriorityResult struct { Result Priority int } // MergeWithPriority 按优先级合并结果 func MergeWithPriority(resultChans ...-chan PriorityResult) -chan PriorityResult { out : make(chan PriorityResult) var wg sync.WaitGroup wg.Add(len(resultChans)) for _, ch : range resultChans { go func(c -chan PriorityResult) { defer wg.Done() for result : range c { out - result } }(ch) } go func() { wg.Wait() close(out) }() return out } // OrderedMerge 有序合并保持原始顺序 func OrderedMerge[T any](chans ...-chan T) -chan T { out : make(chan T) var wg sync.WaitGroup wg.Add(len(chans)) for _, ch : range chans { go func(c -chan T) { defer wg.Done() for v : range c { out - v } }(ch) } go func() { wg.Wait() close(out) }() return out }6. Context取消控制Context是Go 1.7引入的重要功能用于在goroutine之间传递取消信号和截止时间。6.1 Context基础package contextDemo import ( context fmt time ) // ContextWithCancel 展示context取消机制 func ContextWithCancel() { // 创建可取消的context ctx, cancel : context.WithCancel(context.Background()) go func() { for { select { case -ctx.Done(): fmt.Println(Goroutine cancelled:, ctx.Err()) return default: fmt.Println(Working...) time.Sleep(time.Second) } } }() // 主goroutine等待2秒后取消 time.Sleep(2 * time.Second) fmt.Println(Cancelling...) cancel() time.Sleep(time.Second) } // ContextWithTimeout 带超时的context func ContextWithTimeout() { ctx, cancel : context.WithTimeout(context.Background(), 2*time.Second) defer cancel() done : make(chan bool) go func() { for { select { case -ctx.Done(): fmt.Println(Operation timed out:, ctx.Err()) done - true return default: fmt.Println(Operation in progress...) time.Sleep(500 * time.Millisecond) } } }() -done } // ContextWithDeadline 带截止时间的context func ContextWithDeadline() { deadline : time.Now().Add(3 * time.Second) ctx, cancel : context.WithDeadline(context.Background(), deadline) defer cancel() for { select { case -ctx.Done(): fmt.Println(Deadline reached:, ctx.Err()) return default: fmt.Println(Working towards deadline...) time.Sleep(500 * time.Millisecond) } } }6.2 在HTTP服务中使用Contextpackage contextDemo import ( context fmt net/http time ) // RequestIDKey context中存储request ID的key type contextKey string const RequestIDKey contextKey requestID // middleware 中间件添加request ID func middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // 生成或获取request ID requestID : r.Header.Get(X-Request-ID) if requestID { requestID fmt.Sprintf(req-%d, time.Now().UnixNano()) } // 将request ID放入context ctx : context.WithValue(r.Context(), RequestIDKey, requestID) next.ServeHTTP(w, r.WithContext(ctx)) }) } // HandlerWithContext 使用context的处理器 func HandlerWithContext(w http.ResponseWriter, r *http.Request) { requestID : r.Context().Value(RequestIDKey) if requestID ! nil { fmt.Fprintf(w, Request ID: %s\n, requestID) } } // LongRunningOperation 长时间运行的操作 func LongRunningOperation(ctx context.Context) error { for i : 0; i 10; i { select { case -ctx.Done(): return ctx.Err() default: fmt.Printf(Processing step %d...\n, i1) time.Sleep(time.Second) } } return nil } // CancellationDemo 取消演示 func CancellationDemo() { ctx, cancel : context.WithCancel(context.Background()) go func() { err : LongRunningOperation(ctx) if err ! nil { fmt.Printf(Operation cancelled: %v\n, err) } }() // 模拟客户端取消 time.Sleep(3 * time.Second) fmt.Println(Client cancelled the operation) cancel() time.Sleep(2 * time.Second) }6.3 完整的Context取消模式package contextDemo import ( context fmt sync time ) // TaskResult 任务结果 type TaskResult struct { ID int Value string } // ContextManager 带有context管理的任务管理器 type ContextManager struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // NewContextManager 创建context管理器 func NewContextManager(timeout time.Duration) *ContextManager { ctx, cancel : context.WithTimeout(context.Background(), timeout) return ContextManager{ ctx: ctx, cancel: cancel, } } // RunTask 运行任务支持取消 func (m *ContextManager) RunTask(id int, task func() (string, error)) -chan TaskResult { resultCh : make(chan TaskResult, 1) m.wg.Add(1) go func() { defer m.wg.Done() defer close(resultCh) // 创建任务的子context taskCtx, taskCancel : context.WithCancel(m.ctx) defer taskCancel() // 使用select监听取消信号 done : make(chan struct{}) go func() { value, err : task() close(done) if err nil { select { case resultCh - TaskResult{ID: id, Value: value}: case -taskCtx.Done(): } } }() select { case -done: // 任务正常完成 case -taskCtx.Done(): // 任务被取消 fmt.Printf(Task %d cancelled\n, id) } }() return resultCh } // Wait 等待所有任务完成 func (m *ContextManager) Wait() { m.wg.Wait() } // Cancel 取消所有任务 func (m *ContextManager) Cancel() { m.cancel() } // ExampleContextManager 使用示例 func ExampleContextManager() { manager : NewContextManager(5 * time.Second) // 模拟多个任务 tasks : []func() (string, error){ func() (string, error) { time.Sleep(1 * time.Second) return Task 1 completed, nil }, func() (string, error) { time.Sleep(2 * time.Second) return Task 2 completed, nil }, func() (string, error) { time.Sleep(10 * time.Second) // 这个任务会超时 return Task 3 completed, nil }, } // 启动所有任务 results : make([]-chan TaskResult, len(tasks)) for i, task : range tasks { results[i] manager.RunTask(i, task) } // 收集结果 done : make(chan struct{}) go func() { for i, ch : range results { for result : range ch { fmt.Printf(Result from task %d: %s\n, result.ID, result.Value) } fmt.Printf(Task %d finished\n, i) } close(done) }() // 等待完成或超时 select { case -done: fmt.Println(All tasks completed) case -manager.ctx.Done(): fmt.Println(Timeout reached, cancelling...) } manager.Wait() }7. 实战完整的并发爬虫示例package crawler import ( context fmt sync time ) // Page 页面结构 type Page struct { URL string Title string } // Fetcher 页面抓取器接口 type Fetcher interface { Fetch(url string) (*Page, error) } // SimpleFetcher 简单抓取器实现 type SimpleFetcher struct{} func (f *SimpleFetcher) Fetch(url string) (*Page, error) { // 模拟网络请求 time.Sleep(100 * time.Millisecond) return Page{URL: url, Title: fmt.Sprintf(Page for %s, url)}, nil } // CrawlerConfig 爬虫配置 type CrawlerConfig struct { MaxWorkers int MaxDepth int BufferSize int } // Crawler 并发爬虫 type Crawler struct { config CrawlerConfig fetcher Fetcher visited map[string]bool visitedMu sync.RWMutex } // NewCrawler 创建爬虫 func NewCrawler(config CrawlerConfig, fetcher Fetcher) *Crawler { return Crawler{ config: config, fetcher: fetcher, visited: make(map[string]bool), } } // isVisited 检查URL是否已访问 func (c *Crawler) isVisited(url string) bool { c.visitedMu.RLock() defer c.visitedMu.RUnlock() return c.visited[url] } // markVisited 标记URL已访问 func (c *Crawler) markVisited(url string) { c.visitedMu.Lock() defer c.visitedMu.Unlock() c.visited[url] true } // Crawl 开始爬取 func (c *Crawler) Crawl(ctx context.Context, urls []string) ([]*Page, error) { pages : make([]*Page, 0) var pagesMu sync.Mutex var wg sync.WaitGroup // URL队列 urlQueue : make(chan string, c.config.BufferSize) // 启动worker pool for i : 0; i c.config.MaxWorkers; i { wg.Add(1) go func(workerID int) { defer wg.Done() c.worker(ctx, workerID, urlQueue, pages, pagesMu) }(i) } // 添加初始URL for _, url : range urls { if !c.isVisited(url) { c.markVisited(url) urlQueue - url } } // 等待完成 go func() { wg.Wait() close(urlQueue) }() // 监听context取消 -ctx.Done() return pages, ctx.Err() } // worker worker goroutine func (c *Crawler) worker(ctx context.Context, id int, urls -chan string, pages *[]*Page, pagesMu *sync.Mutex) { for { select { case -ctx.Done(): fmt.Printf(Worker %d cancelled\n, id) return case url, ok : -urls: if !ok { fmt.Printf(Worker %d exiting\n, id) return } fmt.Printf(Worker %d fetching: %s\n, id, url) page, err : c.fetcher.Fetch(url) if err ! nil { fmt.Printf(Worker %d error fetching %s: %v\n, id, url, err) continue } pagesMu.Lock() *pages append(*pages, page) pagesMu.Unlock() // 模拟发现新URL // for _, newURL : range extractLinks(page) { // if !c.isVisited(newURL) { // c.markVisited(newURL) // select { // case urls - newURL: // case -ctx.Done(): // return // } // } // } } } } // ExampleCrawler 使用示例 func ExampleCrawler() { config : CrawlerConfig{ MaxWorkers: 3, MaxDepth: 2, BufferSize: 10, } fetcher : SimpleFetcher{} crawler : NewCrawler(config, fetcher) // 创建带超时的context ctx, cancel : context.WithTimeout(context.Background(), 10*time.Second) defer cancel() urls : []string{ https://example.com/page1, https://example.com/page2, https://example.com/page3, https://example.com/page4, https://example.com/page5, } pages, err : crawler.Crawl(ctx, urls) if err ! nil { fmt.Printf(Crawl finished with error: %v\n, err) } fmt.Printf(Total pages crawled: %d\n, len(pages)) for i, page : range pages { if i 5 { fmt.Printf( - %s: %s\n, page.URL, page.Title) } } }8. 常见并发模式总结模式对比模式适用场景关键特性生产者-消费者解耦生产和消费速率使用channel缓冲发布-订阅一对多通知订阅者独立处理管道多阶段处理数据流式处理扇入-扇出并行处理任务工作池模式Context取消请求生命周期管理层级取消信号最佳实践channel关闭原则通常由发送方关闭channel使用sync.WaitGroup等待goroutine完成使用defer close()确保资源释放避免死锁确保不会发生循环等待使用缓冲channel避免阻塞使用select处理多个channelcontext使用在HTTP服务中传递请求scope的context使用WithCancel/WithTimeout/WithDeadline不要将context用于非取消场景的值传递goroutine生命周期确保所有goroutine都能退出使用errgroup管理相关goroutine避免创建过多的goroutine错误处理在goroutine中使用命名的返回值使用channel传递错误而非共享变量考虑使用golang.org/x/sync/errgroup通过掌握这些并发模式我们可以编写出高效、可靠的并发Go程序。关键是根据具体场景选择合适的模式并注意避免常见的并发陷阱。