I’ve been working with Go for nearly a decade now, and what still impresses me is its elegant approach to concurrency. When building high-performance systems, understanding these patterns isn’t just academic—it’s essential. Let me share the most effective concurrent design patterns I’ve implemented in production environments.
Worker Pool Pattern
The worker pool pattern creates a fixed number of goroutines to process tasks from a shared queue. This bounds resource consumption while keeping every worker busy as long as jobs are available.
// workerPool blocks until jobs is closed and every worker has finished, so the
// caller must drain results concurrently (or run workerPool in its own goroutine).
func workerPool(numWorkers int, jobs <-chan Job, results chan<- Result) {
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func(workerID int) {
			defer wg.Done()
			for job := range jobs {
				result := process(job)
				results <- result
				log.Printf("Worker %d processed job %v", workerID, job.ID)
			}
		}(i)
	}
	wg.Wait()
	close(results)
}

func process(job Job) Result {
	// Process the job; processedData stands in for the computed output
	return Result{JobID: job.ID, Data: processedData}
}
I’ve found that sizing the worker pool correctly is critical. Too few workers underutilize your resources, while too many can cause contention. For CPU-bound work, a good starting point is one worker per CPU core (runtime.NumCPU()); I/O-bound work can often use more. Adjust from there based on profiling.
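To show how such a pool is typically driven end to end, here is a minimal, self-contained sketch. The `Job` and `Result` fields, the squaring "work", and the `runPool` helper are assumptions made for illustration, not the article's production code. The point is the wiring: one goroutine feeds jobs, another closes `results` once all workers are done, and the caller drains `results` in the meantime.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Job and Result are hypothetical stand-ins for the article's types.
type Job struct{ ID, Input int }
type Result struct{ JobID, Output int }

// runPool starts numWorkers goroutines, feeds them one job per input,
// and returns all results (in completion order, not input order).
func runPool(numWorkers int, inputs []int) []Result {
	jobs := make(chan Job)
	results := make(chan Result)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				// Placeholder work: square the input.
				results <- Result{JobID: job.ID, Output: job.Input * job.Input}
			}
		}()
	}

	// Producer: send every job, then close so the workers' range loops end.
	go func() {
		for i, in := range inputs {
			jobs <- Job{ID: i, Input: in}
		}
		close(jobs)
	}()

	// Closer: once all workers are done, close results so the collector stops.
	go func() {
		wg.Wait()
		close(results)
	}()

	out := make([]Result, 0, len(inputs))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	results := runPool(runtime.NumCPU(), []int{1, 2, 3, 4, 5})
	sum := 0
	for _, r := range results {
		sum += r.Output
	}
	fmt.Println(len(results), sum) // prints "5 55"
}
```

Starting from runtime.NumCPU() and then measuring is only a heuristic; the right number depends on whether the work is CPU-bound or spends most of its time waiting.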
Fan-Out, Fan-In Pattern
When tasks can be processed independently, the fan-out/fan-in pattern shines. It distributes work across multiple goroutines (fan-out), then collects and combines their results (fan-in).
func fanOutFanIn(tasks []Task) []Result {
	numWorkers := runtime.NumCPU()
	tasksChan := make(chan Task, len(tasks))
	resultsChan := make(chan Result, len(tasks))

	// Fan out: start multiple workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for task := range tasksChan {
				result := processTask(task)
				resultsChan <- result
			}
		}()
	}

	// Send all tasks
	for _, task := range tasks {
		tasksChan <- task
	}
	close(tasksChan)

	// Fan in: collect all results
	var results []Result
	for i := 0; i < len(tasks); i++ {
		results = append(results, <-resultsChan)
	}
	return results
}
This pattern works exceptionally well for CPU-bound tasks like data processing or computation. I’ve used this to parallelize image processing, achieving near-linear speedup on multi-core systems.
Rate Limiting Pattern
Rate limiting prevents overloading systems with too many concurrent requests. It’s vital when working with external APIs or shared resources.
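func rateLimiter(requests <-chan Request, results chan<- Result, rate time.Duration) {
	ticker := time.NewTicker(rate)
	defer ticker.Stop()
	for req := range requests {
		<-ticker.C // Wait for the next tick before starting a request
		go func(r Request) {
			resp := processRequest(r)
			results <- resp
		}(req)
	}
}

// Reusable limiter: wait blocks until the next tick, stop releases the ticker.
// This is a fixed-interval limiter rather than a true token bucket, because it
// allows no bursts. qps must be positive.
func tokenBucketRateLimiter(qps int) (wait func(), stop func()) {
	ticker := time.NewTicker(time.Second / time.Duration(qps))
	// Do not defer ticker.Stop() here: it would stop the ticker as soon as this
	// function returns, and every later call to wait would block forever.
	return func() { <-ticker.C }, ticker.Stop
}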
In production systems, I implement adaptive rate limiting that responds to downstream service health. This prevents cascading failures during peak loads.
Pipeline Pattern
The pipeline pattern connects stages via channels, creating a sequence of operations where each stage processes data and passes it to the next.
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func filter(in <-chan int, predicate func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if predicate(n) {
				out <- n
			}
		}
	}()
	return out
}

// Usage
func runPipeline() {
	c := generator(2, 3, 4, 5, 6, 7, 8, 9, 10)
	squared := square(c)
	evenSquares := filter(squared, func(n int) bool {
		return n%2 == 0
	})
	for n := range evenSquares {
		fmt.Println(n)
	}
}
Pipelines create maintainable and modular code that’s easy to test and extend. Each stage focuses on a single responsibility, following the Unix philosophy of doing one thing well.
Timeout and Cancellation Pattern
Proper timeout and cancellation handling prevents resource leaks and unresponsive behavior.
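func processWithTimeout(input Data) (Result, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Buffered so the goroutine can always send and exit, even after a timeout.
	resultCh := make(chan Result, 1)
	errCh := make(chan error, 1)
	go func() {
		// longRunningOperation does not take ctx, so it keeps running in the
		// background after a timeout; only the caller stops waiting for it.
		result, err := longRunningOperation(input)
		if err != nil {
			errCh <- err
			return
		}
		resultCh <- result
	}()

	select {
	case result := <-resultCh:
		return result, nil
	case err := <-errCh:
		return Result{}, err
	case <-ctx.Done():
		// Wrap ctx.Err() so callers can test for context.DeadlineExceeded.
		return Result{}, fmt.Errorf("operation timed out: %w", ctx.Err())
	}
}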
In distributed systems, I always implement timeouts. Network calls, database queries, and any I/O operations should have appropriate deadlines to maintain system resilience.
Multiplexing with Select
The select statement allows goroutines to wait on multiple channel operations, creating powerful coordination patterns.
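func multiplexChannels(ch1, ch2, ch3 <-chan Event, done <-chan struct{}) <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out)
		// A nil channel blocks forever in select, so a closed input is set to
		// nil to take it out of the rotation. The loop ends when all are closed.
		for ch1 != nil || ch2 != nil || ch3 != nil {
			var ev Event
			var ok bool
			select {
			case ev, ok = <-ch1:
				if !ok {
					ch1 = nil
					continue
				}
			case ev, ok = <-ch2:
				if !ok {
					ch2 = nil
					continue
				}
			case ev, ok = <-ch3:
				if !ok {
					ch3 = nil
					continue
				}
			case <-done:
				return
			}
			// Forward the event, but give up if the consumer signals done.
			select {
			case out <- ev:
			case <-done:
				return
			}
		}
	}()
	return out
}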
I recently used this pattern to build a notification service that monitored multiple event sources. The select statement handled events from the different systems as they arrived, without one slow source holding up the others.
Error Propagation Pattern
Error handling in concurrent code requires careful design to ensure errors are properly communicated and handled.
type Result struct {
	Value interface{}
	Err   error
}

func concurrentTasks(tasks []Task) []Result {
	// Buffered so every goroutine can send and exit without waiting for a reader.
	results := make(chan Result, len(tasks))
	for _, task := range tasks {
		go func(t Task) {
			value, err := t.Execute()
			results <- Result{Value: value, Err: err}
		}(task)
	}

	var taskResults []Result
	for i := 0; i < len(tasks); i++ {
		taskResults = append(taskResults, <-results)
	}
	return taskResults
}

// Usage example
func handleResults(results []Result) {
	// Results arrive in completion order, so i is not the task's position
	// in the original tasks slice.
	for i, result := range results {
		if result.Err != nil {
			log.Printf("Task %d failed: %v", i, result.Err)
			continue
		}
		// Process successful result
		processValue(result.Value)
	}
}
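This pattern returns each error to the caller alongside its result instead of losing it inside the goroutine, so failures can be logged and handled in one place. It does not recover from panics; a task that may panic still needs its own recover.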
Thread-Safe Data Access Pattern
Coordinating access to shared data is one of the most challenging aspects of concurrent programming.
type SafeCounter struct {
	mu sync.Mutex
	v  map[string]int
}

// NewSafeCounter initializes the map; incrementing a key in a nil map would panic.
func NewSafeCounter() *SafeCounter {
	return &SafeCounter{v: make(map[string]int)}
}

func (c *SafeCounter) Increment(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.v[key]++
}

func (c *SafeCounter) Value(key string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.v[key]
}

// Alternative using atomic operations
type AtomicCounter struct {
	value int64
}

func (c *AtomicCounter) Increment() {
	atomic.AddInt64(&c.value, 1)
}

func (c *AtomicCounter) Value() int64 {
	return atomic.LoadInt64(&c.value)
}
For more complex scenarios, I’ve found that combining channels with mutexes creates clean, comprehensible thread-safe code:
type DataService struct {
	cache      map[string]Data
	mu         sync.RWMutex
	updates    chan DataUpdate
	subscribed map[string][]chan Data
}

func (ds *DataService) Start() {
	go func() {
		for update := range ds.updates {
			ds.processUpdate(update)
		}
	}()
}

func (ds *DataService) processUpdate(update DataUpdate) {
	ds.mu.Lock()
	ds.cache[update.Key] = update.Value
	subscribers := ds.subscribed[update.Key]
	ds.mu.Unlock()

	// Notify subscribers
	for _, ch := range subscribers {
		select {
		case ch <- update.Value:
			// Subscriber received update
		default:
			// Subscriber is not ready, skip
		}
	}
}
This combined approach provides thread safety while maintaining high performance through minimal locking durations.
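The read and subscribe paths are not shown above, so here is one way they might look. This is a self-contained sketch with assumptions: `Data` is reduced to a string, the `Get`, `Subscribe`, and `Set` methods are hypothetical, and `Set` plays the role of `processUpdate` without the updates channel. Readers take the read lock, so concurrent lookups do not block each other.

```go
package main

import (
	"fmt"
	"sync"
)

// Data is simplified to a string for this sketch.
type Data string

type DataService struct {
	mu         sync.RWMutex
	cache      map[string]Data
	subscribed map[string][]chan Data
}

func NewDataService() *DataService {
	return &DataService{
		cache:      make(map[string]Data),
		subscribed: make(map[string][]chan Data),
	}
}

// Get takes only the read lock, so many readers can proceed in parallel.
func (ds *DataService) Get(key string) (Data, bool) {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	d, ok := ds.cache[key]
	return d, ok
}

// Subscribe registers a buffered channel so one pending update can be held
// even if the subscriber is not receiving at that moment.
func (ds *DataService) Subscribe(key string) <-chan Data {
	ch := make(chan Data, 1)
	ds.mu.Lock()
	ds.subscribed[key] = append(ds.subscribed[key], ch)
	ds.mu.Unlock()
	return ch
}

// Set updates the cache under the write lock, then notifies outside the lock.
func (ds *DataService) Set(key string, value Data) {
	ds.mu.Lock()
	ds.cache[key] = value
	// Copy the slice so later Subscribe calls cannot affect this iteration.
	subs := append([]chan Data(nil), ds.subscribed[key]...)
	ds.mu.Unlock()

	for _, ch := range subs {
		select {
		case ch <- value:
		default: // subscriber's buffer is full, skip this update
		}
	}
}

func main() {
	ds := NewDataService()
	updates := ds.Subscribe("price")
	ds.Set("price", "42")

	fmt.Println(<-updates) // prints "42"
	v, ok := ds.Get("price")
	fmt.Println(v, ok) // prints "42 true"
}
```

Dropping updates for slow subscribers is a design choice that favors the writer's latency; a subscriber that must see every update would need a larger buffer or a blocking send with a timeout.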
Performance Considerations
When implementing these patterns, keep these principles in mind:
- Benchmark different concurrency levels to find optimal worker counts.
- Consider the overhead of goroutine creation and channel operations.
- Use buffered channels when appropriate to reduce blocking.
- Implement proper cancellation to prevent goroutine leaks.
- Profile your application to identify concurrency bottlenecks.
I’ve found that channel-based concurrency works best for coordination and signaling, while mutex-based synchronization is often better for shared memory access patterns.
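As a rough illustration of the first point in the list above, the following sketch times the same CPU-bound workload at several worker counts. The `work` function and the task count are arbitrary assumptions, and wall-clock timing like this is only a first approximation; for real measurements, the standard `testing` package's benchmarks (go test -bench) give more reliable numbers.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// work is a hypothetical CPU-bound task.
func work(n int) int {
	sum := 0
	for i := 0; i < 10000; i++ {
		sum += (n + i) % 7
	}
	return sum
}

// runWithWorkers processes numTasks tasks with numWorkers goroutines and
// returns the combined result along with the elapsed wall-clock time.
func runWithWorkers(numWorkers, numTasks int) (total int, elapsed time.Duration) {
	start := time.Now()

	tasks := make(chan int, numTasks)
	for i := 0; i < numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	// Each worker writes only to its own slot, so no lock is needed.
	partial := make([]int, numWorkers)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for t := range tasks {
				partial[w] += work(t)
			}
		}(w)
	}
	wg.Wait()

	for _, p := range partial {
		total += p
	}
	return total, time.Since(start)
}

func main() {
	for _, n := range []int{1, 2, 4, runtime.NumCPU()} {
		total, elapsed := runWithWorkers(n, 2000)
		fmt.Printf("workers=%d total=%d elapsed=%v\n", n, total, elapsed)
	}
}
```

The totals should match across worker counts, which doubles as a quick correctness check; the elapsed times will vary by machine and load.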
Real-World Application
The true power of these patterns emerges when combining them. Here’s a simplified version of a data processing system I built:
func ProcessDataStream(stream <-chan Data, concurrency int) <-chan Result {
	// 1. Fan out to multiple workers
	processors := make([]<-chan ProcessedData, concurrency)
	for i := 0; i < concurrency; i++ {
		processors[i] = processData(stream)
	}

	// 2. Fan in the results
	merged := mergeResults(processors...)

	// 3. Apply pipeline transformation
	validated := validateResults(merged)
	enriched := enrichResults(validated)

	// 4. Apply rate limiting for external API calls
	rateLimited := applyRateLimiting(enriched, 10)
	return rateLimited
}

func processData(in <-chan Data) <-chan ProcessedData {
	out := make(chan ProcessedData)
	go func() {
		defer close(out)
		for data := range in {
			// processWithTimeout (defined earlier) applies its own deadline,
			// so no separate context is created here.
			result, err := processWithTimeout(data)
			out <- ProcessedData{
				Original: data,
				Result:   result,
				Error:    err,
			}
		}
	}()
	return out
}
This system combines worker pools, fan-out/fan-in, pipelines, timeouts, and rate limiting into a cohesive and resilient data processing architecture.
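The helper stages are omitted from the listing above. As an illustration of the fan-in step, here is one way a merge function like `mergeResults` could be written. This is a generic sketch that assumes Go 1.18 or later for type parameters; the `merge` and `emit` names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards values from every input channel to a single output channel
// and closes the output once all inputs are closed and drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close out only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a small helper that returns a closed, pre-filled channel.
func emit(vals ...int) <-chan int {
	out := make(chan int, len(vals))
	for _, v := range vals {
		out <- v
	}
	close(out)
	return out
}

func main() {
	sum := 0
	for v := range merge(emit(1, 2), emit(3), emit(4, 5, 6)) {
		sum += v
	}
	fmt.Println(sum) // prints 21
}
```

Values from different inputs interleave in no particular order, so any stage after the merge should not rely on ordering across sources.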
Conclusion
Go’s concurrency model is powerful, but using it well takes discipline. By mastering these essential patterns, you can build applications that fully utilize modern hardware while maintaining code clarity and robustness.
When designing concurrent systems, I focus first on the data flow and separation of concerns, then apply the appropriate patterns to each component. This approach leads to systems that scale gracefully and remain maintainable as they grow.
The most elegant concurrent programs often combine multiple patterns to create a system that’s greater than the sum of its parts. With practice, you’ll develop an intuition for which patterns best fit your specific requirements.