Tools: Domain Events in Go Without an Event Bus

What Is a Domain Event

The Port: EventPublisher

The Domain Service

Adapter 1: Channel-Based Publisher

Adapter 2: The Notification Handler

Wiring in main()

Adapter 3: In-Memory Publisher for Tests

When to Graduate to a Real Broker

What to Try Next

Someone on a team I talked to last year spent three weeks evaluating message brokers. Kafka vs NATS vs RabbitMQ. Benchmarks, Helm charts, ACL configs, consumer-group semantics. All of it for one use case: send an email when an order is placed. Three weeks of infrastructure work for what amounts to a function call.

They diagnosed the wrong thing. Their assumption, that "domain events" required an event bus, was the actual gap. Domain events are a design pattern. The transport is a detail you pick later.

In Go, you can get real event-driven behavior with an interface, a struct, and a channel. Drop the broker, skip the Docker Compose dependency, and avoid vendor lock-in on day one.

What Is a Domain Event

A domain event is a record of something that happened in your business logic. Not a message on a queue. Not a protobuf payload. A plain struct.

```go
package domain

import "time"

// Event is the minimal contract every domain event satisfies.
type Event interface {
	EventName() string
	OccurredAt() time.Time
}

type OrderPlaced struct {
	OrderID    string
	CustomerID string
	Total      float64
	Timestamp  time.Time
}

func (e OrderPlaced) EventName() string {
	return "order.placed"
}

func (e OrderPlaced) OccurredAt() time.Time {
	return e.Timestamp
}
```

OrderPlaced lives in the domain package. It carries no opinions about Kafka topics, JSON serialization, or HTTP webhooks, just a plain fact about what happened.

The Port: EventPublisher

Your domain needs a way to emit events without knowing where they go. Define an outbound port as an interface in the domain package:

```go
package domain

import "context"

type EventPublisher interface {
	Publish(ctx context.Context, events ...Event) error
}
```

One method. Takes a context and a variadic slice of events, and hands back an error if something goes wrong. The domain service calls this port without ever knowing whether the events land in a channel, a Kafka topic, a log file, or /dev/null.

The Domain Service

Here is OrderService. It depends on two ports: a repository to persist orders and a publisher to emit events. The constructor takes the two ports.

```go
package domain

import (
	"context"
	"time"

	"github.com/google/uuid"
)

type OrderService struct {
	repo      OrderRepository
	publisher EventPublisher
}

func NewOrderService(
	repo OrderRepository,
	publisher EventPublisher,
) *OrderService {
	return &OrderService{
		repo:      repo,
		publisher: publisher,
	}
}
```

PlaceOrder uses them both, persisting first and then emitting:

```go
func (s *OrderService) PlaceOrder(
	ctx context.Context,
	customerID string,
	total float64,
) (Order, error) {
	order := Order{
		ID:         uuid.New().String(),
		CustomerID: customerID,
		Total:      total,
		Status:     StatusPending,
		CreatedAt:  time.Now(),
	}

	// Persist first, so an event is never emitted for an unsaved order.
	if err := s.repo.Save(ctx, order); err != nil {
		return Order{}, err
	}

	event := OrderPlaced{
		OrderID:    order.ID,
		CustomerID: order.CustomerID,
		Total:      order.Total,
		Timestamp:  order.CreatedAt,
	}

	if err := s.publisher.Publish(ctx, event); err != nil {
		return Order{}, err
	}

	return order, nil
}
```

Notice what is absent: no kafka.Producer, no nats.Conn, no broker or transport import of any kind (the only third-party import is the UUID generator). PlaceOrder calls s.publisher.Publish(ctx, event) and moves on. Where those events actually land is someone else's problem entirely.

Adapter 1: Channel-Based Publisher

For a single-process service, a Go channel works. The adapter implements EventPublisher by sending events to a buffered channel. A separate goroutine reads from that channel and dispatches to handlers. Subscribe registers handlers by event name.

```go
package eventbus

import (
	"context"
	"log"
	"sync"

	"yourapp/domain"
)

type Handler func(ctx context.Context, event domain.Event)

type ChannelPublisher struct {
	ch       chan domain.Event
	handlers map[string][]Handler
	mu       sync.RWMutex
}

func NewChannelPublisher(bufSize int) *ChannelPublisher {
	return &ChannelPublisher{
		ch:       make(chan domain.Event, bufSize),
		handlers: make(map[string][]Handler),
	}
}

func (p *ChannelPublisher) Subscribe(
	eventName string,
	handler Handler,
) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.handlers[eventName] = append(
		p.handlers[eventName], handler,
	)
}
```

Publish pushes events onto the buffered channel, and Start spins up a goroutine that drains the channel and dispatches to registered handlers:

```go
func (p *ChannelPublisher) Publish(
	ctx context.Context,
	events ...domain.Event,
) error {
	for _, e := range events {
		select {
		case p.ch <- e:
		case <-ctx.Done():
			// The buffer is full and the caller gave up waiting.
			return ctx.Err()
		}
	}
	return nil
}

func (p *ChannelPublisher) Start(ctx context.Context) {
	go func() {
		for {
			select {
			case e := <-p.ch:
				// Copy the handler slice under the read lock, then
				// run the handlers without holding it.
				p.mu.RLock()
				handlers := p.handlers[e.EventName()]
				p.mu.RUnlock()
				for _, h := range handlers {
					h(ctx, e)
				}
			case <-ctx.Done():
				log.Println("event bus stopped")
				return
			}
		}
	}()
}
```

An adapter through and through, living in an infrastructure package, not in the domain. Swap it for Kafka tomorrow and the domain service stays untouched. Write a new adapter, wire it in main(), and deploy.

Adapter 2: The Notification Handler

The handler that consumes OrderPlaced events is also an adapter. It sits on the outbound side, reacting to domain events.

```go
package notification

import (
	"context"
	"fmt"

	"yourapp/domain"
)

type EmailSender interface {
	Send(
		ctx context.Context,
		to, subject, body string,
	) error
}

type OrderNotifier struct {
	sender EmailSender
	lookup CustomerLookup
}

type CustomerLookup interface {
	EmailByID(
		ctx context.Context,
		id string,
	) (string, error)
}

func NewOrderNotifier(
	sender EmailSender,
	lookup CustomerLookup,
) *OrderNotifier {
	return &OrderNotifier{
		sender: sender,
		lookup: lookup,
	}
}
```

Handle type-asserts the incoming event, looks up the customer email, and fires the notification:

```go
func (n *OrderNotifier) Handle(
	ctx context.Context,
	event domain.Event,
) {
	// Ignore anything that is not an OrderPlaced event.
	placed, ok := event.(domain.OrderPlaced)
	if !ok {
		return
	}

	email, err := n.lookup.EmailByID(
		ctx, placed.CustomerID,
	)
	if err != nil {
		fmt.Printf(
			"lookup failed for %s: %v\n",
			placed.CustomerID, err,
		)
		return
	}

	subject := fmt.Sprintf(
		"Order %s confirmed", placed.OrderID,
	)
	body := fmt.Sprintf(
		"Your order of $%.2f is being processed.",
		placed.Total,
	)

	if err := n.sender.Send(
		ctx, email, subject, body,
	); err != nil {
		fmt.Printf(
			"email send failed for %s: %v\n",
			placed.OrderID, err,
		)
	}
}
```

The notifier depends on its own ports (EmailSender, CustomerLookup) and nothing else. Whether a goroutine reading a channel invoked it or a Kafka consumer group did makes no difference: the handler neither knows nor cares.

Wiring in main()

main() is the composition root. Every adapter gets instantiated here and every port gets satisfied here, with plain constructor calls, no framework, no reflection.

```go
func main() {
	ctx, cancel := context.WithCancel(
		context.Background(),
	)
	defer cancel()

	db := setupDB()
	repo := postgres.NewOrderRepository(db)

	bus := eventbus.NewChannelPublisher(256)

	smtpClient := smtp.NewClient(smtpConfig)
	customerRepo := postgres.NewCustomerRepository(db)
	notifier := notification.NewOrderNotifier(
		smtpClient, customerRepo,
	)

	bus.Subscribe("order.placed", notifier.Handle)
	bus.Start(ctx)

	svc := domain.NewOrderService(repo, bus)

	handler := httphandler.New(svc)
	mux := http.NewServeMux()
	mux.HandleFunc("POST /orders", handler.Create())

	log.Fatal(http.ListenAndServe(":8080", mux))
}
```

Read this top-to-bottom and you see the entire architecture. Repository adapter. Event bus adapter. Notification adapter. Domain service. HTTP adapter. Five imports, zero ambiguity.

Adapter 3: In-Memory Publisher for Tests

Here is where the pattern pays for itself. Tests should not involve a channel, a goroutine, or any asynchrony. All you want is a publisher that records what was emitted so you can assert on it.

```go
package testutil

import (
	"context"

	"yourapp/domain"
)

type SpyPublisher struct {
	Events []domain.Event
}

func (s *SpyPublisher) Publish(
	_ context.Context,
	events ...domain.Event,
) error {
	s.Events = append(s.Events, events...)
	return nil
}
```

Twelve lines, no mocking framework, no code generation. Because SpyPublisher has the right method signature, Go's implicit interface satisfaction handles the rest.

```go
func TestPlaceOrder_EmitsEvent(t *testing.T) {
	repo := &inMemoryRepo{
		orders: make(map[string]domain.Order),
	}
	spy := &testutil.SpyPublisher{}
	svc := domain.NewOrderService(repo, spy)

	order, err := svc.PlaceOrder(
		context.Background(), "cust-1", 99.99,
	)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if len(spy.Events) != 1 {
		t.Fatalf(
			"expected 1 event, got %d",
			len(spy.Events),
		)
	}

	placed, ok := spy.Events[0].(domain.OrderPlaced)
	if !ok {
		t.Fatal("expected OrderPlaced event")
	}
	if placed.OrderID != order.ID {
		t.Errorf(
			"event order ID = %s, want %s",
			placed.OrderID, order.ID,
		)
	}
}
```

The test runs without a broker, without Docker, without teardown, finishing in microseconds while proving the domain emits the right event with the right data.

When to Graduate to a Real Broker

None of this means you should avoid Kafka forever. Defer the decision until you actually need it. Here is a rough guide:

Stay with the channel publisher when:

- The event consumer lives in the same process
- You do not need delivery guarantees beyond "the process stays up"
- You have fewer than a handful of event types

Move to Kafka, NATS, or a queue when:

- Consumers live in separate services
- You need at-least-once delivery across restarts
- You need fan-out to multiple independent consumers at scale
- You need replay (re-processing historical events)

Migration is clean. Write a KafkaPublisher that satisfies EventPublisher, swap one line in main(), and deploy. Your domain service, event structs, tests, and notification handler stay untouched, which is exactly what the port buys you.

What to Try Next

If this approach clicks, push it one step further: add a second event (OrderShipped) and a second handler that updates an analytics counter. Wire both handlers in main(). You will see how adding behavior means adding adapters, while the domain service never reopens. That feedback loop is where hexagonal architecture starts to feel less like theory and more like the way you want to build everything.

If this was useful

The channel-and-port pattern is one chapter in a much larger story about structuring Go services so they survive contact with production. If the idea of transport-agnostic domain logic clicked for you, the book goes deeper: outbound adapters, transaction boundaries with Unit of Work, decorator-based observability, and when to skip hexagonal entirely.

Hexagonal Architecture in Go covers 22 chapters of this, from first principles to production-ready services. It is Book 2 in the Thinking in Go series. Book 1, The Complete Guide to Go Programming, covers the language itself.

- Book: Hexagonal Architecture in Go
- Also by me: Thinking in Go (2-book series): Complete Guide to Go Programming + Hexagonal Architecture in Go
- My project: Hermes IDE | GitHub: an IDE for developers who ship with Claude Code and other AI coding tools
- Me: xgabriel.com | GitHub
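For readers who want a starting point for the exercise in What to Try Next, here is one possible sketch of a second event and a second handler. It is not taken from the article or the book: the OrderShipped fields (including Carrier), the "order.shipped" event name, the ShipmentCounter adapter, and the SyncPublisher are all hypothetical names chosen for illustration. SyncPublisher is a deliberately simplified stand-in for the article's ChannelPublisher that dispatches inline, so the example stays deterministic and fits in a single file.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// Event mirrors the article's domain.Event interface.
type Event interface {
	EventName() string
	OccurredAt() time.Time
}

// OrderPlaced mirrors the article's event, trimmed to the fields used here.
type OrderPlaced struct {
	OrderID   string
	Timestamp time.Time
}

func (e OrderPlaced) EventName() string     { return "order.placed" }
func (e OrderPlaced) OccurredAt() time.Time { return e.Timestamp }

// OrderShipped is the hypothetical second event; its fields are assumptions.
type OrderShipped struct {
	OrderID   string
	Carrier   string
	Timestamp time.Time
}

func (e OrderShipped) EventName() string     { return "order.shipped" }
func (e OrderShipped) OccurredAt() time.Time { return e.Timestamp }

// Handler has the same shape as the article's eventbus.Handler.
type Handler func(ctx context.Context, event Event)

// SyncPublisher is an assumed stand-in for ChannelPublisher. It calls
// handlers inline and is not safe for concurrent Subscribe calls.
type SyncPublisher struct {
	handlers map[string][]Handler
}

func NewSyncPublisher() *SyncPublisher {
	return &SyncPublisher{handlers: make(map[string][]Handler)}
}

func (p *SyncPublisher) Subscribe(name string, h Handler) {
	p.handlers[name] = append(p.handlers[name], h)
}

func (p *SyncPublisher) Publish(ctx context.Context, events ...Event) error {
	for _, e := range events {
		if err := ctx.Err(); err != nil {
			return err
		}
		for _, h := range p.handlers[e.EventName()] {
			h(ctx, e)
		}
	}
	return nil
}

// ShipmentCounter is the hypothetical analytics adapter: it counts
// OrderShipped events and ignores everything else.
type ShipmentCounter struct {
	shipped atomic.Int64
}

func (c *ShipmentCounter) Handle(_ context.Context, event Event) {
	if _, ok := event.(OrderShipped); !ok {
		return
	}
	c.shipped.Add(1)
}

func (c *ShipmentCounter) Shipped() int64 { return c.shipped.Load() }

// runDemo wires both handlers, publishes one event of each kind, and
// returns how many shipments the counter saw.
func runDemo() int64 {
	ctx := context.Background()
	bus := NewSyncPublisher()
	counter := &ShipmentCounter{}

	bus.Subscribe("order.placed", func(_ context.Context, e Event) {
		if placed, ok := e.(OrderPlaced); ok {
			fmt.Println("would send confirmation for", placed.OrderID)
		}
	})
	bus.Subscribe("order.shipped", counter.Handle)

	now := time.Now()
	if err := bus.Publish(ctx,
		OrderPlaced{OrderID: "ord-1", Timestamp: now},
		OrderShipped{OrderID: "ord-1", Carrier: "acme", Timestamp: now},
	); err != nil {
		fmt.Println("publish failed:", err)
	}
	return counter.Shipped()
}

func main() {
	fmt.Println("shipped orders:", runDemo())
}
```

The new behavior arrives as one event type, one handler, and one extra Subscribe call; nothing in the publisher or the existing handler changes. In a real service the counter would presumably write to a metrics backend rather than an in-memory integer.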