Concurrency is a cornerstone of Go’s design, offering developers powerful tools to build efficient and scalable applications. As I’ve delved deeper into Go’s concurrency model, I’ve discovered several patterns that have significantly improved my ability to write parallel code. In this article, I’ll share ten of these patterns, each accompanied by practical examples and insights from my own experiences.
- Fan-Out, Fan-In
The fan-out, fan-in pattern is one of the most versatile concurrency patterns in Go. It involves distributing work across multiple goroutines (fan-out) and then collecting the results (fan-in). This pattern is particularly useful when dealing with independent, parallelizable tasks.
Here’s an example of how to implement this pattern:
func fanOutFanIn(tasks []int) []int {
numWorkers := 3
tasksChan := make(chan int, len(tasks))
resultsChan := make(chan int, len(tasks))
for _, task := range tasks {
tasksChan <- task
}
close(tasksChan)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
for task := range tasksChan {
result := processTask(task)
resultsChan <- result
}
}()
}
go func() {
wg.Wait()
close(resultsChan)
}()
var results []int
for result := range resultsChan {
results = append(results, result)
}
return results
}
func processTask(task int) int {
// Simulating some work
time.Sleep(time.Millisecond * time.Duration(task))
return task * 2
}
In this example, we distribute tasks across multiple worker goroutines and collect the results in a single channel. This pattern can significantly improve performance when dealing with I/O-bound or CPU-intensive tasks.
- Worker Pool
The worker pool pattern is similar to fan-out, fan-in, but with a fixed number of worker goroutines that continuously process tasks from a queue. This pattern is excellent for managing resources and controlling concurrency levels.
Here’s an implementation of a worker pool:
func workerPool(tasks []int, numWorkers int) []int {
tasksChan := make(chan int, len(tasks))
resultsChan := make(chan int, len(tasks))
for _, task := range tasks {
tasksChan <- task
}
close(tasksChan)
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
for task := range tasksChan {
result := processTask(task)
resultsChan <- result
}
}()
}
go func() {
wg.Wait()
close(resultsChan)
}()
var results []int
for result := range resultsChan {
results = append(results, result)
}
return results
}
This pattern is particularly useful when you need to limit the number of concurrent operations, such as database connections or API calls.
- Pipeline
The pipeline pattern involves breaking down a complex process into a series of stages, each performed by a separate goroutine. This pattern is excellent for processing streams of data or implementing complex workflows.
Here’s an example of a simple pipeline:
func generateNumbers(done <-chan struct{}) <-chan int {
numbers := make(chan int)
go func() {
defer close(numbers)
for i := 0; ; i++ {
select {
case <-done:
return
case numbers <- i:
}
}
}()
return numbers
}
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-done:
return
case out <- n * n:
}
}
}()
return out
}
func main() {
done := make(chan struct{})
defer close(done)
numbers := generateNumbers(done)
squares := square(done, numbers)
for i := 0; i < 10; i++ {
fmt.Println(<-squares)
}
}
This pipeline generates numbers, squares them, and prints the results. Each stage is implemented as a separate function, making the code modular and easy to maintain.
- Timeout Pattern
When dealing with concurrent operations, it’s crucial to implement timeouts to prevent goroutines from blocking indefinitely. The timeout pattern uses Go’s select statement to implement this behavior.
Here’s an example:
func timeoutPattern(ctx context.Context) (string, error) {
ch := make(chan string, 1)
go func() {
// Simulating a long-running operation
time.Sleep(2 * time.Second)
ch <- "Operation completed"
}()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := timeoutPattern(ctx)
if err != nil {
fmt.Println("Operation timed out:", err)
} else {
fmt.Println(result)
}
}
This pattern ensures that our concurrent operations don’t hang indefinitely, improving the reliability of our applications.
- Cancellation Pattern
The cancellation pattern allows us to stop ongoing operations when they’re no longer needed. This is particularly useful for long-running operations or when implementing user-initiated cancellations.
Here’s an example using context:
func cancellableOperation(ctx context.Context) error {
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Simulating some work
time.Sleep(100 * time.Millisecond)
fmt.Println("Operation step", i+1)
}
}
return nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Simulate a user cancellation after 250ms
time.Sleep(250 * time.Millisecond)
cancel()
}()
err := cancellableOperation(ctx)
if err != nil {
fmt.Println("Operation was cancelled:", err)
}
}
This pattern allows us to gracefully handle cancellations, freeing up resources and preventing unnecessary work.
- Semaphore Pattern
The semaphore pattern is used to limit the number of goroutines that can access a resource simultaneously. This is particularly useful when dealing with limited resources or when you need to control the level of concurrency.
Here’s an implementation using a buffered channel:
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(maxConcurrency int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, maxConcurrency),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
func main() {
sem := NewSemaphore(3)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
sem.Acquire()
defer sem.Release()
// Simulating some work
fmt.Printf("Worker %d started\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d finished\n", id)
}(i)
}
wg.Wait()
}
This pattern ensures that we don’t overwhelm our system or external resources by limiting the number of concurrent operations.
- Error Group Pattern
The error group pattern, implemented in the golang.org/x/sync/errgroup package, allows us to run multiple goroutines concurrently and collect any errors that occur. This is particularly useful when you need to perform multiple operations in parallel and want to handle errors gracefully.
Here’s an example:
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func fetchURL(url string) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}
func main() {
var g errgroup.Group
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.invalid-url.com",
}
for _, url := range urls {
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
return fetchURL(url)
})
}
if err := g.Wait(); err != nil {
fmt.Println("Error occurred:", err)
} else {
fmt.Println("All URLs fetched successfully")
}
}
This pattern simplifies error handling in concurrent operations and allows us to fail fast if any goroutine encounters an error.
- Rate Limiting Pattern
Rate limiting is crucial when dealing with external APIs or resources that have usage limits. Go’s time.Ticker can be used to implement a simple rate limiter.
Here’s an example:
import (
"fmt"
"time"
)
func rateLimitedOperation(rateLimiter *time.Ticker) {
for range rateLimiter.C {
// Perform rate-limited operation
fmt.Println("Performing operation at", time.Now())
}
}
func main() {
rateLimiter := time.NewTicker(200 * time.Millisecond)
defer rateLimiter.Stop()
go rateLimitedOperation(rateLimiter)
// Run for 2 seconds
time.Sleep(2 * time.Second)
}
This pattern ensures that we don’t exceed API rate limits or overload external services.
- Context Propagation Pattern
When dealing with complex systems, it’s important to propagate context through different layers of your application. This pattern allows you to pass request-scoped values, cancellation signals, and deadlines across API boundaries and between processes.
Here’s an example:
import (
"context"
"fmt"
"time"
)
func operation1(ctx context.Context) error {
// Simulate some work
time.Sleep(100 * time.Millisecond)
select {
case <-ctx.Done():
return ctx.Err()
default:
fmt.Println("Operation 1 completed")
return nil
}
}
func operation2(ctx context.Context) error {
// Simulate some work
time.Sleep(200 * time.Millisecond)
select {
case <-ctx.Done():
return ctx.Err()
default:
fmt.Println("Operation 2 completed")
return nil
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
defer cancel()
err := operation1(ctx)
if err != nil {
fmt.Println("Operation 1 error:", err)
return
}
err = operation2(ctx)
if err != nil {
fmt.Println("Operation 2 error:", err)
return
}
}
This pattern ensures that all parts of your application respect timeouts and cancellation signals, improving the overall responsiveness and reliability of your system.
- Throttling Pattern
Throttling is useful when you want to limit the rate at which operations are performed, but still want to process all items eventually. This pattern is similar to rate limiting but allows for bursts of activity.
Here’s an implementation using a token bucket algorithm:
import (
"fmt"
"time"
)
type Throttle struct {
rate time.Duration
maxTokens int
tokens int
lastRefill time.Time
}
func NewThrottle(rate time.Duration, maxTokens int) *Throttle {
return &Throttle{
rate: rate,
maxTokens: maxTokens,
tokens: maxTokens,
lastRefill: time.Now(),
}
}
func (t *Throttle) Allow() bool {
now := time.Now()
elapsed := now.Sub(t.lastRefill)
refill := int(elapsed / t.rate)
if refill > 0 {
t.tokens = min(t.maxTokens, t.tokens+refill)
t.lastRefill = now
}
if t.tokens > 0 {
t.tokens--
return true
}
return false
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func main() {
throttle := NewThrottle(100*time.Millisecond, 3)
for i := 0; i < 10; i++ {
if throttle.Allow() {
fmt.Printf("Operation %d allowed at %v\n", i, time.Now())
} else {
fmt.Printf("Operation %d throttled at %v\n", i, time.Now())
}
time.Sleep(50 * time.Millisecond)
}
}
This pattern allows for more flexible rate control, permitting bursts of activity while still maintaining an overall limit on operations.
In conclusion, these ten concurrency patterns form a powerful toolkit for Go developers. By leveraging these patterns, we can create efficient, scalable, and robust concurrent systems. As with any powerful tool, it’s important to use these patterns judiciously and in the right contexts.
Throughout my journey with Go, I’ve found that mastering these patterns has significantly improved my ability to write high-performance, concurrent code. However, it’s crucial to remember that concurrency adds complexity to our programs. Always strive for the simplest solution that meets your performance requirements.
As you implement these patterns in your own projects, you’ll likely discover new variations and combinations that suit your specific needs. The key is to understand the underlying principles and adapt them to your unique challenges.
Remember, effective concurrent programming is as much about managing complexity as it is about improving performance. By using these patterns thoughtfully, we can create Go programs that are not only fast and efficient but also clear and maintainable.