Go语言并发模式与最佳实践
Go语言并发模式与最佳实践并发编程是Go语言的核心特性之一。本文将深入探讨Go语言中的并发模式和最佳实践帮助你编写高效、安全的并发代码。一、并发基础1.1 Goroutine基础package main import ( fmt time ) func sayHello(name string) { for i : 0; i 3; i { fmt.Printf(Hello, %s! Count: %d\n, name, i) time.Sleep(100 * time.Millisecond) } } func main() { // 启动多个goroutine go sayHello(Alice) go sayHello(Bob) // 主goroutine等待其他goroutine完成 time.Sleep(500 * time.Millisecond) fmt.Println(Main goroutine finished) }1.2 Channel基础func main() { // 创建一个无缓冲channel ch : make(chan string) go func() { ch - Hello from goroutine }() // 接收channel中的值 msg : -ch fmt.Println(msg) }二、常见并发模式2.1 生产者-消费者模式func producer(ch chan- int, count int) { for i : 0; i count; i { ch - i fmt.Printf(Produced: %d\n, i) } close(ch) } func consumer(ch -chan int, id int) { for num : range ch { fmt.Printf(Consumer %d received: %d\n, id, num) time.Sleep(100 * time.Millisecond) } } func main() { ch : make(chan int, 5) // 启动生产者 go producer(ch, 10) // 启动多个消费者 for i : 0; i 3; i { go consumer(ch, i) } time.Sleep(2 * time.Second) }2.2 Fan-Out模式func generateNumbers(count int) -chan int { ch : make(chan int) go func() { for i : 0; i count; i { ch - i } close(ch) }() return ch } func square(in -chan int) -chan int { out : make(chan int) go func() { for num : range in { out - num * num } close(out) }() return out } func main() { nums : generateNumbers(10) // Fan-out: 多个goroutine处理同一个channel sq1 : square(nums) sq2 : square(nums) sq3 : square(nums) // Fan-in: 合并多个channel的输出 merged : merge(sq1, sq2, sq3) for num : range merged { fmt.Println(num) } } func merge(channels ...-chan int) -chan int { out : make(chan int) var wg sync.WaitGroup for _, ch : range channels { wg.Add(1) go func(c -chan int) { defer wg.Done() for num : range c { out - num } }(ch) } go func() { wg.Wait() close(out) }() return out }2.3 Worker Pool模式type Worker struct { id int jobs -chan int results chan- int } func NewWorker(id int, jobs -chan int, results chan- int) *Worker { return Worker{ id: id, jobs: jobs, results: results, } } func (w *Worker) Start() { go func() { for job : range w.jobs { fmt.Printf(Worker %d processing job %d\n, w.id, job) time.Sleep(200 * time.Millisecond) w.results - job * 2 } }() } func main() { const numJobs 10 const numWorkers 3 jobs : make(chan int, numJobs) results : make(chan int, numJobs) // 启动worker for i : 0; i numWorkers; i { worker : NewWorker(i, jobs, results) worker.Start() } // 发送任务 for i : 0; i numJobs; i { jobs - i } close(jobs) // 收集结果 for i : 0; i numJobs; i { -results } }三、同步原语3.1 Mutextype Counter struct { mu sync.Mutex value int } func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value } func (c *Counter) Get() int { c.mu.Lock() defer c.mu.Unlock() return c.value }3.2 RWMutextype DataStore struct { mu sync.RWMutex data map[string]string } func (ds *DataStore) Get(key string) (string, bool) { ds.mu.RLock() defer ds.mu.RUnlock() value, ok : ds.data[key] return value, ok } func (ds *DataStore) Set(key, value string) { ds.mu.Lock() defer ds.mu.Unlock() ds.data[key] value }3.3 WaitGroupfunc processItem(item int) { fmt.Printf(Processing item %d\n, item) time.Sleep(100 * time.Millisecond) } func main() { items : []int{1, 2, 3, 4, 5} var wg sync.WaitGroup wg.Add(len(items)) for _, item : range items { go func(i int) { defer wg.Done() processItem(i) }(item) } wg.Wait() fmt.Println(All items processed) }3.4 Oncetype Singleton struct{} var ( instance *Singleton once sync.Once ) func GetInstance() *Singleton { once.Do(func() { instance Singleton{} fmt.Println(Instance created) }) return instance }四、并发安全的数据结构type SafeMap struct { mu sync.RWMutex data map[string]interface{} } func NewSafeMap() *SafeMap { return SafeMap{ data: make(map[string]interface{}), } } func (sm *SafeMap) Get(key string) (interface{}, bool) { sm.mu.RLock() defer sm.mu.RUnlock() value, ok : sm.data[key] return value, ok } func (sm *SafeMap) Set(key string, value interface{}) { sm.mu.Lock() defer sm.mu.Unlock() sm.data[key] value } func (sm *SafeMap) Delete(key string) { sm.mu.Lock() defer sm.mu.Unlock() delete(sm.data, key) } func (sm *SafeMap) Len() int { sm.mu.RLock() defer sm.mu.RUnlock() return len(sm.data) }五、错误处理与并发type TaskResult struct { Result interface{} Err error } func executeTask(task func() (interface{}, error)) -chan TaskResult { ch : make(chan TaskResult, 1) go func() { result, err : task() ch - TaskResult{Result: result, Err: err} }() return ch } func main() { tasks : []func() (interface{}, error){ func() (interface{}, error) { return task 1, nil }, func() (interface{}, error) { return nil, fmt.Errorf(task 2 failed) }, func() (interface{}, error) { return task 3, nil }, } channels : make([]-chan TaskResult, len(tasks)) for i, task : range tasks { channels[i] executeTask(task) } for _, ch : range channels { result : -ch if result.Err ! nil { fmt.Printf(Error: %v\n, result.Err) } else { fmt.Printf(Result: %v\n, result.Result) } } }六、上下文控制func worker(ctx context.Context, id int) { for { select { case -ctx.Done(): fmt.Printf(Worker %d stopped\n, id) return default: fmt.Printf(Worker %d working\n, id) time.Sleep(100 * time.Millisecond) } } } func main() { ctx, cancel : context.WithCancel(context.Background()) // 启动多个worker for i : 0; i 3; i { go worker(ctx, i) } // 3秒后取消所有worker time.Sleep(3 * time.Second) cancel() time.Sleep(500 * time.Millisecond) fmt.Println(All workers stopped) }七、总结本文介绍了Go语言并发编程的核心模式和最佳实践Goroutine和ChannelGo并发的基础生产者-消费者模式解耦数据生产和消费Fan-Out/Fan-In模式并行处理和结果聚合Worker Pool模式控制并发数量同步原语Mutex、RWMutex、WaitGroup、Once并发安全数据结构线程安全的Map实现错误处理并发任务的错误收集上下文控制优雅地取消并发任务Go语言的并发模型简单而强大通过channel和goroutine可以轻松构建高效的并发系统。