Domain Event Broker
Introduction
Domain events are a concept used in domain-driven design (DDD). They are published from within a bounded context when something happens inside the domain that is relevant to other domains as well. Domain events are very useful for sharing information between separate services without establishing dedicated communication channels.
Domain events are always asynchronous, which allows the event publisher and the subscriber to be completely decoupled.
This library provides a thin layer on top of RabbitMQ topic exchanges for publishing and receiving domain events. Publisher and subscriber need not know about each other and can be started and stopped in any order. Each subscriber controls its own retry policy and decides whether it needs a durable queue for the time it is down, and a dead-letter queue for events that fail in the subscriber.
Publish domain events
Events can be sent by calling publish_domain_event():
from domain_event_broker import publish_domain_event
publish_domain_event('user.registered', {'user_id': user.id})
Domain events are sent immediately. When emitting domain events from within a database transaction, it’s recommended to defer publishing until the transaction is committed. Using a commit hook avoids spurious domain events if a transaction is rolled back after an error.
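The deferral described above can be sketched without any framework. This is a minimal illustration and not part of this library: DeferredPublisher and its methods are hypothetical names, and publish stands for any callable shaped like publish_domain_event(routing_key, data).

```python
from typing import Any, Callable, Dict, List, Tuple


class DeferredPublisher:
    """Hypothetical helper: buffer events until the transaction commits."""

    def __init__(self, publish: Callable[[str, Dict[str, Any]], Any]) -> None:
        self._publish = publish
        self._pending: List[Tuple[str, Dict[str, Any]]] = []

    def defer(self, routing_key: str, data: Dict[str, Any]) -> None:
        # Nothing is sent yet; the event is only remembered.
        self._pending.append((routing_key, data))

    def commit(self) -> None:
        # Call from the commit hook: send everything that was buffered.
        pending, self._pending = self._pending, []
        for routing_key, data in pending:
            self._publish(routing_key, data)

    def rollback(self) -> None:
        # Call on rollback: drop the buffered events so none are published.
        self._pending.clear()
```

Events buffered with defer() only reach the broker if commit() is called, so a rolled-back transaction publishes nothing.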
Subscribe to domain events
Subscribers can listen to one or more domain events, selected via binding keys. Binding keys may contain wildcards. A queue will be created for each subscriber. RabbitMQ takes care of routing only the relevant events to this queue.
This script will receive all events that are sent in the user domain:
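from domain_event_broker import Subscriber

def handle_user_event(event):
    # Print every event received from the user domain.
    print(event)

subscriber = Subscriber()
# Arguments: handler, handler name (also used as the queue name), binding keys.
subscriber.register(handle_user_event, 'print-user-event', ['user.*'])
subscriber.start_consuming()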
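To make the wildcard behaviour concrete, here is a small sketch of the matching rules of RabbitMQ topic exchanges: `*` stands for exactly one dot-separated word and `#` for zero or more words. The matching itself is done by the broker; binding_matches is a hypothetical helper written only for illustration and is not part of this library.

```python
from typing import List


def binding_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic binding key matches a routing key."""
    return _match(binding_key.split('.'), routing_key.split('.'))


def _match(pattern: List[str], words: List[str]) -> bool:
    if not pattern:
        # The pattern is used up: it matches only if no words remain.
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == '#':
        # '#' may absorb zero or more words.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        # '*' absorbs exactly one word; a literal must be equal.
        return _match(rest, words[1:])
    return False
```

With these rules, 'user.*' matches 'user.registered' but not 'user.profile.updated', whereas 'user.#' matches both.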
Django Integration
Configuration
This library can be configured via your Django settings. Add domain_event_broker.django to your INSTALLED_APPS and set DOMAIN_EVENT_BROKER in your settings:
INSTALLED_APPS = (
'domain_event_broker.django',
)
DOMAIN_EVENT_BROKER = 'amqp://user:password@rabbitmq-host/domain-events'
Setting DOMAIN_EVENT_BROKER to None will deactivate communication with RabbitMQ, essentially disabling domain event routing. This can be useful in development and test environments where RabbitMQ is not available.
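One way to switch between the two modes is to read the broker URL from the environment in the settings module. This is only a sketch: the variable name DOMAIN_EVENT_BROKER_URL and the helper broker_url_from_env are assumptions made for this example and are not defined by the library.

```python
import os
from typing import Mapping, Optional


def broker_url_from_env(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the AMQP URL, or None if the variable is unset or empty."""
    # DOMAIN_EVENT_BROKER_URL is a hypothetical environment variable name.
    return environ.get('DOMAIN_EVENT_BROKER_URL') or None


# None disables domain event routing, e.g. on a machine without RabbitMQ.
DOMAIN_EVENT_BROKER = broker_url_from_env()
```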
Database transactions
Domain events are published immediately. When emitting domain events from within a database transaction, it’s recommended to defer publishing until the transaction is committed. Using a commit hook avoids spurious domain events if a transaction is rolled back after an error.
We recommend using publish_on_commit instead of publish_domain_event when you’re using Django.
domain_event_broker.django.publish_on_commit(*args, **kwargs) → None
Send domain event after transaction has been committed to the database. If there is no transaction, it’ll be sent right away. If atomic blocks are nested, it will be sent when exiting the outermost atomic block.
More information can be found here:
https://docs.djangoproject.com/en/dev/topics/db/transactions/#performing-actions-after-commit
Testing
If you want to test a component that publishes domain events in isolation, we recommend mocking publish_domain_event or publish_on_commit. For testing subscribers, you can create DomainEvent objects manually and call the handler function directly.
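Both suggestions can be sketched with the standard library alone. Everything named here is hypothetical: register_user is a stand-in for a component that publishes events, send_confirmation is a stand-in for a handler, and SimpleNamespace replaces DomainEvent so that the sketch runs without the library installed. It also assumes the handler reads the payload from an event.data attribute. In a real test you would build the event with DomainEvent(routing_key='user.registered', data={'user_id': 42}) instead.

```python
from types import SimpleNamespace
from unittest import mock

confirmed = []


def send_confirmation(event):
    """Hypothetical handler: remembers which users were confirmed."""
    confirmed.append(event.data['user_id'])


def register_user(user_id, publish):
    """Hypothetical component under test; the publisher is injected."""
    publish('user.registered', {'user_id': user_id})


# Component test: replace the publisher with a mock and inspect the call.
publish = mock.Mock()
register_user(42, publish)
publish.assert_called_once_with('user.registered', {'user_id': 42})

# Handler test: build an event by hand and call the handler directly.
event = SimpleNamespace(routing_key='user.registered', data={'user_id': 42})
send_confirmation(event)
```

Neither test needs a running RabbitMQ instance, because no event ever leaves the process.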
Replaying dead-lettered domain events
If an event couldn’t be processed by an event handler, it will end up in a dead-letter queue. There’s a Django management command that helps you reschedule processing of dead-lettered events:
django-admin replay_domain_event <handler-name>
The name is the one given to Subscriber.register. The name is also used as the queue name. If an event is dead-lettered into user-registration-confirmation-dl, you’d call replay_domain_event user-registration-confirmation.
API Documentation
Publish
domain_event_broker.publish_domain_event(routing_key: str, data: Dict[str, Any], domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, connection_settings: Optional[str] = '') → domain_event_broker.events.DomainEvent
Send a domain event to the message broker. The broker will take care of dispatching the event to registered subscribers.
Parameters:
- routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
- data (dict) – The actual event data. Must be JSON serializable.
- domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it might make search in an event store easier.
- uuid_string (str) – The UUID identifier of the event. If left None, a new one will be created.
- timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
- connection_settings (str) – Specify the broker with an AMQP URL. If not given, the default broker will be used. If set to None, the domain event is not published to a broker.
Returns: The domain event that was published.
Return type: DomainEvent
Subscribe
class domain_event_broker.Subscriber(*args, **kwargs)
A subscriber manages the registration of one or more event handlers. Once instantiated, call register to add subscribers and start_consuming to wait for incoming events.
Note
The subscriber only uses one thread for processing events. Even if multiple handlers are registered, only one event is processed at a time.
class domain_event_broker.Retry(delay: float = 10.0)
Raise this exception in an event handler to schedule a delayed retry. The delay is specified in seconds.
Note
Internally a delay exchange with a per-message TTL is used and all delayed events for a handler that share the same delay are placed in one queue. The RabbitMQ TTL has a millisecond resolution.
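A handler might combine Retry with a backoff schedule, as in the following sketch. Several things here are assumptions made for illustration: the handler reads the delivery count from an event.retries attribute (mirroring the retries parameter of DomainEvent), and MAX_RETRIES, backoff_delay and call_remote_service are hypothetical names. The fallback Retry class only copies the documented signature so that the sketch runs when the library is not installed.

```python
try:
    from domain_event_broker import Retry
except ImportError:
    class Retry(Exception):
        """Stand-in with the documented signature, used if the library is absent."""

        def __init__(self, delay: float = 10.0) -> None:
            super().__init__()
            self.delay = delay

MAX_RETRIES = 5  # hypothetical limit chosen for this example


def backoff_delay(retries: int, base: float = 10.0, cap: float = 600.0) -> float:
    """Double the delay with each delivery attempt, up to `cap` seconds."""
    return min(base * (2 ** retries), cap)


def call_remote_service(event) -> None:
    """Hypothetical dependency; here it simulates an outage."""
    raise ConnectionError('service unavailable')


def handle_event(event) -> None:
    try:
        call_remote_service(event)
    except ConnectionError:
        if event.retries >= MAX_RETRIES:
            # Give up and let the original error propagate.
            raise
        # Ask for redelivery after a delay that grows with each attempt.
        raise Retry(backoff_delay(event.retries))
```

Because delayed events that share the same delay are placed in one queue, a schedule with a handful of fixed steps (10, 20, 40, 80 and 160 seconds here) keeps the number of distinct delays small.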
Replay
domain_event_broker.replay_event(queue_name: str, message_callback: Callable = <function retry_event>, connection_settings: Optional[str] = '') → int
Move one domain event from a dead-letter queue back into the processing queue.
Parameters:
- queue_name (str) – Name of the queue to move the event to.
- message_callback (function) – A callable that receives the event and returns either RETRY, LEAVE or DISCARD.
- connection_settings (str) –
Returns: The number of messages left in the dead letter queue.
Return type: int
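Since each call moves a single event and returns the number of messages left, replaying a whole dead-letter queue amounts to a loop. The sketch below assumes exactly that behaviour. drain_dead_letters and max_calls are hypothetical names, and replay_one stands for a callable such as replay_event that takes the queue name and returns the remaining count.

```python
from typing import Callable


def drain_dead_letters(
    replay_one: Callable[[str], int],
    queue_name: str,
    max_calls: int = 1000,
) -> int:
    """Call `replay_one` until no messages are left; return the number of calls."""
    calls = 0
    while calls < max_calls:
        remaining = replay_one(queue_name)
        calls += 1
        if remaining == 0:
            break
    return calls
```

The max_calls bound is there because the remaining count may never reach zero, for example if the message callback keeps leaving events in the dead-letter queue.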
Domain event
class domain_event_broker.DomainEvent(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0)
__init__(routing_key: str = '', data: Dict[KT, VT] = {}, domain_object_id: Optional[str] = None, uuid_string: Optional[str] = None, timestamp: Optional[float] = None, retries: int = 0) → None
Define a Domain Event.
Parameters:
- routing_key (str) – The routing key is of the form <DOMAIN>.<EVENT_TYPE>. The routing key should be a descriptive name of the domain event such as user.registered.
- data (dict) – The actual event data. Must be JSON serializable.
- domain_object_id (str) – Domain identifier of the event. This field is optional. If used, it might make search in an event store easier.
- uuid_string (str) – The UUID identifier of the event. If left None, a new one will be created.
- timestamp (float) – Unix timestamp. If timestamp is None, a new (UTC) timestamp will be created.
- retries (int) – Number of times this event was delivered to a subscriber already.
classmethod from_json(json_data: Union[bytes, str]) → domain_event_broker.events.DomainEvent
Create a DomainEvent from json_data. Note that you probably want to dispatch first based on domain and event type.
Parameters: json_data (str) – Serialized domain event data.
Return type: DomainEvent