Event-Driven Architecture

What's an event?

An event is a record that something happened: "a user registered", "a password was reset". Instead of calling the next piece of code directly, the code that did the work just announces the event, and whoever cares reacts.

That indirection is the whole point. The code that does something doesn't need to know who's interested or what they'll do. You can add a new reaction (send a welcome email, update a projection, notify another team's service) without touching the code that emitted the event. Things stay decoupled.
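
A minimal sketch of that indirection, assuming a hypothetical in-memory EventBus (an illustration, not ForgeStack's API). The emitter publishes and never learns who is listening, so a second reaction can be added without editing it:

```typescript
// Hypothetical in-memory bus, for illustration only.
type AppEvent = { name: string; payload: Record<string, unknown> };
type Listener = (event: AppEvent) => void;

class EventBus {
  private readonly listeners = new Map<string, Listener[]>();

  // Register a reaction for one event name.
  subscribe(name: string, listener: Listener): void {
    const existing = this.listeners.get(name) ?? [];
    this.listeners.set(name, [...existing, listener]);
  }

  // Announce the event; every registered listener runs in subscription order.
  publish(event: AppEvent): void {
    for (const listener of this.listeners.get(event.name) ?? []) {
      listener(event);
    }
  }
}

// The emitter only knows the bus, not the reactions.
function registerUser(bus: EventBus, email: string): void {
  // ... persist the user here ...
  bus.publish({ name: 'user.registered', payload: { email } });
}

const bus = new EventBus();
const log: string[] = [];

bus.subscribe('user.registered', (e) => log.push(`welcome email to ${String(e.payload['email'])}`));
// Adding a second reaction later does not touch registerUser.
bus.subscribe('user.registered', (e) => log.push(`projection updated for ${String(e.payload['email'])}`));

registerUser(bus, 'ada@example.com');
console.log(log);
```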

This matters most at the seams between bounded contexts. Each context is its own little system — several may share a process today, but any of them should be splittable into its own microservice later. They collaborate only through events, never by importing each other or sharing a database, so that split is always possible.

Two kinds of events

There are two, and they exist for different reasons. Getting the distinction right is the key to everything else on this page: one kind stays inside a context, the other is how contexts talk across the boundary.

Domain events — internal to one context

A domain event is emitted by an aggregate when something meaningful happens in the business. It's handled in the same context, in the same process, and it never leaves. Because it stays internal, it can carry rich domain objects — value objects, entities, whatever the handlers need:

// auth context — in-process only, never serialized over the wire
export class UserRegistered_DomainEvent extends Base_DomainEvent {
  constructor(
    public readonly userId: Id,        // domain value objects, not strings
    public readonly email: Email,
    public readonly role: UserRole,
  ) {
    super(userId);
  }
}

Use them to keep a single context's own logic decoupled: an aggregate records what happened, and one or more handlers in that same context react to it.
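
The record-then-react pattern can be sketched as follows. The User aggregate, pullEvents, and dispatch are simplified hypothetical stand-ins, since ForgeStack's aggregate base class and dispatcher are not shown on this page:

```typescript
// Hypothetical simplified types; the real Base_DomainEvent and aggregate base are not shown here.
interface DomainEvent { readonly aggregateId: string; readonly type: string; }

class UserRegistered implements DomainEvent {
  readonly type = 'UserRegistered';
  constructor(readonly aggregateId: string, readonly email: string) {}
}

class User {
  private recorded: DomainEvent[] = [];

  private constructor(readonly id: string, readonly email: string) {}

  // The aggregate records what happened; it does not call any handler itself.
  static register(id: string, email: string): User {
    const user = new User(id, email);
    user.recorded.push(new UserRegistered(id, email));
    return user;
  }

  // Hand over the recorded events and clear the buffer.
  pullEvents(): DomainEvent[] {
    const events = this.recorded;
    this.recorded = [];
    return events;
  }
}

type Handler = (event: DomainEvent) => void;

// Deliver each event to every handler registered for its type, in the same process.
function dispatch(events: DomainEvent[], handlers: Record<string, Handler[]>): void {
  for (const event of events) {
    for (const handler of handlers[event.type] ?? []) handler(event);
  }
}

const handled: string[] = [];
const handlers: Record<string, Handler[]> = {
  UserRegistered: [
    (e) => handled.push(`audit:${e.aggregateId}`),
    (e) => handled.push(`stats:${e.aggregateId}`),
  ],
};

const user = User.register('u-1', 'ada@example.com');
dispatch(user.pullEvents(), handlers);
console.log(handled);
```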

Integration events — the contract between contexts

An integration event is the public contract that crosses a boundary — between bounded contexts, and between services. It travels over Kafka, so it must be easily serializable: plain, primitive fields only, no domain objects. It's also versioned, because other systems depend on its shape and you need to be able to evolve it without breaking them:

// the public "a user was created" contract — crosses context/service boundaries
export class UserCreated_IntegrationEvent extends Base_IntegrationEvent {
  public readonly userId: string;      // plain strings: serializable, stable
  public readonly email: string;
  public readonly role: string;

  constructor(props: UserCreated_IntegrationEventProps) {
    //                              topic            name                 version
    super(props, Topics.USERS.topic, Topics.USERS.events.USER_CREATED, '1.0');
    this.userId = props.userId;
    this.email = props.email;
    this.role = props.role;
    this.validate();
  }
}

The rule of thumb: a domain event says "this happened inside me." An integration event says "here's something I'm announcing to everyone else, in a shape they can rely on."

A domain-event handler is the bridge between the two: it reacts to a domain event and builds the corresponding integration event to publish.
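
A sketch of what that bridge does to the data, assuming hypothetical minimal Id and Email value objects (only their toValue() method mirrors the examples on this page). Rich objects go in, primitives come out:

```typescript
// Hypothetical minimal value objects; only toValue() mirrors what this page's examples use.
class Id {
  constructor(private readonly value: string) {}
  toValue(): string { return this.value; }
}

class Email {
  constructor(private readonly value: string) {
    if (!value.includes('@')) throw new Error(`invalid email: ${value}`);
  }
  toValue(): string { return this.value; }
}

// Internal shape: rich domain objects, never serialised.
interface UserRegisteredDomainEvent { userId: Id; email: Email; role: 'admin' | 'member'; }

// Public shape: primitives only, safe to put on the wire.
interface UserCreatedMessage { name: 'USER_CREATED'; version: string; userId: string; email: string; role: string; }

// The bridge: unwrap every value object at the boundary.
function toIntegrationMessage(event: UserRegisteredDomainEvent): UserCreatedMessage {
  return {
    name: 'USER_CREATED',
    version: '1.0',
    userId: event.userId.toValue(),
    email: event.email.toValue(),
    role: event.role,
  };
}

const message = toIntegrationMessage({
  userId: new Id('u-1'),
  email: new Email('ada@example.com'),
  role: 'member',
});
const wire = JSON.stringify(message);
console.log(wire);
```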

Why the strict separation

Keeping domain objects out of integration events is what keeps contexts truly independent. If one context could hand its aggregates to another, they'd be coupled to each other's internals and you could never split them apart. Plain, versioned messages are the contract that lets each context evolve — and relocate into its own service — on its own schedule.
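
One way a consumer can take advantage of the version field is to normalise every supported version into a single internal shape. This is only a sketch: version '1.0' matches the example above, while version '2.0' and its roles field are invented here purely for illustration:

```typescript
// Internal shape the consumer works with, whatever version arrived.
interface NewUser { userId: string; email: string; roles: string[]; }

// '1.0' carries a single role string; the hypothetical '2.0' carries a roles array.
function readUserCreated(raw: string): NewUser {
  const json = JSON.parse(raw) as Record<string, unknown>;
  const base = { userId: String(json['userId']), email: String(json['email']) };

  switch (json['version']) {
    case '1.0':
      return { ...base, roles: [String(json['role'])] };
    case '2.0': // hypothetical future version
      return { ...base, roles: (json['roles'] as unknown[]).map(String) };
    default:
      // Fail loudly on shapes this consumer was never written for.
      throw new Error(`unsupported UserCreated version: ${String(json['version'])}`);
  }
}

const fromV1 = readUserCreated('{"version":"1.0","userId":"u-1","email":"ada@example.com","role":"member"}');
const fromV2 = readUserCreated('{"version":"2.0","userId":"u-2","email":"bob@example.com","roles":["member","billing"]}');
console.log(fromV1.roles, fromV2.roles);
```

Because the messages are plain data, the producer can start emitting a new version while older consumers keep reading the one they understand.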

Getting it reliable

Publishing an integration event sounds simple: change some data, then send a message to Kafka. But that naive approach has two classic failure modes:

  • On the publish side: if you save to the database and then send to Kafka, a crash in between means the change happened but the event never went out (lost event). Send first and save second, and you can publish an event for a change that ends up rolling back (phantom event).
  • On the consume side: Kafka consumers normally get at-least-once delivery, so the same message can arrive twice (for example after a rebalance or a retry). Handling it twice (charging a card again, sending a second email) is a bug.

ForgeStack solves both with two patterns: the transactional outbox on the way out and the idempotent inbox on the way in. The rest of this page is how they fit together.

The flow end to end

1. Command mutates an aggregate, which emits a DOMAIN event.
2. A domain-event handler builds an INTEGRATION event and writes it
   to the OUTBOX — in the same DB transaction as the state change.
3. A relay reads unprocessed outbox rows and PUBLISHES them to Kafka.
4. Consumers receive the message into their INBOX (which dedupes it).
5. The inbox dispatches it to the registered handler exactly once.

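Steps 1–2 commit atomically, so you can never publish an event for a change that rolled back, nor lose one for a change that committed. Step 3 can still publish the same row twice (for example, if the relay crashes after sending but before marking the row processed), which is why step 4 dedupes.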
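
To make the atomicity of steps 1 and 2 concrete, here is a sketch using a hypothetical in-memory FakeDb with a copy-then-swap transaction. It stands in for a real database transaction and is not how ForgeStack persists anything:

```typescript
// Hypothetical in-memory stand-in for a database with transactions.
interface OutboxRow { id: string; payload: string; processed: boolean; }
interface Tables { users: Map<string, { email: string }>; outbox: OutboxRow[]; }

class FakeDb {
  private tables: Tables = { users: new Map(), outbox: [] };

  // Run work against a staged copy; swap it in only if the work finishes without throwing.
  transaction(work: (tx: Tables) => void): void {
    const staged: Tables = {
      users: new Map(this.tables.users),
      outbox: this.tables.outbox.map((row) => ({ ...row })),
    };
    work(staged);          // a throw here leaves this.tables untouched (rollback)
    this.tables = staged;  // commit
  }

  snapshot(): Tables { return this.tables; }
}

function registerUser(db: FakeDb, id: string, email: string, failBeforeCommit = false): void {
  db.transaction((tx) => {
    tx.users.set(id, { email });   // the state change
    tx.outbox.push({               // the event, written in the same transaction
      id: `evt-${id}`,
      payload: JSON.stringify({ name: 'USER_CREATED', userId: id, email }),
      processed: false,
    });
    if (failBeforeCommit) throw new Error('simulated crash before commit');
  });
}

const db = new FakeDb();
registerUser(db, 'u-1', 'ada@example.com');

let rolledBack = false;
try {
  registerUser(db, 'u-2', 'bob@example.com', true);
} catch {
  rolledBack = true;
}

console.log(db.snapshot().users.size, db.snapshot().outbox.length);
```

The failed registration leaves neither a user nor an outbox row behind, so there is nothing for the relay to publish.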

The outbox (publish side)

A domain-event handler creates the integration event and saves it to the outbox. OutboxEvent.create() snapshots the event as a JSON payload and tracks retry state:

// …/domain-event-handlers/user-deleted/…publish-integration-event…ts
export class UserDeleted_PublishIntegrationEvent_DomainEventHandler
  extends Base_DomainEventHandler(UserDeleted_DomainEvent) {

  async handleEvent(event: UserDeleted_DomainEvent) {
    const user = await this.userRepository.findById(event.aggregateId);
    if (!user) return;

    const integrationEvent = new UserDeleted_IntegrationEvent({
      id: Id.random().toValue(),
      occurredOn: new Date(),
      userId: event.aggregateId.toValue(),
      email: user.email.toValue(),
    });

    const outboxEvent = OutboxEvent.create(integrationEvent);
    await this.outboxRepository.save(outboxEvent);
  }
}

The OutboxEvent aggregate owns its lifecycle — retry counting, max retries, and the processed flag — as behaviour, not setters:

// libs/common/src/outbox/domain/outbox.aggregate.ts
export class OutboxEvent extends SharedAggregate {
  canRetry(): boolean {
    return this.retryCount.isLessThan(this.maxRetries) && !this.isProcessed();
  }
  markAsProcessed(): void {
    this.processedAt = OutboxProcessedAt.now();
  }
  incrementRetry(): void {
    if (!this.canRetry()) throw new DomainValidationException(/* … */);
    this.retryCount = this.retryCount.increment();
  }
}
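
A relay pass built on that lifecycle might look like the sketch below. OutboxRow is a simplified stand-in for the aggregate (plain fields instead of value objects), and the Publish type is a hypothetical, synchronous substitute for the real Kafka publisher, chosen to keep the example short:

```typescript
// Simplified stand-in for the OutboxEvent aggregate: plain fields instead of value objects.
class OutboxRow {
  retryCount = 0;
  processedAt: Date | null = null;

  constructor(
    readonly id: string,
    readonly topic: string,
    readonly payload: string,
    readonly maxRetries = 3,
  ) {}

  isProcessed(): boolean { return this.processedAt !== null; }
  canRetry(): boolean { return this.retryCount < this.maxRetries && !this.isProcessed(); }
  markAsProcessed(): void { this.processedAt = new Date(); }
  incrementRetry(): void {
    if (!this.canRetry()) throw new Error(`outbox row ${this.id} cannot be retried`);
    this.retryCount += 1;
  }
}

// Hypothetical publisher signature; a real one would be asynchronous.
type Publish = (topic: string, message: string) => void;

// One relay pass: try every row that is still eligible and record the outcome on the row.
function relayOnce(rows: OutboxRow[], publish: Publish): void {
  for (const row of rows) {
    if (!row.canRetry()) continue;   // already processed, or retries exhausted
    try {
      publish(row.topic, row.payload);
      row.markAsProcessed();         // only after the publish call returned
    } catch {
      row.incrementRetry();          // leave the row for the next pass
    }
  }
}

const sent: string[] = [];
let failuresLeft = 1;
const flakyPublish: Publish = (_topic, message) => {
  if (failuresLeft > 0) { failuresLeft -= 1; throw new Error('broker unavailable'); }
  sent.push(message);
};

const row = new OutboxRow('evt-1', 'users', '{"id":"evt-1"}');
relayOnce([row], flakyPublish);   // first pass fails, retryCount becomes 1
relayOnce([row], flakyPublish);   // second pass succeeds
relayOnce([row], flakyPublish);   // third pass skips the processed row
console.log(sent.length, row.retryCount, row.isProcessed());
```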

When a command needs to publish as part of its own transaction, the base command handler exposes sendIntegrationEvent(event, context), which writes the outbox row using the command's transaction context — keeping state and event atomic.

Kafka (transport)

The publisher sends the serialised event to its topic, keyed by event id:

// libs/kafka/src/kafka.integration-event-publisher.ts
async publish(topic: string, message: string) {
  const producer = this.kafkaService.getProducer();
  const key = JSON.parse(message).id;            // dedupe key
  await producer.send({ topic, messages: [{ key, value: message, timestamp: `${Date.now()}` }] });
}

The listener subscribes to the topics that have registered handlers, and on each message routes it through the inbox:

// libs/kafka/src/kafka.integration-event-listener.ts — eachMessage
await consumer.run({
  eachMessage: async ({ topic, message }) => {
    await this.handleMessage(topic, this.parseMessage(message));
  },
});

The inbox (consume side)

The inbox makes sure each message is handled only once, even when Kafka delivers it more than once. Incoming messages are recorded as InboxEvents with a status that moves through a small state machine (pending, then processing, then processed); duplicates are detected and dropped:

// libs/common/src/inbox/domain/inbox.aggregate.ts
export class InboxEvent extends SharedAggregate {
  canProcess(): boolean { return this.status.isPending(); }

  markAsProcessing(): void {
    if (!this.canProcess()) throw new DomainValidationException(/* not pending */);
    this.status = InboxStatusVO.processing();
  }
  markAsProcessed(): void {
    if (!this.isProcessing()) throw new DomainValidationException(/* not processing */);
    this.status = InboxStatusVO.processed();
    this.processedAt = InboxProcessedAt.now();
  }
  markAsDuplicate(): void {
    this.status = InboxStatusVO.duplicate();
    this.processedAt = InboxProcessedAt.now();
  }
}

When the listener has an inbox service, handleMessage hands the message to it; the inbox stores, dedupes, and then dispatches to the registered handler. Without an inbox service, the listener dispatches directly (useful in tests).
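
The store, dedupe, dispatch sequence can be sketched with a hypothetical in-memory inbox. The real InboxEvent aggregate is persisted through a repository; here a Map keyed by event id plays that role, and the status names mirror the state machine above:

```typescript
// Hypothetical in-memory inbox, for illustration only.
type InboxStatus = 'pending' | 'processing' | 'processed' | 'duplicate';
interface IncomingMessage { id: string; name: string; payload: Record<string, unknown>; }
type MessageHandler = (message: IncomingMessage) => void;

class InMemoryInbox {
  private readonly seen = new Map<string, InboxStatus>();

  constructor(private readonly handlers: Map<string, MessageHandler>) {}

  receive(message: IncomingMessage): InboxStatus {
    // Dedupe: an id that was recorded before is dropped without calling the handler.
    if (this.seen.has(message.id)) return 'duplicate';

    // Store first, so a redelivery is recognised from here on.
    this.seen.set(message.id, 'pending');

    const handler = this.handlers.get(message.name);
    if (!handler) return 'pending';   // nothing registered for this event name

    this.seen.set(message.id, 'processing');
    handler(message);
    this.seen.set(message.id, 'processed');
    return 'processed';
  }
}

let emailsSent = 0;
const inbox = new InMemoryInbox(
  new Map<string, MessageHandler>([['USER_CREATED', () => { emailsSent += 1; }]]),
);

const message: IncomingMessage = { id: 'evt-1', name: 'USER_CREATED', payload: { userId: 'u-1' } };
const first = inbox.receive(message);
const second = inbox.receive(message);   // redelivery of the same id
console.log(first, second, emailsSent);
```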

Why outbox + inbox together

The outbox stops you from losing or phantom-publishing events. The inbox stops you from processing the same event twice. Together they turn Kafka's at-least-once delivery into effectively exactly-once business processing — without distributed transactions.

Anatomy of an integration event

Every integration event extends Base_IntegrationEvent, which standardises the envelope (topic, name, version, id, occurredOn, tracing metadata) and the serialisation that makes it safe to send over the wire:

// libs/common/src/integration-events/events/base.integration-event.ts
export abstract class Base_IntegrationEvent {
  readonly id: string;
  readonly occurredOn: Date;

  constructor(props, readonly topic: string, readonly name: string, readonly version: string) {
    this.id = props.id;
    this.occurredOn = props.occurredOn;
  }

  toJSON(): Record<string, unknown> {
    return {
      topic: this.topic, name: this.name, version: this.version,
      id: this.id, occurredOn: this.occurredOn.toISOString(),
      metadata: TracingService.getTraceMetadata() || {},
      ...this.toEventJSON(),
    };
  }

  protected abstract toEventJSON(): Record<string, unknown>;
}

A concrete event defines its payload and how to (de)serialise it — toEventJSON to send, fromJSON to reconstruct on the other side:

// libs/common/src/integration-events/events/user-deleted.integration-event.ts
export class UserDeleted_IntegrationEvent extends Base_IntegrationEvent {
  public readonly userId: string;
  public readonly email: string;

  constructor(props: UserDeleted_IntegrationEventProps) {
    super(props, Topics.USERS.topic, Topics.USERS.events.USER_DELETED, '1.0');
    this.userId = props.userId;
    this.email = props.email;
    this.validate();
  }

  protected toEventJSON() {
    return { userId: this.userId, email: this.email };
  }

  static fromJSON(json: Record<string, unknown>): UserDeleted_IntegrationEvent {
    Base_IntegrationEvent.validateJson(json);
    return new UserDeleted_IntegrationEvent({
      id: json.id as string,
      occurredOn: new Date(json.occurredOn as string),
      userId: json.userId as string,
      email: json.email as string,
    });
  }
}
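
The round trip can be tried in isolation with simplified stand-ins. In this sketch, SketchIntegrationEvent and UserDeletedSketch are hypothetical reductions of the classes above (no tracing metadata, no validation), and the 'users' topic string is an assumed value:

```typescript
// Simplified stand-ins: only the envelope and the round trip are kept.
interface EventProps { id: string; occurredOn: Date; }

abstract class SketchIntegrationEvent {
  readonly id: string;
  readonly occurredOn: Date;

  constructor(props: EventProps, readonly topic: string, readonly name: string, readonly version: string) {
    this.id = props.id;
    this.occurredOn = props.occurredOn;
  }

  // Envelope fields first, then whatever the concrete event contributes.
  toJSON(): Record<string, unknown> {
    return {
      topic: this.topic, name: this.name, version: this.version,
      id: this.id, occurredOn: this.occurredOn.toISOString(),
      ...this.toEventJSON(),
    };
  }

  protected abstract toEventJSON(): Record<string, unknown>;
}

class UserDeletedSketch extends SketchIntegrationEvent {
  readonly userId: string;

  constructor(props: EventProps & { userId: string }) {
    super(props, 'users', 'USER_DELETED', '1.0');   // assumed topic and name strings
    this.userId = props.userId;
  }

  protected toEventJSON(): Record<string, unknown> {
    return { userId: this.userId };
  }

  // Rebuild the event on the consuming side; the ISO string becomes a Date again.
  static fromJSON(json: Record<string, unknown>): UserDeletedSketch {
    return new UserDeletedSketch({
      id: json['id'] as string,
      occurredOn: new Date(json['occurredOn'] as string),
      userId: json['userId'] as string,
    });
  }
}

const original = new UserDeletedSketch({
  id: 'evt-1',
  occurredOn: new Date('2024-01-01T00:00:00.000Z'),
  userId: 'u-1',
});
const wire = JSON.stringify(original);   // JSON.stringify calls toJSON()
const restored = UserDeletedSketch.fromJSON(JSON.parse(wire) as Record<string, unknown>);
console.log(restored.userId, restored.occurredOn.toISOString());
```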

The metadata carries the current trace context, so a consumer's spans link back to the producer's trace in Tempo.

Adding your own event

  1. Define an integration event extending Base_IntegrationEvent (with toEventJSON / fromJSON).
  2. In the producing context, add a domain-event handler that builds it and saves an OutboxEvent.
  3. In the consuming context, register a handler for the topic/event name; the inbox delivers it.
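
Step 3 depends on how handlers are registered, which this page does not show. As a rough mental model only, a hypothetical registry keyed by topic and event name could look like this:

```typescript
// Hypothetical registry; ForgeStack's real registration API is not shown on this page.
interface Envelope { topic: string; name: string; id: string; [field: string]: unknown; }
type EventHandler = (event: Envelope) => void;

class HandlerRegistry {
  private readonly handlers = new Map<string, EventHandler>();
  private readonly topicSet = new Set<string>();

  register(topic: string, name: string, handler: EventHandler): void {
    this.handlers.set(`${topic}:${name}`, handler);
    this.topicSet.add(topic);
  }

  // Topics a listener would need to subscribe to.
  topics(): string[] {
    return Array.from(this.topicSet);
  }

  // Route by topic and event name; report whether anything handled the event.
  dispatch(event: Envelope): boolean {
    const handler = this.handlers.get(`${event.topic}:${event.name}`);
    if (!handler) return false;
    handler(event);
    return true;
  }
}

const registry = new HandlerRegistry();
const deletedUsers: string[] = [];
registry.register('users', 'USER_DELETED', (event) => {
  deletedUsers.push(String(event['userId']));
});

const wasHandled = registry.dispatch({ topic: 'users', name: 'USER_DELETED', id: 'evt-1', userId: 'u-1' });
const wasIgnored = registry.dispatch({ topic: 'users', name: 'USER_RENAMED', id: 'evt-2' });
console.log(wasHandled, wasIgnored, deletedUsers, registry.topics());
```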

Next: Repositories.