Advanced Concurrency Patterns
55 min
Worker pools distribute work across a fixed number of goroutines, which bounds concurrency and prevents resource exhaustion. Each worker receives tasks from a shared channel and sends its results to a second channel. This makes worker pools a good fit for processing large numbers of independent tasks.
Fan-out/fan-in processes data in parallel: fan-out distributes work from one source to multiple goroutines, and fan-in merges their results back into a single channel. The pattern suits large datasets whose items can be processed independently.
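As a minimal sketch of the idea, the program below fans out by starting several `square` stages that all read from one source channel, then fans in with a `merge` function. The names `generate`, `square`, `merge`, and `squareAll` are illustrative choices for this example, not part of any library.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// generate sends the given numbers on a channel and then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads from in, squares each value, and closes its output
// once in has been drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge is the fan-in step: it forwards every value from the input
// channels onto one channel and closes it after all inputs are drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squareAll fans out to `workers` square stages that share one source,
// then fans their outputs back in.
func squareAll(nums []int, workers int) []int {
	src := generate(nums...)
	stages := make([]<-chan int, workers)
	for i := range stages {
		stages[i] = square(src) // fan-out: each stage competes for values from src
	}
	var results []int
	for v := range merge(stages...) {
		results = append(results, v)
	}
	sort.Ints(results) // arrival order is nondeterministic, so sort for display
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 3)) // [1 4 9 16 25]
}
```

Because the stages race for input, results arrive in no particular order; the sort is only there to make the printed output stable.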
The `context` package manages goroutine lifecycle: it signals cancellation, sets deadlines and timeouts, and carries request-scoped values. A `context.Context` is passed explicitly through the call chain, by convention as the first parameter, and goroutines watch its `Done()` channel so they can stop gracefully. Use it to write cancellable, time-bounded operations.
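A short sketch of a time-bounded operation follows. The functions `slowSquare` and `squareWithTimeout` are hypothetical helpers written for this illustration; only `context.WithTimeout`, `ctx.Done()`, and `ctx.Err()` come from the standard library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowSquare simulates work that takes `delay`, but gives up early
// if ctx is cancelled or its deadline passes.
func slowSquare(ctx context.Context, n int, delay time.Duration) (int, error) {
	select {
	case <-time.After(delay):
		return n * n, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// squareWithTimeout bounds slowSquare with a timeout.
func squareWithTimeout(n int, delay, timeout time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the context's resources
	return slowSquare(ctx, n, delay)
}

func main() {
	v, err := squareWithTimeout(4, 10*time.Millisecond, time.Second)
	fmt.Println(v, err) // 16 <nil>

	_, err = squareWithTimeout(4, time.Second, 10*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

Calling `cancel` with `defer` even on the success path is the usual habit, since it stops the timer that backs the deadline.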
The `select` statement lets a goroutine wait on several channel operations at once and proceeds with whichever is ready first; if several are ready, it picks one at random. Adding a `default` case makes the operation non-blocking, and a case on `time.After` or `ctx.Done()` adds a timeout or cancellation path.
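The two most common uses can be sketched as follows. The helper names `trySend` and `recvWithTimeout` are assumptions made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts a non-blocking send: the default case runs when
// ch cannot accept the value right now.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// recvWithTimeout waits for a value for at most `timeout`.
func recvWithTimeout(ch <-chan int, timeout time.Duration) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	case <-time.After(timeout):
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1))                           // true: the buffer has room
	fmt.Println(trySend(ch, 2))                           // false: the buffer is full
	fmt.Println(recvWithTimeout(ch, 50*time.Millisecond)) // 1 true
	fmt.Println(recvWithTimeout(ch, 50*time.Millisecond)) // 0 false
}
```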
Other patterns include pipelines (processing data through a series of stages connected by channels), rate limiting (controlling how often requests are issued), and semaphores (bounding concurrent access to a resource).
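Of these, the semaphore is the quickest to sketch: a buffered channel can serve as a counting semaphore, where sending acquires a slot and receiving releases it. The function `runLimited` below is a hypothetical helper for illustration; it also records the peak number of jobs running at once so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts `jobs` goroutines but lets at most `limit` of them
// do work at once. It returns the highest number of jobs observed
// running at the same time.
func runLimited(jobs, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while `limit` jobs hold a slot
			defer func() { <-sem }() // release the slot when done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(20, 3)
	fmt.Println(peak <= 3) // true
}
```

Unlike a worker pool, this approach still creates one goroutine per job; the semaphore only bounds how many of them are active at a time.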
Best practices include using context for cancellation, handling errors from goroutines explicitly, using worker pools to bound concurrency, and making sure every goroutine has a way to exit so that none leak. Concurrent code should be safe, efficient, and maintainable.
Key Concepts
- Worker pools distribute work across multiple goroutines.
- Fan-out/fan-in patterns process data in parallel.
- `context` manages goroutine lifecycle and cancellation.
- `select` handles multiple channel operations.
- Advanced patterns enable efficient concurrent processing.
Learning Objectives
Master
- Implementing worker pools for controlled concurrency
- Using fan-out/fan-in patterns for parallel processing
- Managing goroutine lifecycle with context
- Coordinating complex concurrent operations
Develop
- Advanced concurrency design thinking
- Understanding when and how to use concurrency patterns
- Building scalable, robust concurrent systems
Tips
- Use worker pools to limit concurrent goroutines.
- Use context for cancellation and timeouts.
- Implement proper error handling in concurrent code.
- Avoid goroutine leaks: close channels from the sending side once no more values will be sent, and give every goroutine an exit path.
Common Pitfalls
- Creating too many goroutines, causing resource exhaustion.
- Not using context for cancellation, causing goroutine leaks.
- Not handling errors in goroutines, causing silent failures.
- Race conditions from improper synchronization.
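Two of these pitfalls, silent failures and race conditions, can be addressed together. The sketch below runs a check concurrently for each input and collects every error in a slice guarded by a mutex. The names `checkAll` and `rejectNegative` are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// checkAll runs check on every input concurrently and returns the
// errors it collected, in no particular order.
func checkAll(inputs []int, check func(int) error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, in := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if err := check(n); err != nil {
				mu.Lock() // errs is shared between goroutines, so guard the append
				errs = append(errs, err)
				mu.Unlock()
			}
		}(in)
	}
	wg.Wait()
	return errs
}

// rejectNegative is a stand-in check that fails for negative numbers.
func rejectNegative(n int) error {
	if n < 0 {
		return fmt.Errorf("negative input: %d", n)
	}
	return nil
}

func main() {
	errs := checkAll([]int{1, -2, 3, -4}, rejectNegative)
	fmt.Println(len(errs)) // 2
}
```

Without the mutex, the concurrent appends would be a data race; running the program with `go run -race` is a quick way to confirm that kind of bug.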
Summary
- Worker pools enable controlled concurrency.
- Fan-out/fan-in patterns enable efficient parallel processing.
- `context` manages goroutine lifecycle and cancellation.
- `select` coordinates multiple channel operations.
- Understanding advanced patterns enables scalable concurrent systems.
Exercise
Implement a worker pool that uses the fan-out/fan-in pattern to process tasks.
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// Task represents a unit of work.
type Task struct {
	ID       int
	Data     string
	Duration time.Duration
}

// Result represents the outcome of processing a task.
type Result struct {
	TaskID    int
	Data      string
	Processed bool
	Error     error
}

// workerPool starts numWorkers workers that read from tasks (fan-out) and
// returns the single channel on which they all send results (fan-in).
func workerPool(ctx context.Context, numWorkers int, tasks <-chan Task) <-chan Result {
	results := make(chan Result)
	var wg sync.WaitGroup

	// Fan-out: start the workers.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(ctx, i, tasks, results, &wg)
	}

	// Close results once every worker has returned, so the consumer's
	// range loop terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}

// worker processes tasks until the tasks channel is closed or ctx is cancelled.
func worker(ctx context.Context, id int, tasks <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case task, ok := <-tasks:
			if !ok {
				return // tasks is closed and drained
			}
			result := processTask(task)
			// Guard the send as well: if the context is cancelled and the
			// consumer has stopped reading, a bare send would block forever.
			select {
			case results <- result:
				log.Printf("Worker %d processed task %d", id, task.ID)
			case <-ctx.Done():
				log.Printf("Worker %d cancelled", id)
				return
			}
		case <-ctx.Done():
			log.Printf("Worker %d cancelled", id)
			return
		}
	}
}

// processTask simulates work and fails for every tenth task.
func processTask(task Task) Result {
	// Simulate work.
	time.Sleep(task.Duration)

	// Simulate an occasional error.
	if task.ID%10 == 0 {
		return Result{
			TaskID:    task.ID,
			Data:      task.Data,
			Processed: false,
			Error:     fmt.Errorf("simulated error for task %d", task.ID),
		}
	}
	return Result{
		TaskID:    task.ID,
		Data:      task.Data + " (processed)",
		Processed: true,
		Error:     nil,
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Create the tasks. The channel is buffered to hold all of them, so
	// these sends never block; closing it tells workers no more will come.
	numTasks := 50
	tasks := make(chan Task, numTasks)
	for i := 1; i <= numTasks; i++ {
		tasks <- Task{
			ID:       i,
			Data:     fmt.Sprintf("data-%d", i),
			Duration: time.Duration(i%5+1) * 100 * time.Millisecond,
		}
	}
	close(tasks)

	// Process the tasks with the worker pool.
	numWorkers := 5
	results := workerPool(ctx, numWorkers, tasks)

	// Collect results until the pool closes the channel.
	var processed, failed int
	for result := range results {
		if result.Processed {
			processed++
			log.Printf("Task %d completed: %s", result.TaskID, result.Data)
		} else {
			failed++
			log.Printf("Task %d failed: %v", result.TaskID, result.Error)
		}
	}

	// Report whether the run was cut short by the timeout.
	if err := ctx.Err(); err != nil {
		log.Printf("Stopped early: %v", err)
	}
	log.Printf("Processing complete. Processed: %d, Failed: %d", processed, failed)
}