package domain import "time" type Event interface { EventName() string OccurredAt() time.Time
} type OrderPlaced struct { OrderID string CustomerID string Total float64 Timestamp time.Time
} func (e OrderPlaced) EventName() string { return "order.placed"
} func (e OrderPlaced) OccurredAt() time.Time { return e.Timestamp
}
package domain import "time" type Event interface { EventName() string OccurredAt() time.Time
} type OrderPlaced struct { OrderID string CustomerID string Total float64 Timestamp time.Time
} func (e OrderPlaced) EventName() string { return "order.placed"
} func (e OrderPlaced) OccurredAt() time.Time { return e.Timestamp
}
package domain import "time" type Event interface { EventName() string OccurredAt() time.Time
} type OrderPlaced struct { OrderID string CustomerID string Total float64 Timestamp time.Time
} func (e OrderPlaced) EventName() string { return "order.placed"
} func (e OrderPlaced) OccurredAt() time.Time { return e.Timestamp
}
package domain import "context" type EventPublisher interface { Publish(ctx context.Context, events ...Event) error
}
package domain import "context" type EventPublisher interface { Publish(ctx context.Context, events ...Event) error
}
package domain import "context" type EventPublisher interface { Publish(ctx context.Context, events ...Event) error
}
package domain import ( "context" "time" "github.com/google/uuid"
) type OrderService struct { repo OrderRepository publisher EventPublisher
} func NewOrderService( repo OrderRepository, publisher EventPublisher,
) *OrderService { return &OrderService{ repo: repo, publisher: publisher, }
}
package domain import ( "context" "time" "github.com/google/uuid"
) type OrderService struct { repo OrderRepository publisher EventPublisher
} func NewOrderService( repo OrderRepository, publisher EventPublisher,
) *OrderService { return &OrderService{ repo: repo, publisher: publisher, }
}
package domain import ( "context" "time" "github.com/google/uuid"
) type OrderService struct { repo OrderRepository publisher EventPublisher
} func NewOrderService( repo OrderRepository, publisher EventPublisher,
) *OrderService { return &OrderService{ repo: repo, publisher: publisher, }
}
func (s *OrderService) PlaceOrder( ctx context.Context, customerID string, total float64,
) (Order, error) { order := Order{ ID: uuid.New().String(), CustomerID: customerID, Total: total, Status: StatusPending, CreatedAt: time.Now(), } if err := s.repo.Save(ctx, order); err != nil { return Order{}, err } event := OrderPlaced{ OrderID: order.ID, CustomerID: order.CustomerID, Total: order.Total, Timestamp: order.CreatedAt, } if err := s.publisher.Publish(ctx, event); err != nil { return Order{}, err } return order, nil
}
func (s *OrderService) PlaceOrder( ctx context.Context, customerID string, total float64,
) (Order, error) { order := Order{ ID: uuid.New().String(), CustomerID: customerID, Total: total, Status: StatusPending, CreatedAt: time.Now(), } if err := s.repo.Save(ctx, order); err != nil { return Order{}, err } event := OrderPlaced{ OrderID: order.ID, CustomerID: order.CustomerID, Total: order.Total, Timestamp: order.CreatedAt, } if err := s.publisher.Publish(ctx, event); err != nil { return Order{}, err } return order, nil
}
func (s *OrderService) PlaceOrder( ctx context.Context, customerID string, total float64,
) (Order, error) { order := Order{ ID: uuid.New().String(), CustomerID: customerID, Total: total, Status: StatusPending, CreatedAt: time.Now(), } if err := s.repo.Save(ctx, order); err != nil { return Order{}, err } event := OrderPlaced{ OrderID: order.ID, CustomerID: order.CustomerID, Total: order.Total, Timestamp: order.CreatedAt, } if err := s.publisher.Publish(ctx, event); err != nil { return Order{}, err } return order, nil
}
package eventbus import ( "context" "log" "sync" "yourapp/domain"
) type Handler func(ctx context.Context, event domain.Event) type ChannelPublisher struct { ch chan domain.Event handlers map[string][]Handler mu sync.RWMutex
} func NewChannelPublisher(bufSize int) *ChannelPublisher { return &ChannelPublisher{ ch: make(chan domain.Event, bufSize), handlers: make(map[string][]Handler), }
} func (p *ChannelPublisher) Subscribe( eventName string, handler Handler,
) { p.mu.Lock() defer p.mu.Unlock() p.handlers[eventName] = append( p.handlers[eventName], handler, )
}
package eventbus import ( "context" "log" "sync" "yourapp/domain"
) type Handler func(ctx context.Context, event domain.Event) type ChannelPublisher struct { ch chan domain.Event handlers map[string][]Handler mu sync.RWMutex
} func NewChannelPublisher(bufSize int) *ChannelPublisher { return &ChannelPublisher{ ch: make(chan domain.Event, bufSize), handlers: make(map[string][]Handler), }
} func (p *ChannelPublisher) Subscribe( eventName string, handler Handler,
) { p.mu.Lock() defer p.mu.Unlock() p.handlers[eventName] = append( p.handlers[eventName], handler, )
}
package eventbus import ( "context" "log" "sync" "yourapp/domain"
) type Handler func(ctx context.Context, event domain.Event) type ChannelPublisher struct { ch chan domain.Event handlers map[string][]Handler mu sync.RWMutex
} func NewChannelPublisher(bufSize int) *ChannelPublisher { return &ChannelPublisher{ ch: make(chan domain.Event, bufSize), handlers: make(map[string][]Handler), }
} func (p *ChannelPublisher) Subscribe( eventName string, handler Handler,
) { p.mu.Lock() defer p.mu.Unlock() p.handlers[eventName] = append( p.handlers[eventName], handler, )
}
func (p *ChannelPublisher) Publish( ctx context.Context, events ...domain.Event,
) error { for _, e := range events { select { case p.ch <- e: case <-ctx.Done(): return ctx.Err() } } return nil
} func (p *ChannelPublisher) Start(ctx context.Context) { go func() { for { select { case e := <-p.ch: p.mu.RLock() handlers := p.handlers[e.EventName()] p.mu.RUnlock() for _, h := range handlers { h(ctx, e) } case <-ctx.Done(): log.Println("event bus stopped") return } } }()
}
func (p *ChannelPublisher) Publish( ctx context.Context, events ...domain.Event,
) error { for _, e := range events { select { case p.ch <- e: case <-ctx.Done(): return ctx.Err() } } return nil
} func (p *ChannelPublisher) Start(ctx context.Context) { go func() { for { select { case e := <-p.ch: p.mu.RLock() handlers := p.handlers[e.EventName()] p.mu.RUnlock() for _, h := range handlers { h(ctx, e) } case <-ctx.Done(): log.Println("event bus stopped") return } } }()
}
func (p *ChannelPublisher) Publish( ctx context.Context, events ...domain.Event,
) error { for _, e := range events { select { case p.ch <- e: case <-ctx.Done(): return ctx.Err() } } return nil
} func (p *ChannelPublisher) Start(ctx context.Context) { go func() { for { select { case e := <-p.ch: p.mu.RLock() handlers := p.handlers[e.EventName()] p.mu.RUnlock() for _, h := range handlers { h(ctx, e) } case <-ctx.Done(): log.Println("event bus stopped") return } } }()
}
package notification import ( "context" "fmt" "yourapp/domain"
) type EmailSender interface { Send( ctx context.Context, to, subject, body string, ) error
} type OrderNotifier struct { sender EmailSender lookup CustomerLookup
} type CustomerLookup interface { EmailByID( ctx context.Context, id string, ) (string, error)
} func NewOrderNotifier( sender EmailSender, lookup CustomerLookup,
) *OrderNotifier { return &OrderNotifier{ sender: sender, lookup: lookup, }
}
package notification import ( "context" "fmt" "yourapp/domain"
) type EmailSender interface { Send( ctx context.Context, to, subject, body string, ) error
} type OrderNotifier struct { sender EmailSender lookup CustomerLookup
} type CustomerLookup interface { EmailByID( ctx context.Context, id string, ) (string, error)
} func NewOrderNotifier( sender EmailSender, lookup CustomerLookup,
) *OrderNotifier { return &OrderNotifier{ sender: sender, lookup: lookup, }
}
package notification import ( "context" "fmt" "yourapp/domain"
) type EmailSender interface { Send( ctx context.Context, to, subject, body string, ) error
} type OrderNotifier struct { sender EmailSender lookup CustomerLookup
} type CustomerLookup interface { EmailByID( ctx context.Context, id string, ) (string, error)
} func NewOrderNotifier( sender EmailSender, lookup CustomerLookup,
) *OrderNotifier { return &OrderNotifier{ sender: sender, lookup: lookup, }
}
func (n *OrderNotifier) Handle( ctx context.Context, event domain.Event,
) { placed, ok := event.(domain.OrderPlaced) if !ok { return } email, err := n.lookup.EmailByID( ctx, placed.CustomerID, ) if err != nil { fmt.Printf( "lookup failed for %s: %v\n", placed.CustomerID, err, ) return } subject := fmt.Sprintf( "Order %s confirmed", placed.OrderID, ) body := fmt.Sprintf( "Your order of $%.2f is being processed.", placed.Total, ) if err := n.sender.Send( ctx, email, subject, body, ); err != nil { fmt.Printf( "email send failed for %s: %v\n", placed.OrderID, err, ) }
}
func (n *OrderNotifier) Handle( ctx context.Context, event domain.Event,
) { placed, ok := event.(domain.OrderPlaced) if !ok { return } email, err := n.lookup.EmailByID( ctx, placed.CustomerID, ) if err != nil { fmt.Printf( "lookup failed for %s: %v\n", placed.CustomerID, err, ) return } subject := fmt.Sprintf( "Order %s confirmed", placed.OrderID, ) body := fmt.Sprintf( "Your order of $%.2f is being processed.", placed.Total, ) if err := n.sender.Send( ctx, email, subject, body, ); err != nil { fmt.Printf( "email send failed for %s: %v\n", placed.OrderID, err, ) }
}
func (n *OrderNotifier) Handle( ctx context.Context, event domain.Event,
) { placed, ok := event.(domain.OrderPlaced) if !ok { return } email, err := n.lookup.EmailByID( ctx, placed.CustomerID, ) if err != nil { fmt.Printf( "lookup failed for %s: %v\n", placed.CustomerID, err, ) return } subject := fmt.Sprintf( "Order %s confirmed", placed.OrderID, ) body := fmt.Sprintf( "Your order of $%.2f is being processed.", placed.Total, ) if err := n.sender.Send( ctx, email, subject, body, ); err != nil { fmt.Printf( "email send failed for %s: %v\n", placed.OrderID, err, ) }
}
func main() { ctx, cancel := context.WithCancel( context.Background(), ) defer cancel() db := setupDB() repo := postgres.NewOrderRepository(db) bus := eventbus.NewChannelPublisher(256) smtpClient := smtp.NewClient(smtpConfig) customerRepo := postgres.NewCustomerRepository(db) notifier := notification.NewOrderNotifier( smtpClient, customerRepo, ) bus.Subscribe("order.placed", notifier.Handle) bus.Start(ctx) svc := domain.NewOrderService(repo, bus) handler := httphandler.New(svc) mux := http.NewServeMux() mux.HandleFunc("POST /orders", handler.Create()) log.Fatal(http.ListenAndServe(":8080", mux))
}
func main() { ctx, cancel := context.WithCancel( context.Background(), ) defer cancel() db := setupDB() repo := postgres.NewOrderRepository(db) bus := eventbus.NewChannelPublisher(256) smtpClient := smtp.NewClient(smtpConfig) customerRepo := postgres.NewCustomerRepository(db) notifier := notification.NewOrderNotifier( smtpClient, customerRepo, ) bus.Subscribe("order.placed", notifier.Handle) bus.Start(ctx) svc := domain.NewOrderService(repo, bus) handler := httphandler.New(svc) mux := http.NewServeMux() mux.HandleFunc("POST /orders", handler.Create()) log.Fatal(http.ListenAndServe(":8080", mux))
}
func main() { ctx, cancel := context.WithCancel( context.Background(), ) defer cancel() db := setupDB() repo := postgres.NewOrderRepository(db) bus := eventbus.NewChannelPublisher(256) smtpClient := smtp.NewClient(smtpConfig) customerRepo := postgres.NewCustomerRepository(db) notifier := notification.NewOrderNotifier( smtpClient, customerRepo, ) bus.Subscribe("order.placed", notifier.Handle) bus.Start(ctx) svc := domain.NewOrderService(repo, bus) handler := httphandler.New(svc) mux := http.NewServeMux() mux.HandleFunc("POST /orders", handler.Create()) log.Fatal(http.ListenAndServe(":8080", mux))
}
package testutil import ( "context" "yourapp/domain"
) type SpyPublisher struct { Events []domain.Event
} func (s *SpyPublisher) Publish( _ context.Context, events ...domain.Event,
) error { s.Events = append(s.Events, events...) return nil
}
package testutil import ( "context" "yourapp/domain"
) type SpyPublisher struct { Events []domain.Event
} func (s *SpyPublisher) Publish( _ context.Context, events ...domain.Event,
) error { s.Events = append(s.Events, events...) return nil
}
package testutil import ( "context" "yourapp/domain"
) type SpyPublisher struct { Events []domain.Event
} func (s *SpyPublisher) Publish( _ context.Context, events ...domain.Event,
) error { s.Events = append(s.Events, events...) return nil
}
func TestPlaceOrder_EmitsEvent(t *testing.T) { repo := &inMemoryRepo{ orders: make(map[string]domain.Order), } spy := &testutil.SpyPublisher{} svc := domain.NewOrderService(repo, spy) order, err := svc.PlaceOrder( context.Background(), "cust-1", 99.99, ) if err != nil { t.Fatalf("unexpected error: %v", err) } if len(spy.Events) != 1 { t.Fatalf( "expected 1 event, got %d", len(spy.Events), ) } placed, ok := spy.Events[0].(domain.OrderPlaced) if !ok { t.Fatal("expected OrderPlaced event") } if placed.OrderID != order.ID { t.Errorf( "event order ID = %s, want %s", placed.OrderID, order.ID, ) }
}
func TestPlaceOrder_EmitsEvent(t *testing.T) { repo := &inMemoryRepo{ orders: make(map[string]domain.Order), } spy := &testutil.SpyPublisher{} svc := domain.NewOrderService(repo, spy) order, err := svc.PlaceOrder( context.Background(), "cust-1", 99.99, ) if err != nil { t.Fatalf("unexpected error: %v", err) } if len(spy.Events) != 1 { t.Fatalf( "expected 1 event, got %d", len(spy.Events), ) } placed, ok := spy.Events[0].(domain.OrderPlaced) if !ok { t.Fatal("expected OrderPlaced event") } if placed.OrderID != order.ID { t.Errorf( "event order ID = %s, want %s", placed.OrderID, order.ID, ) }
}
func TestPlaceOrder_EmitsEvent(t *testing.T) { repo := &inMemoryRepo{ orders: make(map[string]domain.Order), } spy := &testutil.SpyPublisher{} svc := domain.NewOrderService(repo, spy) order, err := svc.PlaceOrder( context.Background(), "cust-1", 99.99, ) if err != nil { t.Fatalf("unexpected error: %v", err) } if len(spy.Events) != 1 { t.Fatalf( "expected 1 event, got %d", len(spy.Events), ) } placed, ok := spy.Events[0].(domain.OrderPlaced) if !ok { t.Fatal("expected OrderPlaced event") } if placed.OrderID != order.ID { t.Errorf( "event order ID = %s, want %s", placed.OrderID, order.ID, ) }
} - Book: Hexagonal Architecture in Go
- Also by me: Thinking in Go (2-book series) — Complete Guide to Go Programming + Hexagonal Architecture in Go
- My project: Hermes IDE | GitHub — an IDE for developers who ship with Claude Code and other AI coding tools
- Me: xgabriel.com | GitHub - The event consumer lives in the same process
- You do not need delivery guarantees beyond "the process stays up"
- You have fewer than a handful of event types - Consumers live in separate services
- You need at-least-once delivery across restarts
- You need fan-out to multiple independent consumers at scale
- You need replay (re-processing historical events)