Advanced Concurrency Patterns

📚 Lesson 13 of 16 ⏱️ 55 min

Worker pools distribute work across a fixed number of goroutines, giving you controlled concurrency. By capping how many goroutines run at once, a pool prevents resource exhaustion. Workers pull tasks from one channel and send results on another, which makes pools a natural fit for processing large batches of tasks efficiently.

Fan-out/fan-in patterns process data in parallel: fan-out distributes work across multiple goroutines, and fan-in collects their results back into a single channel. Together the two halves let you spread expensive work across workers and still consume the output in one place, which is exactly what you want when processing large datasets concurrently.

The `context` package manages goroutine lifecycle and cancellation: it can signal cancellation, set deadlines, and carry request-scoped values. A `context.Context` is passed as the first argument through call chains, which lets long-running operations be cancelled gracefully or bounded in time.

The `select` statement lets a goroutine wait on several channel operations at once, proceeding with whichever becomes ready first. With a `default` case it becomes non-blocking, and paired with `time.After` it implements timeouts, making it the core tool for coordinating concurrent operations.

Other useful patterns include pipelines (passing data through a series of processing stages connected by channels), rate limiting (controlling how fast requests are issued), and semaphores (bounding access to a shared resource, often implemented with a buffered channel).

Best practices include using context for cancellation, handling errors explicitly in concurrent code, using worker pools for controlled concurrency, and avoiding goroutine leaks by ensuring every goroutine has a path to exit. Concurrent code should be safe, efficient, and maintainable; these patterns are the building blocks of scalable, robust systems.

Key Concepts

  • Worker pools distribute work across multiple goroutines.
  • Fan-out/fan-in patterns process data in parallel.
  • context manages goroutine lifecycle and cancellation.
  • select handles multiple channel operations.
  • Advanced patterns enable efficient concurrent processing.

Learning Objectives

Master

  • Implementing worker pools for controlled concurrency
  • Using fan-out/fan-in patterns for parallel processing
  • Managing goroutine lifecycle with context
  • Coordinating complex concurrent operations

Develop

  • Advanced concurrency design thinking
  • Understanding when and how to use concurrency patterns
  • Building scalable, robust concurrent systems

Tips

  • Use worker pools to limit concurrent goroutines.
  • Use context for cancellation and timeouts.
  • Implement proper error handling in concurrent code.
  • Avoid goroutine leaks by closing channels and honoring context cancellation.

Common Pitfalls

  • Creating too many goroutines, causing resource exhaustion.
  • Not using context for cancellation, causing goroutine leaks.
  • Not handling errors in goroutines, causing silent failures.
  • Race conditions from improper synchronization.

Summary

  • Worker pools enable controlled concurrency.
  • Fan-out/fan-in patterns enable efficient parallel processing.
  • context manages goroutine lifecycle and cancellation.
  • select coordinates multiple channel operations.
  • Understanding advanced patterns enables scalable concurrent systems.

Exercise

Implement a worker pool with fan-out/fan-in pattern for processing tasks.

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// Task represents a unit of work
type Task struct {
    ID       int
    Data     string
    Duration time.Duration
}

// Result represents the result of processing a task
type Result struct {
    TaskID    int
    Data      string
    Processed bool
    Error     error
}

// Worker pool with fan-out/fan-in pattern
func workerPool(ctx context.Context, numWorkers int, tasks <-chan Task) <-chan Result {
    results := make(chan Result)
    var wg sync.WaitGroup
    
    // Fan-out: Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(ctx, i, tasks, results, &wg)
    }
    
    // Fan-in: Collect results
    go func() {
        wg.Wait()
        close(results)
    }()
    
    return results
}

func worker(ctx context.Context, id int, tasks <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            
            // Process the task, guarding the result send with ctx
            // so a cancelled pool doesn't leave this worker blocked
            // on a channel nobody is reading.
            result := processTask(task)
            select {
            case results <- result:
                log.Printf("Worker %d processed task %d", id, task.ID)
            case <-ctx.Done():
                log.Printf("Worker %d cancelled", id)
                return
            }
            
        case <-ctx.Done():
            log.Printf("Worker %d cancelled", id)
            return
        }
    }
}

func processTask(task Task) Result {
    // Simulate work
    time.Sleep(task.Duration)
    
    // Simulate occasional error
    if task.ID%10 == 0 {
        return Result{
            TaskID:    task.ID,
            Data:      task.Data,
            Processed: false,
            Error:     fmt.Errorf("simulated error for task %d", task.ID),
        }
    }
    
    return Result{
        TaskID:    task.ID,
        Data:      task.Data + " (processed)",
        Processed: true,
        Error:     nil,
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // Create tasks
    numTasks := 50
    tasks := make(chan Task, numTasks)
    
    for i := 1; i <= numTasks; i++ {
        tasks <- Task{
            ID:       i,
            Data:     fmt.Sprintf("data-%d", i),
            Duration: time.Duration(i%5+1) * 100 * time.Millisecond,
        }
    }
    close(tasks)
    
    // Process tasks with worker pool
    numWorkers := 5
    results := workerPool(ctx, numWorkers, tasks)
    
    // Collect results
    var processed, failed int
    for result := range results {
        if result.Processed {
            processed++
            log.Printf("Task %d completed: %s", result.TaskID, result.Data)
        } else {
            failed++
            log.Printf("Task %d failed: %v", result.TaskID, result.Error)
        }
    }
    
    log.Printf("Processing complete. Processed: %d, Failed: %d", processed, failed)
}