Domain Event Broker

Introduction

Domain events are a concept used in domain-driven design (DDD). Those events are published from within a bounded context when something happened inside the domain that is relevant for other domains as well. Domain events are very useful for sharing information between separate services without establishing dedicated communication channels.

Domain events are always asynchronous - allowing to completely decouple event publisher and subscriber.

This library provides a shallow layer on top of RabbitMQ topic exchanges for publishing and receiving domain events. Publisher and subscriber need not know about each other and can be started and stopped in any order. Each subscriber controls their own retry policy, whether they need a durable queue for the time they are down, or a dead-letter queue in case there is an error in the subscriber.

Publish domain events

Events can be sent by calling publish_domain_event():

from domain_event_broker import publish_domain_event
publish_domain_event('user.registered', {'user_id': user.id})

Domain events are sent immediately. When emitting domain events from within a database transaction, it’s recommended to defer publishing until the transaction is committed. Using a commit hook avoids spurious domain events if a transaction is rolled back after an error.

Subscribe to domain events

Subscribers can listen to one or more domain events - controlled via the binding keys. Binding keys may contain wildcards. A queue will be created for each subscriber. RabbitMQ takes care of routing only the relevant events to this queue.

This script will receive all events that are sent in the user domain:

from domain_event_broker import Subscriber

def handle_user_event(event):
    print event

subscriber = Subscriber()
subscriber.register(handle_user_event, 'print-user-event', ['user.*'])
subscriber.start_consuming()

Django Integration

Configuration

This library can be configured via your Django settings. Add domain_event_broker.django to your INSTALLED_APPS and set the DOMAIN_EVENT_BROKER in your settings:

INSTALLED_APPS = (
    'domain_event_broker.django',
    )

DOMAIN_EVENT_BROKER = 'amqp://user:password@rabbitmq-host/domain-events'

Setting DOMAIN_EVENT_BROKER to None will deactivate communication with RabbitMQ – essentially disabling domain event routing. This can be useful in development and test environments where RabbitMQ is not available.

Database transactions

Domain events are published immediately. When emitting domain events from within a database transaction, it’s recommended to defer publishing until the transaction is committed. Using a commit hook avoids spurious domain events if a transaction is rolled back after an error.

We recommend using publish_on_commit instead of using publish_domain_event when you’re using Django.

domain_event_broker.django.publish_on_commit(*args, **kwargs) → None[source]

Send domain event after transaction has been committed to the database. If there is no transaction, it’ll be sent right away. If atomic blocks are nested, it will be sent when exiting the outermost atomic block.

More information can be found here:

https://docs.djangoproject.com/en/dev/topics/db/transactions/#performing-actions-after-commit

Testing

If you want to test a component in isolation that is publishing domain events, we recommend mocking publish_domain_event or publish_on_commit. For testing subscribers, you can create DomainEvent objects manually and directly call the handler function.

Replaying dead-lettered domain events

If an event couldn’t be processed by an event handler it will end up in a dead-letter queue. There’s a Django management command that helps you reschedule processing of dead-lettered events:

django-admin replay_domain_event <handler-name>

The name is the one given to Subscriber.register. The name is also used as the queue name. If an event is dead lettered into user-registeration-confirmation-dl, you’d call replay_domain_event user-registration-confirmation.

API Documentation

Publish

domain_event_broker.publish_domain_event(routing_key: str, data: Dict[str, Any], domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, connection_settings: Optional[str] = '') → domain_event_broker.events.DomainEvent[source]

Send a domain event to the message broker. The broker will take care of dispatching the event to registered subscribers.

Parameters:
  • routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
  • data (dict) – The actual event data. Must be json serializable.
  • domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it might make search in an event store easier.
  • uuid_string (str) – This UUID identifier of the event. If left None, a new one will be created.
  • timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
  • connection_settings (str) – Specify the broker with an AMQP URL. If not given, the default broker will be used. If set to None, the domain event is not published to a broker.
Returns:

The domain event that was published.

Return type:

domain_event_broker.DomainEvent

class domain_event_broker.Publisher(connection_settings: Optional[str] = '', exchange: str = 'domain-events', exchange_type: str = 'topic')[source]

Subscribe

class domain_event_broker.Subscriber(*args, **kwargs)[source]

A subscriber manages the registration of one or more event handlers. Once instantiated, call register to add subscribers and start_consuming to wait for incoming events.

Note

The subscriber only uses one thread for processing events. Even if multiple handlers are registered, only one event is processed at a time.

__init__(*args, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class domain_event_broker.Retry(delay: float = 10.0)[source]

Raise this exception in an event handler to schedule a delayed retry. The delay is specified in seconds.

Note

Internally a delay exchange with a per-message TTL is used and all delayed events for a handler that share the same delay are placed in one queue. The RabbitMQ TTL has a Millisecond resolution.

Replay

domain_event_broker.replay_event(queue_name: str, message_callback: Callable = <function retry_event>, connection_settings: Optional[str] = '') → int[source]

Move one domain event from a dead-letter queue back into the processing queue.

Parameters:
  • queue_name (str) – Name of the queue where to move the event.
  • message_callback (function) – A callable that receives the event and returns either RETRY, LEAVE or DISCARD.
  • connection_settings (str) –
Returns:

The number of messages left in the dead letter queue.

Return type:

int

domain_event_broker.replay_all(queue_name: str, message_callback: Callable = <function retry_event>, connection_settings: Optional[str] = '') → int[source]

Replay all messages currently in the dead-letter queue. Return number of messages dead-lettered since starting the replay.

Domain event

class domain_event_broker.DomainEvent(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0)[source]
__init__(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0) → None[source]

Define a Domain Event.

Parameters:
  • routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
  • data (dict) – The actual event data. Must be json serializable.
  • domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it might make search in an event store easier.
  • uuid_string (str) – This UUID identifier of the event. If left None, a new one will be created.
  • timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
  • retries (int) – Number of times this event was delivered to a subscriber already.
classmethod from_json(json_data: Union[bytes, str]) → domain_event_broker.events.DomainEvent[source]

Create a DomainEvent from json_data. Note that you probably want to dispatch first based on domain and event type.

Parameters:json_data (str) – Serialized domain event data.
Return type:DomainEvent