Welcome to EventFlow’s documentation!

EventFlow is a basic CQRS+ES framework designed to be easy to use.

Have a look at our getting started guide, the do’s and don’ts and the FAQ.

Contents:

Getting started

Initializing EventFlow always starts with an EventFlowOptions.New as this performs the initial bootstrap and starts the fluent configuration API. The very minimum initialization of EventFlow can be done in a single line, but wouldn’t serve any purpose as no domain has been configured.

var resolver = EventFlowOptions.New.CreateResolver();

The above line configures several important defaults:

  • Custom internal IoC container
  • In-memory event store
  • Console logger
  • A “null” snapshot store, that merely writes a warning if used (no need to do anything before going to production if you aren’t planning to use snapshots)
  • And lastly, default implementations of all the internal parts of EventFlow

Important

If you’re using ASP.NET Core, you should install the *EventFlow.AspNetCore* package and invoke AddAspNetCoreMetadataProviders in Startup.

public void ConfigureServices(IServiceCollection services)
{
    services.AddEventFlow(ef =>
    {
        ef.AddDefaults(typeof(Startup).Assembly);
        ef.AddAspNetCoreMetadataProviders();
    });
}

Important

Before using EventFlow in a production environment, you should configure an alternative event store, an alternative IoC container and another logger that sends log messages to your production log store.

To start using EventFlow, a domain must be configured, which consists of the following parts: aggregate, aggregate identity, event, command and command handler.

In addition to the above, EventFlow provides several optional features. Whether or not these features are utilized depends on the application in which EventFlow is used.

Example application

The example application includes one of each of the required parts: aggregate, event, aggregate identity, command and a command handler. Further down we will go through each of the individual parts.

Note

The example code provided here is located within the EventFlow code base exactly as shown, so if you would like to debug and step through the entire flow, check out the code and execute the GettingStartedExample test.

https://github.com/eventflow/Documentation/tree/master/Source/EventFlow.Documentation/GettingStarted

All classes created for the example application are prefixed with Example.

// We wire up EventFlow with all of our classes. Instead of adding events,
// commands, etc. explicitly, we could have used the simpler
// AddDefaults(Assembly) instead.
using (var resolver = EventFlowOptions.New
    .AddEvents(typeof(ExampleEvent))
    .AddCommands(typeof(ExampleCommand))
    .AddCommandHandlers(typeof(ExampleCommandHandler))
    .UseInMemoryReadStoreFor<ExampleReadModel>()
    .CreateResolver())
{
    // Create a new identity for our aggregate root
    var exampleId = ExampleId.New;

    // Define some important value
    const int magicNumber = 42;

    // Resolve the command bus and use it to publish a command
    var commandBus = resolver.Resolve<ICommandBus>();
    var executionResult = await commandBus.PublishAsync(
        new ExampleCommand(exampleId, magicNumber),
        CancellationToken.None)
        .ConfigureAwait(false);

    // Verify that we didn't trigger our domain validation
    executionResult.IsSuccess.Should().BeTrue();

    // Resolve the query handler and use the built-in query for fetching
    // read models by identity to get our read model representing the
    // state of our aggregate root
    var queryProcessor = resolver.Resolve<IQueryProcessor>();
    var exampleReadModel = await queryProcessor.ProcessAsync(
        new ReadModelByIdQuery<ExampleReadModel>(exampleId),
        CancellationToken.None)
        .ConfigureAwait(false);

    // Verify that the read model has the expected magic number
    exampleReadModel.MagicNumber.Should().Be(42);
}

The above example publishes the ExampleCommand to the aggregate with the exampleId identity with the magical value of 42. After the command has been published, the accompanying read model ExampleReadModel is fetched and we verify that the magical number has reached it.

During the execution of the example application, a single event is emitted and stored in the in-memory event store. The JSON for the event is shown here.

{
  "MagicNumber": 42
}

The event data itself is straightforward as it is merely the JSON serialization of an instance of the type ExampleEvent with the value we defined. A bit more interesting is the metadata that EventFlow stores alongside the event, which is used by the EventFlow event store.

{
  "timestamp": "2016-11-09T20:56:28.5019198+01:00",
  "aggregate_sequence_number": "1",
  "aggregate_name": "ExampleAggregate",
  "aggregate_id": "example-c1d4a2b1-c75b-4c53-ae44-e67ee1ddfd79",
  "event_id": "event-d5622eaa-d1d3-5f57-8023-4b97fabace90",
  "timestamp_epoch": "1478721389",
  "batch_id": "52e9d7e9-3a98-44c5-926a-fc416e20556c",
  "source_id": "command-69176516-07b7-4142-beaf-dba82586152c",
  "event_name": "example",
  "event_version": "1"
}

All the built-in metadata is available on each instance of IDomainEvent<,,>, which is accessible from event handlers for e.g. read models or subscribers. It is also possible to create your own metadata providers or add additional EventFlow built-in providers as needed.

Aggregate identity

The aggregate ID in EventFlow is represented as a value object that inherits from the IIdentity interface. You can provide your own implementation, but EventFlow provides a convenient implementation that will suit most needs. Be sure to read the section about the Identity<> class for details on how to use it.

For our example application we use the built-in class, which makes the implementation very simple.

/// Represents the aggregate identity (ID)
public class ExampleId :
    Identity<ExampleId>
{
    public ExampleId(string value) : base(value) { }
}

Aggregate

Now we’ll take a look at the ExampleAggregate. It is rather simple as the only thing it can do is apply the magic number once.

public class ExampleAggregate :
    AggregateRoot<ExampleAggregate, ExampleId>,
    IEmit<ExampleEvent>
{
    private int? _magicNumber;

    public ExampleAggregate(ExampleId id) : base(id) { }

    // Method invoked by our command
    public IExecutionResult SetMagicNumer(int magicNumber)
    {
        if (_magicNumber.HasValue)
            return ExecutionResult.Failed("Magic number already set");

        Emit(new ExampleEvent(magicNumber));
        
        return ExecutionResult.Success();
    }

    // We apply the event as part of the event sourcing system. EventFlow
    // provides several different methods for doing this, e.g. state objects,
    // the Apply method is merely the simplest
    public void Apply(ExampleEvent aggregateEvent)
    {
        _magicNumber = aggregateEvent.MagicNumber;
    }
}

Be sure to read the section on aggregates to get all the details right. For now, the most important thing to note is that changing the state of the aggregate (updating the _magicNumber variable) happens in the Apply(ExampleEvent) method. This is the event sourcing part of EventFlow in effect. As state changes are only saved as events, mutating the aggregate state must happen in such a way that the state changes are replayed the next time the aggregate is loaded. EventFlow has a set of different approaches that you can select from. In this example we use the Apply methods as they are the simplest.

Important

The Apply(ExampleEvent) is invoked by the Emit(...) method, so after the event has been emitted, the aggregate state has changed.
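
The relationship between emitting, applying and replaying events can be illustrated without any EventFlow types. The following is only a minimal, self-contained sketch: SketchAggregate, MagicNumberSet and Load are hypothetical names made up for this illustration and are not part of EventFlow, which performs the emitting and loading for you.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type, standing in for ExampleEvent.
public sealed class MagicNumberSet
{
    public MagicNumberSet(int magicNumber) { MagicNumber = magicNumber; }
    public int MagicNumber { get; }
}

// Hypothetical aggregate that mimics the Emit/Apply split by hand.
public sealed class SketchAggregate
{
    private readonly List<MagicNumberSet> _uncommittedEvents = new List<MagicNumberSet>();

    public int? MagicNumber { get; private set; }
    public IReadOnlyList<MagicNumberSet> UncommittedEvents => _uncommittedEvents;

    // Business rule: the magic number can only be set once.
    public bool SetMagicNumber(int magicNumber)
    {
        if (MagicNumber.HasValue) return false;
        Emit(new MagicNumberSet(magicNumber));
        return true;
    }

    // Emitting applies the event immediately and records it for storage.
    private void Emit(MagicNumberSet aggregateEvent)
    {
        Apply(aggregateEvent);
        _uncommittedEvents.Add(aggregateEvent);
    }

    // The only place where state is mutated.
    private void Apply(MagicNumberSet aggregateEvent)
    {
        MagicNumber = aggregateEvent.MagicNumber;
    }

    // Rebuilds state by replaying previously stored events.
    public static SketchAggregate Load(IEnumerable<MagicNumberSet> history)
    {
        var aggregate = new SketchAggregate();
        foreach (var aggregateEvent in history) aggregate.Apply(aggregateEvent);
        return aggregate;
    }
}
```

Because Apply is the single place that mutates state, an aggregate rebuilt through Load ends up in the same state as the one that originally emitted the events.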

The ExampleAggregate exposes the SetMagicNumer(int) method, which is used to expose the business rules for changing the magic number. If the magic number hasn’t been set before, the event ExampleEvent is emitted and the aggregate state is mutated.

If the magic number has already been set, we return a failed IExecutionResult with an error message. Returning a failed execution result will make EventFlow disregard any events the aggregate has emitted.

If you need to return something more useful than a bool in an execution result, merely create a new class that implements the IExecutionResult interface and specify the type as generic arguments for the command and command handler.

Note

While possible, do not use the execution results as a method of reading values from the aggregate, that’s what the IQueryProcessor and read models are for.

Event

Next up is the event which represents something that has happened in our domain. In this example, it’s merely that some magic number has been set. Normally these events should have a really, really good name and represent something in the ubiquitous language for the domain.

/// A basic event containing some information
[EventVersion("example", 1)]
public class ExampleEvent :
    AggregateEvent<ExampleAggregate, ExampleId>
{
    public ExampleEvent(int magicNumber)
    {
        MagicNumber = magicNumber;
    }

    public int MagicNumber { get; }
}

We have applied the [EventVersion("example", 1)] attribute to our event, marking it as the example event version 1, which directly corresponds to the event_name and event_version from the metadata stored alongside the event, as shown earlier. The information is used by EventFlow to tie the name and version to a specific .NET type.

Important

Even though using the EventVersion attribute is optional, it is highly recommended. EventFlow will infer the information if it isn’t provided, which makes it vulnerable to type renames, among other things.

Important

Once you have aggregates in your production environment that have emitted an event, you should never change the .NET implementation. You can deprecate it, but you should never change the type or the data stored in the event store.

Command

Commands are the entry point to the domain and if you remember from the example application, they are published using the ICommandBus as shown here.

var commandBus = resolver.Resolve<ICommandBus>();
var executionResult = await commandBus.PublishAsync(
    new ExampleCommand(exampleId, magicNumber),
    CancellationToken.None)
    .ConfigureAwait(false);

In EventFlow commands are simple value objects that merely house the arguments for the command execution. All commands implement the ICommand<,> interface, but EventFlow provides an easy-to-use base class that you can use.

/// Command for updating the magic number
public class ExampleCommand :
    Command<ExampleAggregate, ExampleId, IExecutionResult>
{
    public ExampleCommand(
        ExampleId aggregateId,
        int magicNumber)
        : base(aggregateId)
    {
        MagicNumber = magicNumber;
    }

    public int MagicNumber { get; }
}

A command doesn’t do anything without a command handler. In fact, EventFlow will throw an exception if a command doesn’t have exactly one command handler registered.

Command handler

The command handler provides the glue between the command, the aggregate and the IoC container as it defines how a command is executed. Typically they are rather simple, but they could contain more complex logic. How much is up to you.

/// Command handler for our command
public class ExampleCommandHandler :
    CommandHandler<ExampleAggregate, ExampleId, IExecutionResult, ExampleCommand>
{
    public override Task<IExecutionResult> ExecuteCommandAsync(
        ExampleAggregate aggregate,
        ExampleCommand command,
        CancellationToken cancellationToken)
    {
        var executionResult = aggregate.SetMagicNumer(command.MagicNumber);
        return Task.FromResult(executionResult);
    }
}

The ExampleCommandHandler in our case here merely invokes the SetMagicNumer on the aggregate and returns the execution result. Remember, if a command handler returns a failed execution result, EventFlow will disregard any events the aggregate has emitted.

Important

Everything inside the ExecuteCommandAsync(...) method of a command handler may be executed more than once if there’s an optimistic concurrency exception, i.e., something else has happened to the aggregate since it was loaded from the event store and it is therefore automatically reloaded by EventFlow. It is therefore essential that the command handler doesn’t mutate anything other than the aggregate.

Read model

If you ever need to access the data in your aggregates efficiently, it’s important that read models are used. Loading aggregates from the event store takes time, and it’s impossible to query for e.g. aggregates that have a specific value in their state.

In our example we merely use the built-in in-memory read model store. It is useful in many cases, e.g. executing automated domain tests in a CI build.

/// Read model for our aggregate
public class ExampleReadModel :
    IReadModel,
    IAmReadModelFor<ExampleAggregate, ExampleId, ExampleEvent>
{
    public int MagicNumber { get; private set; }

    public void Apply(
        IReadModelContext context,
        IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent)
    {
        MagicNumber = domainEvent.AggregateEvent.MagicNumber;
    }
}

Notice the IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent argument. It’s merely a wrapper around the specific event we implemented earlier. The IDomainEvent<,,> provides additional information, e.g. any metadata stored alongside the event.

The main difference between the event instance emitted in the aggregate and the instance wrapped here, is that the event has been committed to the event store.

Next steps

Although the implementation in this guide enables you to create a complete application, there are several topics that are recommended as next steps.

Identity

The Identity<> value object provides generic functionality to create and validate the IDs of aggregate roots. It is basically a wrapper around a Guid.

Let’s say we want to create a new identity named TestId. We could do it like this.

public class TestId : Identity<TestId>
{
  public TestId(string value)
   : base(value)
  {
  }
}

  • The identity follows the form {classname without "Id"}-{guid}, e.g. test-c93fdb8c-5c9a-4134-bbcd-87c0644ca34f for the above TestId example
  • The internal Guid can be generated using one of the following methods/properties. Note that you can access the Guid factories directly through the static methods on the GuidFactories class
    • New: Uses the standard Guid.NewGuid()
    • NewDeterministic(...): Creates a name-based Guid using the algorithm from RFC 4122 §4.3, which allows identities to be generated based on known data (e.g. a user’s e-mail). It always returns the same identity for the same arguments
    • NewComb(): Creates a sequential Guid that can be used to avoid database fragmentation
  • A string can be tested to see if it’s a valid identity using the static bool IsValid(string) method
  • Any validation errors can be gathered using the static IEnumerable<string> Validate(string) method
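
To give an idea of what a name-based Guid as used by NewDeterministic(...) involves, here is a rough, self-contained sketch of the RFC 4122 §4.3 algorithm. It assumes the SHA-1 (version 5) variant and UTF-8 encoding of the name. DeterministicGuidSketch is a hypothetical class written for this illustration, it is not EventFlow’s GuidFactories implementation, and its output is not guaranteed to match what EventFlow produces.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of EventFlow.
public static class DeterministicGuidSketch
{
    public static Guid Create(Guid namespaceId, string name)
    {
        // Assumption: the name is encoded as UTF-8.
        var nameBytes = Encoding.UTF8.GetBytes(name);

        // .NET stores the first three Guid fields little-endian, while
        // RFC 4122 hashes them in network (big-endian) order.
        var namespaceBytes = namespaceId.ToByteArray();
        SwapByteOrder(namespaceBytes);

        // Hash the namespace followed by the name.
        var input = new byte[namespaceBytes.Length + nameBytes.Length];
        Buffer.BlockCopy(namespaceBytes, 0, input, 0, namespaceBytes.Length);
        Buffer.BlockCopy(nameBytes, 0, input, namespaceBytes.Length, nameBytes.Length);

        byte[] hash;
        using (var sha1 = SHA1.Create())
        {
            hash = sha1.ComputeHash(input);
        }

        // Keep the first 16 bytes, then stamp the version (5) and variant bits.
        var result = new byte[16];
        Array.Copy(hash, 0, result, 0, 16);
        result[6] = (byte)((result[6] & 0x0F) | 0x50);
        result[8] = (byte)((result[8] & 0x3F) | 0x80);

        // Convert back to the .NET byte layout.
        SwapByteOrder(result);
        return new Guid(result);
    }

    private static void SwapByteOrder(byte[] guid)
    {
        Array.Reverse(guid, 0, 4);
        Array.Reverse(guid, 4, 2);
        Array.Reverse(guid, 6, 2);
    }
}
```

The property that matters for identities is that the same namespace and name always give the same Guid, while different names give different ones.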

Important

It’s very important to name the constructor argument value, as the name is significant when the identity type is deserialized.

Here are some examples of how we can use our newly created TestId

// Uses the default Guid.NewGuid()
var testId = TestId.New;

// Create a namespace, put this in a constant somewhere
var emailNamespace = Guid.Parse("769077C6-F84D-46E3-AD2E-828A576AAAF3");

// Creates an identity with the value "test-9181a444-af25-567e-a866-c263b6f6119a"
var deterministicTestId = TestId.NewDeterministic(emailNamespace, "test@example.com");

// Creates a new identity every time, but one that minimizes fragmentation
// when used in database indexes
var combTestId = TestId.NewComb();

Note

Be sure to read the section about value objects as the Identity<> is basically a value object.

Aggregates

Before you can create an aggregate, you need to create its identity. You can create your own implementation by implementing the IIdentity interface, or you can use the Identity<> base class that EventFlow provides, like this.

public class TestId : Identity<TestId>
{
  public TestId(string value) : base(value)
  {
  }
}

The Identity<> value object provides generic functionality to create and validate aggregate root IDs. Please read the documentation regarding the bundled Identity<> type as it provides several useful features, e.g. several different schemes for ID generation, including one that minimizes MSSQL database fragmentation.

Next, to create a new aggregate, simply inherit from AggregateRoot<,> like this, making sure to pass the aggregate’s own type as the first generic argument and the identity as the second.

public class TestAggregate : AggregateRoot<TestAggregate, TestId>
{
  public TestAggregate(TestId id)
    : base(id)
  {
  }
}

Events

In an event-sourced system like EventFlow, aggregate root data is stored in events.

public class PingEvent : AggregateEvent<TestAggregate, TestId>
{
  public string Data { get; }

  public PingEvent(string data)
  {
      Data = data;
  }
}

Please make sure to read the section on value objects and events for some important notes on creating events.

Emitting events

In order to emit an event from an aggregate, call the protected Emit(...) method which applies the event and adds it to the list of uncommitted events.

public void Ping(string data)
{
  // Fancy domain logic here that validates aggregate state...

  if (string.IsNullOrEmpty(data))
  {
    throw DomainError.With("Ping data empty");
  }

  Emit(new PingEvent(data));
}

Remember not to change the aggregate state directly in methods like this one, as the state is only stored through events.

Applying events

Currently EventFlow has four methods of applying events to the aggregate when emitted or loaded from the event store. Which one you choose is up to you. Implementing IEmit<SomeEvent> is the most convenient, but will expose public Apply methods.

  • Create a method called Apply that takes the event as an argument. To get the method signature right, implement the IEmit<SomeEvent> on your aggregate. This is the default fallback and you will get an exception if no other strategies are configured. Although you can implement IEmit<SomeEvent>, it’s optional. The Apply methods can be protected or private
  • Create a state object by inheriting from AggregateState<,,> and registering using the protected Register(...) in the aggregate root constructor
  • Register a specific handler for an event using the protected Register<SomeEvent>(e => Handler(e)) from within the constructor
  • Register an event applier using Register(IEventApplier eventApplier), which could be e.g. a state object

Commands

Commands are the basic value objects, or models, that represent the write operations that you can perform in your domain. As described in more detail below a command is the “thing” to be done. A command handler does the “thing”.

As an example, one might implement this command for updating user passwords.

public class UserUpdatePasswordCommand : Command<UserAggregate, UserId>
{
    public Password NewPassword { get; }
    public Password OldPassword { get; }

    public UserUpdatePasswordCommand(
        UserId id,
        Password newPassword,
        Password oldPassword)
        : base(id)
    {
        NewPassword = newPassword;
        OldPassword = oldPassword;
    }
}

Note that the Password class is merely a value object created to hold the password and do basic validation. Read the article regarding value objects for more information. Also, you don’t have to use the default EventFlow Command<,> implementation. You can create your own, as long as it implements the ICommand<,> interface.

A command by itself doesn’t do anything and will throw an exception if published. To make a command work, you need to implement one (and only one) command handler which is responsible for invoking the aggregate.

public class UserUpdatePasswordCommandHandler :
    CommandHandler<UserAggregate, UserId, IExecutionResult, UserUpdatePasswordCommand>
{
    public override Task<IExecutionResult> ExecuteCommandAsync(
        UserAggregate aggregate,
        UserUpdatePasswordCommand command,
        CancellationToken cancellationToken)
    {
        var executionResult = aggregate.UpdatePassword(
            command.OldPassword,
            command.NewPassword);

        return Task.FromResult(executionResult);
    }
}

Execution results

If the aggregate detects a domain error and wants to abort execution and return an error back, then execution results are used. EventFlow ships with a basic implementation that allows returning success or failed and optionally an error message as shown here.

ExecutionResult.Success();
ExecutionResult.Failed();
ExecutionResult.Failed("With some error");

However, you can create your own custom execution results to allow aggregates to e.g. provide detailed validation results. Merely implement the IExecutionResult interface and use the type as generic arguments on the command and command handler.

Note

While possible, do not use the execution results as a method of reading values from the aggregate, that’s what the IQueryProcessor and read models are for.

Ensure idempotency

Detecting duplicate operations can be hard, especially if you have a distributed application, or simply a web application. Consider the following simplified scenario.

  1. The user wants to change his password
  2. The user fills in the “change password form”
  3. The user submits the form twice, either accidentally or impatiently
  4. The first web request completes and the password is changed. However, as the browser is waiting on the second web request, this result is ignored
  5. The second web request throws a domain error as the “old password” doesn’t match as the current password has already been changed
  6. The user is presented with an error on the web page

Handling this is simple, merely ensure that the aggregate is idempotent in regards to password changes. But instead of implementing this yourself, EventFlow has support for it that is simple to utilize and is done per command.

To use the functionality, merely ensure that commands that represent the same operation have the same ISourceId which implements IIdentity like the example below.

public class UserUpdatePasswordCommand : Command<UserAggregate, UserId>
{
  public Password NewPassword { get; }
  public Password OldPassword { get; }

  public UserUpdatePasswordCommand(
    UserId id,
    ISourceId sourceId,
    Password newPassword,
    Password oldPassword)
    : base(id, sourceId)
  {
    NewPassword = newPassword;
    OldPassword = oldPassword;
  }
}

Note the call to the protected constructor of Command<,>, base(id, sourceId), which takes an ISourceId in addition to the aggregate root identity.

If a duplicate command is detected, a DuplicateOperationException is thrown. The application could then ignore the exception or report the problem to the end user.

The default ISourceId history size of the aggregate root is ten, but it can be configured using the SetSourceIdHistory(...) method in the aggregate root constructor.
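
The idea of a bounded source ID history can be sketched independently of EventFlow. The class below is purely illustrative: SourceIdHistorySketch and TryRegister are hypothetical names, source IDs are simplified to strings, and the real EventFlow mechanism (which throws a DuplicateOperationException) may be implemented quite differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an aggregate's source ID history.
public sealed class SourceIdHistorySketch
{
    private readonly int _capacity;
    private readonly Queue<string> _recentSourceIds = new Queue<string>();

    // Defaults to ten, mirroring the default history size mentioned above.
    public SourceIdHistorySketch(int capacity = 10)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Returns false for a source ID that is still in the history (a
    // duplicate operation); otherwise remembers it and returns true.
    public bool TryRegister(string sourceId)
    {
        if (_recentSourceIds.Contains(sourceId)) return false;

        _recentSourceIds.Enqueue(sourceId);

        // Forget the oldest entry once the history is full.
        if (_recentSourceIds.Count > _capacity) _recentSourceIds.Dequeue();

        return true;
    }
}
```

A consequence of the bounded history is that a duplicate is only detected while its source ID is still among the most recent entries, which is the trade-off behind choosing the history size.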

Easier ISourceId calculation

Ensuring the correct calculation of the command ISourceId can be somewhat cumbersome, which is why EventFlow provides another base command you can use, the DistinctCommand<,>. By using the DistinctCommand<,> you merely have to implement GetSourceIdComponents() and provide the IEnumerable<byte[]> that makes the command unique. The bytes are used to create a deterministic GUID to be used as an ISourceId.

public class UserUpdatePasswordCommand :
  DistinctCommand<UserAggregate, UserId>
{
  public Password NewPassword { get; }
  public Password OldPassword { get; }

  public UserUpdatePasswordCommand(
    UserId id,
    Password newPassword,
    Password oldPassword)
    : base(id)
  {
    NewPassword = newPassword;
    OldPassword = oldPassword;
  }

  protected override IEnumerable<byte[]> GetSourceIdComponents()
  {
    yield return NewPassword.GetBytes();
    yield return OldPassword.GetBytes();
  }
}

The GetBytes() merely returns the Encoding.UTF8.GetBytes(...) of the password.

Caution

Don’t use the GetHashCode(), as the implementation can be different on 32 bit and 64 bit .NET (e.g. string).

Subscribers

Whenever your application needs to perform an action when a specific event is emitted from your domain, you create a class that implements one of the following two interfaces:

  • ISubscribeSynchronousTo<TAggregate,TIdentity,TEvent>: Executed synchronously
  • ISubscribeAsynchronousTo<TAggregate,TIdentity,TEvent>: Executed asynchronously

Any subscribers that you implement need to be registered to this interface using either AddSubscriber(...), AddSubscribers(...) or AddDefaults(...) during initialization. If you have configured a custom IoC container, you can register the implementations using it instead.

Note

The terms synchronous and asynchronous here have nothing to do with the .NET keywords async and await or the Task Parallel Library. They refer to how the subscribers are executed. Read below for details.

Synchronous subscribers

Synchronous subscribers in EventFlow are executed one at a time for each emitted domain event in order. This e.g. guarantees that all subscribers have been executed when the ICommandBus.PublishAsync(...) returns.

The ISubscribeSynchronousTo<,,> interface is shown here.

public interface ISubscribeSynchronousTo<TAggregate, in TIdentity, in TEvent>
    where TAggregate : IAggregateRoot<TIdentity>
    where TIdentity : IIdentity
    where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
  Task HandleAsync(
    IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent,
    CancellationToken cancellationToken);
}

Out of order events

As synchronous subscribers are by their very nature executed synchronously, emitting multiple events from an aggregate and letting subscribers publish new commands based on this can lead to some unexpected behavior as “innermost” subscribers will be executed before the next “outer” event is handled by the subscriber.

  1. Aggregate emits events Event 1 and Event 2
  2. Subscriber handles Event 1 and publishes a command that results in Event 3 being emitted
  3. Subscriber handles Event 3 (doesn’t affect the domain)
  4. Subscriber handles Event 2

In the above example the subscriber will handle the events in the following order Event 1, Event 3 and then Event 2. While this could occur in a distributed system or when executing subscribers on different threads, it is a certainty when using synchronous subscribers.
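
The ordering above follows directly from nested, synchronous dispatch and can be reproduced in a few lines. This is only a simulation under simplifying assumptions: events are plain strings, and OutOfOrderSketch, Run and Publish are hypothetical names that have nothing to do with EventFlow’s actual dispatching code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation of synchronous, nested event handling.
public static class OutOfOrderSketch
{
    // Returns the events in the order the subscriber handled them.
    public static List<string> Run()
    {
        var handled = new List<string>();

        // The aggregate emits Event 1 and Event 2 in one go.
        Publish(new[] { "Event 1", "Event 2" }, handled);
        return handled;
    }

    private static void Publish(IEnumerable<string> events, List<string> handled)
    {
        foreach (var domainEvent in events)
        {
            // The subscriber handles the event synchronously...
            handled.Add(domainEvent);

            // ...and, for Event 1, publishes a command that results in
            // Event 3, which is dispatched before Event 2 is reached.
            if (domainEvent == "Event 1")
            {
                Publish(new[] { "Event 3" }, handled);
            }
        }
    }
}
```

Calling OutOfOrderSketch.Run() yields Event 1, Event 3, Event 2, matching the list above.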

Exceptions swallowed by default

By default any exceptions thrown by a subscriber are swallowed by EventFlow after they have been logged as errors. Depending on the application this might be the preferred behavior, but in some cases it isn’t. If subscriber exceptions should be thrown, thus allowing them to be caught in e.g. command handlers, the swallowing behavior can be disabled by setting ThrowSubscriberExceptions to true as illustrated here:

using (var resolver = EventFlowOptions.New
  .Configure(c => c.ThrowSubscriberExceptions = true)
  .CreateResolver())
{
  ...
}

Asynchronous subscribers

Asynchronous subscribers in EventFlow are executed using a scheduled job.

Important

Asynchronous subscribers are disabled by default and must be enabled using the following configuration.

eventFlowOptions.Configure(c => c.IsAsynchronousSubscribersEnabled = true);

Important

Since asynchronous subscribers are executed using a job, it’s important to configure proper job scheduling. The EventFlow.Hangfire NuGet package integrates with the Hangfire job scheduler (https://www.hangfire.io) and provides a usable solution to this requirement.

The ISubscribeAsynchronousTo<,,> is shown here and is, besides its name, identical to its synchronous counterpart.

public interface ISubscribeAsynchronousTo<TAggregate, in TIdentity, in TEvent>
    where TAggregate : IAggregateRoot<TIdentity>
    where TIdentity : IIdentity
    where TEvent : IAggregateEvent<TAggregate, TIdentity>
{
  Task HandleAsync(
    IDomainEvent<TAggregate, TIdentity, TEvent> domainEvent,
    CancellationToken cancellationToken);
}

Note

Setting ThrowSubscriberExceptions = true has no effect on asynchronous subscribers.

Subscribe to every event

Instead of subscribing to every single domain event, you can register an implementation of ISubscribeSynchronousToAll, which is defined as shown here.

public interface ISubscribeSynchronousToAll
{
    Task HandleAsync(
        IReadOnlyCollection<IDomainEvent> domainEvents,
        CancellationToken cancellationToken);
}

Any registered implementations will be notified for every domain event emitted.

RabbitMQ

See RabbitMQ setup for details on how to get started using RabbitMQ.

After RabbitMQ has been configured, all domain events are published to an exchange named eventflow with routing keys in the following format.

eventflow.domainevent.[Aggregate name].[Event name].[Event version]

Which will be the following for an event named CreateUser version 1 for the MyUserAggregate.

eventflow.domainevent.my-user.create-user.1

Note the lowercasing and adding of - whenever there’s a capital letter.
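
The naming rule can be approximated with a small helper. This is a sketch of the convention as described here, not the code EventFlow uses: RoutingKeySketch, Slugify and Build are hypothetical names, and the aggregate name is assumed to be passed in already without its Aggregate suffix (MyUser for MyUserAggregate).

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of EventFlow.
public static class RoutingKeySketch
{
    // Inserts '-' before every capital letter except the first, then lowercases.
    public static string Slugify(string name)
    {
        return Regex.Replace(name, "(?<!^)([A-Z])", "-$1").ToLowerInvariant();
    }

    // Builds a routing key in the format shown above.
    public static string Build(string aggregateName, string eventName, int eventVersion)
    {
        return "eventflow.domainevent."
            + Slugify(aggregateName) + "."
            + Slugify(eventName) + "."
            + eventVersion;
    }
}
```

With these assumptions, RoutingKeySketch.Build("MyUser", "CreateUser", 1) returns eventflow.domainevent.my-user.create-user.1.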

All the above is the default behavior. If you don’t like it, replace the registered message factory service IRabbitMqMessageFactory to customize what routing key or exchange to use. Have a look at how EventFlow has done its implementation to get started.

Metadata

Metadata is all the “additional” information that resides with an emitted event, some of which is required information.

In EventFlow metadata is merely an IEnumerable of KeyValuePair<string,string> for which each is a metadata entry.

Out of the box these metadata keys are added to each aggregate event.

  • event_name and event_version - A name and version for the event which is used during event deserialization.
  • timestamp - A DateTimeOffset for when the event was emitted from the aggregate.
  • aggregate_sequence_number - The version of the aggregate after the event was emitted, e.g. 1 for the very first event emitted.
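
Since metadata is just a sequence of string key-value pairs, reading typed values out of it comes down to a lookup and a parse. The helper below is a minimal sketch that works on a plain IEnumerable of KeyValuePair<string,string>. MetadataSketch and its methods are hypothetical, and EventFlow’s own metadata type may well offer more convenient accessors.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical helper for reading raw metadata entries.
public static class MetadataSketch
{
    // Returns the value for a key; throws if the key is missing.
    public static string GetValue(
        IEnumerable<KeyValuePair<string, string>> metadata,
        string key)
    {
        return metadata.First(entry => entry.Key == key).Value;
    }

    // Parses the aggregate_sequence_number entry as an integer.
    public static int GetAggregateSequenceNumber(
        IEnumerable<KeyValuePair<string, string>> metadata)
    {
        return int.Parse(
            GetValue(metadata, "aggregate_sequence_number"),
            CultureInfo.InvariantCulture);
    }

    // Parses the timestamp entry as a DateTimeOffset.
    public static DateTimeOffset GetTimestamp(
        IEnumerable<KeyValuePair<string, string>> metadata)
    {
        return DateTimeOffset.Parse(
            GetValue(metadata, "timestamp"),
            CultureInfo.InvariantCulture);
    }
}
```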

Custom metadata provider

If you require additional information to be stored along with each event, then you can implement the IMetadataProvider interface and register the class using e.g. .AddMetadataProvider(...) on EventFlowOptions.

Additional built-in providers

EventFlow ships with a collection of ready-to-use providers in some of its NuGet packages.

EventFlow

  • AddEventTypeMetadataProvider
    • event_type_assembly_name - Assembly name of the assembly containing the event
    • event_type_assembly_version - Assembly version of the assembly containing the event
    • event_type_fullname - Full name of the event corresponding to Type.FullName for the aggregate event type.
  • AddGuidMetadataProvider
    • guid - A new Guid for each event.
  • AddMachineNameMetadataProvider
    • environment_machinename - Adds the machine name handling the event from Environment.MachineName

EventFlow.Owin

  • AddRequestHeadersMetadataProvider
      • request_header[HEADER] - Adds all headers from the OWIN request as metadata, each as a separate entry for which HEADER is replaced with the name of the header. E.g. the request_header[Connection] might contain the value Keep-Alive.
  • AddUriMetadataProvider
      • request_uri - OWIN request URI.
      • request_method - OWIN request method.
  • AddUserHostAddressMetadataProvider
      • user_host_address - The provider tries to find the correct user host address by inspecting request headers, i.e., if you have a load balancer in front of your application, then the request IP is not the real user address, but the load balancer should send the user IP as a header.
      • user_host_address_source_header - The header from which the user host address was taken.
      • remote_ip_address - The remote IP address. Note that this might be the IP address of your load balancer.
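
The header-based lookup described for user_host_address can be pictured with a small standalone sketch. This is not the code from EventFlow.Owin: the UserHostAddressResolver class and the header names it checks (X-Forwarded-For, X-Real-IP) are assumptions chosen for illustration, and only the three metadata keys come from the list above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of EventFlow: picks the user address from
// proxy headers and falls back to the remote IP when no header is present.
public static class UserHostAddressResolver
{
    // Assumed header names, checked in priority order.
    private static readonly string[] CandidateHeaders = { "X-Forwarded-For", "X-Real-IP" };

    public static IReadOnlyDictionary<string, string> Resolve(
        IDictionary<string, string> requestHeaders,
        string remoteIpAddress)
    {
        var metadata = new Dictionary<string, string>
        {
            ["remote_ip_address"] = remoteIpAddress,
        };

        foreach (var header in CandidateHeaders)
        {
            // Header names are case-insensitive, so compare accordingly.
            var match = requestHeaders.FirstOrDefault(
                kv => string.Equals(kv.Key, header, StringComparison.OrdinalIgnoreCase));
            if (string.IsNullOrWhiteSpace(match.Value))
            {
                continue;
            }

            // A forwarding header may hold a comma-separated chain;
            // the first entry is treated as the original client.
            metadata["user_host_address"] = match.Value.Split(',')[0].Trim();
            metadata["user_host_address_source_header"] = header;
            return metadata;
        }

        // No proxy header found: the remote IP is the best available guess.
        metadata["user_host_address"] = remoteIpAddress;
        return metadata;
    }
}
```

Calling UserHostAddressResolver.Resolve(headers, "10.0.0.1") with a forwarding header present yields the forwarded address plus the header it came from. Without such a header, only the remote IP is reported.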

Queries

Creating queries in EventFlow is simple.

First create a value object that contains the data required for the query. In this example we want to search for users based on their username.

public class GetUserByUsernameQuery : IQuery<User>
{
  public string Username { get; }

  public GetUserByUsernameQuery(string username)
  {
    Username = username;
  }
}

Next create a query handler that implements how the query is processed.

public class GetUserByUsernameQueryHandler :
  IQueryHandler<GetUserByUsernameQuery, User>
{
  private readonly IUserReadModelRepository _userReadModelRepository;

  public GetUserByUsernameQueryHandler(
    IUserReadModelRepository userReadModelRepository)
  {
    _userReadModelRepository = userReadModelRepository;
  }

  public Task<User> ExecuteQueryAsync(
    GetUserByUsernameQuery query,
    CancellationToken cancellationToken)
  {
    // Delegate the lookup to the read model repository
    return _userReadModelRepository.GetByUsernameAsync(
      query.Username,
      cancellationToken);
  }
}

The last step is to register the query handler in EventFlow. Here we show the simple but cumbersome version; in practice you should use one of the overloads that scans an entire assembly.

...
EventFlowOptions.New
  .AddQueryHandler<GetUserByUsernameQueryHandler, GetUserByUsernameQuery, User>()
...

Then in order to use the query in your application, you need a reference to the IQueryProcessor, which in our case is stored in the _queryProcessor field.

...
var user = await _queryProcessor.ProcessAsync(
  new GetUserByUsernameQuery("root"),
  cancellationToken)
  .ConfigureAwait(false);
...

Queries shipped with EventFlow

  • ReadModelByIdQuery<TReadModel>: Supported by both the in-memory and MSSQL read model stores automatically as soon as you define the read model using the EventFlow options for that store
  • InMemoryQuery<TReadModel>: Takes a Predicate<TReadModel> and returns IEnumerable<TReadModel>, making it possible to search all of your in-memory read models based on any predicate

Sagas

EventFlow provides a simple saga system to coordinate messages between bounded contexts and aggregates. A saga setup consists of the following parts.

  • Saga identity
  • Saga
  • Saga locator
  • Zero or more aggregates

This example is based on the chapter “A Saga on Sagas” from the CQRS Journey by Microsoft, in which we want to model the process of placing an order.

  1. User sends command PlaceOrder to the OrderAggregate
  2. OrderAggregate emits an OrderCreated event
  3. OrderSaga handles OrderCreated by sending a MakeReservation command to the ReservationAggregate
  4. ReservationAggregate emits a SeatsReserved event
  5. OrderSaga handles SeatsReserved by sending a MakePayment command to the PaymentAggregate
  6. PaymentAggregate emits a PaymentAccepted event
  7. OrderSaga handles PaymentAccepted by emitting an OrderConfirmed event with all the details, which via subscribers updates the user, the OrderAggregate and the ReservationAggregate

Next we need an ISagaLocator which basically maps domain events to a saga identity allowing EventFlow to find it in its store.

In our case we will add the order ID to event metadata of all events related to a specific order.

public class OrderSagaLocator : ISagaLocator
{
  public Task<ISagaId> LocateSagaAsync(
    IDomainEvent domainEvent,
    CancellationToken cancellationToken)
  {
    var orderId = domainEvent.Metadata["order-id"];
    var orderSagaId = new OrderSagaId($"ordersaga-{orderId}");

    return Task.FromResult<ISagaId>(orderSagaId);
  }
}

Alternatively the order identity could be added to every domain event emitted from the OrderAggregate, ReservationAggregate and PaymentAggregate aggregates that the OrderSaga subscribes to, but this would depend on whether or not the order identity is part of the ubiquitous language for your domain.

public class OrderSaga
  : AggregateSaga<OrderSaga, OrderSagaId, OrderSagaLocator>,
    ISagaIsStartedBy<OrderAggregate, OrderId, OrderCreated>
{
  public Task HandleAsync(
      IDomainEvent<OrderAggregate, OrderId, OrderCreated> domainEvent,
      ISagaContext sagaContext,
      CancellationToken cancellationToken)
  {
    // Update saga state with useful details.
    Emit(new OrderStarted(/*...*/));

    // Make the reservation
    Publish(new MakeReservation(/*...*/));

    return Task.FromResult(0);
  }

  public void Apply(OrderStarted e)
  {
    // Update our aggregate state with relevant order details
  }
}

Important

Even though the method for publishing commands is named Publish, the commands are only published to the command bus after the aggregate has been successfully committed to the event store (just like events). If an unexpected exception is thrown by this command publish, it should be handled by a custom implementation of ISagaErrorHandler.
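
The deferred behavior of Publish can be illustrated with a standalone sketch. The DeferredPublishSketch class below is hypothetical and does not use any EventFlow types: events and commands are plain strings, and the event store and command bus are passed in as delegates. It only shows the ordering described above, assuming that a failed commit means no commands are sent.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a saga: buffers events and commands until commit.
public sealed class DeferredPublishSketch
{
    private readonly List<string> _uncommittedEvents = new List<string>();
    private readonly List<string> _pendingCommands = new List<string>();

    public void Emit(string aggregateEvent) => _uncommittedEvents.Add(aggregateEvent);

    // Despite the name, nothing is sent here; the command is only queued.
    public void Publish(string command) => _pendingCommands.Add(command);

    public void Commit(
        Action<IReadOnlyList<string>> appendToEventStore,
        Action<string> sendToCommandBus)
    {
        // If the append throws, the queued commands are never sent.
        appendToEventStore(_uncommittedEvents.ToArray());
        _uncommittedEvents.Clear();

        // Only after a successful commit do the commands reach the bus.
        var commands = _pendingCommands.ToArray();
        _pendingCommands.Clear();
        foreach (var command in commands)
        {
            sendToCommandBus(command);
        }
    }
}
```

In this sketch an exception thrown by sendToCommandBus surfaces after the events are already stored, which is the situation the ISagaErrorHandler mentioned above is meant to deal with.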

The next few events and commands are omitted in this example, but at last the PaymentAggregate emits its PaymentAccepted event and the saga completes and emits the final OrderConfirmed event.

public class OrderSaga
  : AggregateSaga<OrderSaga, OrderSagaId, OrderSagaLocator>,
    ...
    ISagaHandles<PaymentAggregate, PaymentId, PaymentAccepted>
{

  ...

  public Task HandleAsync(
      IDomainEvent<PaymentAggregate, PaymentId, PaymentAccepted> domainEvent,
      ISagaContext sagaContext,
      CancellationToken cancellationToken)
  {
    Emit(new OrderConfirmed(/*...*/));

    return Task.FromResult(0);
  }

  public void Apply(OrderConfirmed e)
  {
    // As this is the last event, we complete the saga by calling Complete()
    Complete();
  }
}

Note

An AggregateSaga<,,> is only considered in its running state if there has been an event and it hasn’t been marked as completed (by invoking the protected Complete() method on the AggregateSaga<,,>).

Alternative saga store

By default EventFlow is configured to use event sourcing and aggregate roots for storage of sagas. However, you can implement your own storage system by implementing ISagaStore and registering it.

Jobs

A job is basically a task that you want to execute outside of the current context, on another server or at a later time. EventFlow provides basic functionality for jobs.

Jobs are useful in several areas, for example

  • Publish a command at a specific time in the future
  • Transient error handling

var jobScheduler = resolver.Resolve<IJobScheduler>();
var job = PublishCommandJob.Create(new SendEmailCommand(id), resolver);
await jobScheduler.ScheduleAsync(
  job,
  TimeSpan.FromDays(7),
  CancellationToken.None)
  .ConfigureAwait(false);

In the above example the SendEmailCommand command will be published in seven days.

Important

When working with jobs, you should be aware of the following

  • The default implementation executes the job immediately (completely ignoring the runAt/delay parameters) and in the current context. To get support for scheduled jobs, inject another implementation of IJobScheduler, e.g. by installing EventFlow.Hangfire (read below for details).
  • Your jobs should serialize to JSON properly, see the section on value objects for more information
  • If you use the provided PublishCommandJob, make sure that your commands serialize properly as well

Create your own jobs

To create your own jobs, your job merely needs to implement the IJob interface and be registered in EventFlow.

Here’s an example of a job implementing IJob

[JobVersion("LogMessage", 1)]
public class LogMessageJob : IJob
{
  public LogMessageJob(string message)
  {
    Message = message;
  }

  public string Message { get; }

  public Task ExecuteAsync(
    IResolver resolver,
    CancellationToken cancellationToken)
  {
    var log = resolver.Resolve<ILog>();
    log.Debug(Message);

    // The method is not async, so return a completed task
    return Task.FromResult(0);
  }
}

Note that the JobVersion attribute specifies the job name and version to EventFlow and this is how EventFlow distinguishes between the different job types. This makes it possible for you to reorder your code, even rename the job type. As long as you keep the same attribute values it is considered the same job in EventFlow. If the attribute is omitted, the name will be the type name and version will be 1.
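
The naming rule above can be sketched without EventFlow. The attribute and helper below are hypothetical stand-ins (JobVersionSketchAttribute, JobDefinitionSketch) that only mimic the described behavior: use the attribute values when present, otherwise fall back to the type name and version 1.

```csharp
using System;
using System.Reflection;

// Stand-in for the JobVersion attribute so the sketch compiles on its own.
[AttributeUsage(AttributeTargets.Class)]
public sealed class JobVersionSketchAttribute : Attribute
{
    public JobVersionSketchAttribute(string name, int version)
    {
        Name = name;
        Version = version;
    }

    public string Name { get; }
    public int Version { get; }
}

public static class JobDefinitionSketch
{
    // Returns the (name, version) pair a job type is identified by.
    public static (string Name, int Version) Describe(Type jobType)
    {
        var attribute = jobType.GetCustomAttribute<JobVersionSketchAttribute>();
        return attribute != null
            ? (attribute.Name, attribute.Version)
            : (jobType.Name, 1); // Fallback: type name and version 1
    }
}

// The class was renamed, but the attribute keeps the job identity stable.
[JobVersionSketch("LogMessage", 1)]
public class RenamedLogMessageJob { }

// No attribute: identified by its type name.
public class UnversionedJob { }
```

Because the identity comes from the attribute, RenamedLogMessageJob is still described as "LogMessage" version 1, while UnversionedJob is identified by its class name and would change identity if renamed.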

Here’s how the job is registered in EventFlow.

var resolver = EventFlowOptions.New
  .AddJobs(typeof(LogMessageJob))
  ...
  .CreateResolver();

Then to schedule the job

var jobScheduler = resolver.Resolve<IJobScheduler>();
var job = new LogMessageJob("Great log message");
await jobScheduler.ScheduleAsync(
  job,
  TimeSpan.FromDays(7),
  CancellationToken.None)
  .ConfigureAwait(false);

Hangfire

To use Hangfire as the job scheduler, install the NuGet package EventFlow.Hangfire and configure EventFlow to use the scheduler like this.

var resolver = EventFlowOptions.New
  .UseHangfireJobScheduler() // This line
  ...
  .CreateResolver();

Note

The UseHangfireJobScheduler() doesn’t do any Hangfire configuration, but merely registers the proper scheduler in EventFlow.

Event upgrade

At some point you might find the need to replace an event with zero or more events. Some use cases might be

  • A previous application version introduced a domain error in the form of a wrong event being emitted from the aggregate
  • Domain has changed, either from a change in requirements or simply from a better understanding of the domain

EventFlow event upgraders are invoked whenever the event stream is loaded from the event store. Each event upgrader receives the entire event stream one event at a time.

A new instance of an event upgrader is created each time an aggregate is loaded. This enables you to store information from previous events on the upgrader instance to be used later, e.g. to determine an action to take on an event or to provide additional information for a new event.

Note that the ordering of event upgraders is important as you might implement two upgraders, one to upgrade an event from V1 to V2 and then another upgrading V2 to V3. EventFlow orders the event upgraders by name before starting the event upgrade.
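
Why the ordering matters can be seen in a small standalone sketch. Nothing here is EventFlow API: events are plain strings, IUpgraderSketch is a hypothetical stand-in for an event upgrader, and the class names are prefixed with A, B and C only so that sorting by name gives the intended order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for an event upgrader: maps one event to zero or more events.
public interface IUpgraderSketch
{
    IEnumerable<string> Upgrade(string domainEvent);
}

public class AUpgradeV1ToV2 : IUpgraderSketch
{
    public IEnumerable<string> Upgrade(string domainEvent)
    {
        yield return domainEvent == "UserRegisteredV1" ? "UserRegisteredV2" : domainEvent;
    }
}

public class BUpgradeV2ToV3 : IUpgraderSketch
{
    public IEnumerable<string> Upgrade(string domainEvent)
    {
        yield return domainEvent == "UserRegisteredV2" ? "UserRegisteredV3" : domainEvent;
    }
}

public class CRemoveDamaged : IUpgraderSketch
{
    public IEnumerable<string> Upgrade(string domainEvent)
    {
        // Returning nothing removes the event from the stream.
        if (domainEvent != "Damaged") yield return domainEvent;
    }
}

public static class UpgradePipelineSketch
{
    public static List<string> Run(
        IEnumerable<string> stream,
        IEnumerable<IUpgraderSketch> upgraders)
    {
        // Order by type name, mirroring the name-based ordering described above.
        var ordered = upgraders
            .OrderBy(u => u.GetType().Name, StringComparer.Ordinal)
            .ToList();

        // Each upgrader receives the output of the previous one, one event at a time.
        return ordered
            .Aggregate(stream, (events, upgrader) => events.SelectMany(e => upgrader.Upgrade(e)))
            .ToList();
    }
}
```

Running the stream UserRegisteredV1, Damaged, NameChanged through all three upgraders gives UserRegisteredV3 and NameChanged. If the V2 to V3 upgrader ran before the V1 to V2 upgrader, the first event would stop at UserRegisteredV2.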

Caution

Be careful when working with event upgraders that return zero or more than one event, as this influences the aggregate version. You need to make sure that the aggregate sequence numbers on upgraded events are valid with regard to the aggregate history.

Example - removing a damaged event

To remove an event, simply check each event and only return it if it’s not the event you want to remove.

public class DamagedEventRemover : IEventUpgrader<MyAggregate, MyId>
{
  public IEnumerable<IDomainEvent<MyAggregate, MyId>> Upgrade(
    IDomainEvent<MyAggregate, MyId> domainEvent)
  {
    var damagedEvent = domainEvent as IDomainEvent<MyAggregate, MyId, DamagedEvent>;
    if (damagedEvent == null)
    {
      yield return domainEvent;
    }
  }
}

Example - replace event

To upgrade one event to another, you should use the IDomainEventFactory.Upgrade to help migrate metadata and create the new event.

public class UpgradeMyEventV1ToMyEventV2 : IEventUpgrader<MyAggregate, MyId>
{
  private readonly IDomainEventFactory _domainEventFactory;

  public UpgradeMyEventV1ToMyEventV2(IDomainEventFactory domainEventFactory)
  {
    _domainEventFactory = domainEventFactory;
  }

  public IEnumerable<IDomainEvent<MyAggregate, MyId>> Upgrade(
    IDomainEvent<MyAggregate, MyId> domainEvent)
  {
    var myEventV1 = domainEvent as IDomainEvent<MyAggregate, MyId, MyEventV1>;
    yield return myEventV1 == null
      ? domainEvent
      : _domainEventFactory.Upgrade<MyAggregate, MyId>(
        domainEvent, new MyEventV2());
  }
}

Event stores

By default EventFlow uses an in-memory event store, but it provides support for alternatives.

In-memory

Important

In-memory event store shouldn’t be used for production environments, only for tests.

Using the in-memory event store is easy. It’s enabled by default, so there is no need to do anything.

MSSQL event store

See MSSQL setup for details on how to get started using Microsoft SQL Server in EventFlow.

To configure EventFlow to use MSSQL as the event store, simply add the UseMssqlEventStore() as shown here.

IRootResolver rootResolver = EventFlowOptions.New
  ...
  .UseMssqlEventStore()
  ...
  .CreateResolver();

Create and migrate required MSSQL databases

Before you can use the MSSQL event store, the required database and tables must be created. The database specified in your MSSQL connection will not be created automatically; you have to do this yourself.

To make EventFlow create the required tables, execute the following code.

var msSqlDatabaseMigrator = rootResolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowEventStoresMsSql.MigrateDatabase(msSqlDatabaseMigrator);

You should do this either on application start or preferably upon application install or update, e.g., when the web site is installed.

Important

If you utilize user permissions in your application, then you need to grant the event writer access to the user defined table type eventdatamodel_list_type. EventFlow uses this type to pass entire batches of events to the database.

PostgreSql event store

Basically everything described above for the MSSQL event store also applies to PostgreSql. See MSSQL setup for setup documentation.

Mongo DB

See Mongo DB setup for details on how to get started using Mongo DB in EventFlow.

To configure EventFlow to use Mongo DB as the event store, simply add the UseMongoDbEventStore() as shown here.

IRootResolver rootResolver = EventFlowOptions.New
    ...
    .UseMongoDbEventStore()
    ...
    .CreateResolver();

Files

Important

The Files event store shouldn’t be used for production environments, only for tests.

The file based event store is useful if you have a set of events that represents a certain scenario and would like to create a test that verifies that the domain handles it correctly.

To use the file based event store, simply invoke .UseFilesEventStore(...) with a configuration that points to the path containing the files.

var storePath = @"c:\eventstore";
var rootResolver = EventFlowOptions.New
  ...
  .UseFilesEventStore(FilesEventStoreConfiguration.Create(storePath))
  ...
  .CreateResolver();

Read model stores

To create query handlers that perform well and can search across multiple fields, read models (also called projections) are used.

To get started you can use the built-in in-memory read model store, but EventFlow supports a few others as well.

Creating read models

Read models are a flattened view of a subset, or all, of an aggregate’s domain events, created specifically for efficient queries.

Here’s a simple example of how a read model for doing searches for usernames could look. The read model handles the UserCreated domain event to get the username and user ID.

public class UserReadModel : IReadModel,
  IAmReadModelFor<UserAggregate, UserId, UserCreated>
{
  public string UserId { get; set; }
  public string Username { get; set; }

  public void Apply(
    IReadModelContext context,
    IDomainEvent<UserAggregate, UserId, UserCreated> domainEvent)
  {
    UserId = domainEvent.AggregateIdentity.Value;
    Username = domainEvent.AggregateEvent.Username.Value;
  }
}

The read model applies all UserCreated events and thereby merely saves the latest value instead of the entire history, which makes it much easier to store in an efficient manner.

Read model locators

Typically the ID of a read model is the aggregate identity, but sometimes this isn’t the case. Here are some examples.

  • Items from a collection on the aggregate root
  • Deterministic ID created from event data
  • Entity within the aggregate

To create read models in these cases, use the EventFlow concept of read model locators, which is basically a mapping from a domain event to a read model ID.

As an example, consider if we could add several nicknames to a user. We might have a domain event called UserNicknameAdded similar to this.

public class UserNicknameAdded : AggregateEvent<UserAggregate, UserId>
{
  public Nickname Nickname { get; set; }
}

We could then create a read model locator that would return the ID for each nickname we add via the event like this.

public class UserNicknameReadModelLocator : IReadModelLocator
{
  public IEnumerable<string> GetReadModelIds(IDomainEvent domainEvent)
  {
    var userNicknameAdded = domainEvent as
      IDomainEvent<UserAggregate, UserId, UserNicknameAdded>;
    if (userNicknameAdded == null)
    {
      yield break;
    }

    yield return userNicknameAdded.AggregateEvent.Nickname.Id;
  }
}

And then use a read model similar to this that represents each nickname.

public class UserNicknameReadModel : IReadModel,
  IAmReadModelFor<UserAggregate, UserId, UserNicknameAdded>
{
  public string UserId { get; set; }
  public string Nickname { get; set; }

  public void Apply(
    IReadModelContext context,
    IDomainEvent<UserAggregate, UserId, UserNicknameAdded> domainEvent)
  {
    UserId = domainEvent.AggregateIdentity.Value;
    Nickname = domainEvent.AggregateEvent.Nickname.Value;
  }
}

You may need to assign the ID of your read model from a batch of nicknames assigned in the creation event of the user. In that case, read the read model ID acquired from the locator through the context parameter:

public class UserNicknameReadModel : IReadModel,
  IAmReadModelFor<UserAggregate, UserId, UserCreatedEvent>
{
  public string Id { get; set; }
  public string UserId { get; set; }
  public string Nickname { get; set; }

  public void Apply(
    IReadModelContext context,
    IDomainEvent<UserAggregate, UserId, UserCreatedEvent> domainEvent)
  {
    var id = context.ReadModelId;
    UserId = domainEvent.AggregateIdentity.Value;
    var nickname = domainEvent.AggregateEvent.Nicknames.Single(n => n.Id == id);

    Id = nickname.Id;
    Nickname = nickname.Nickname;
  }
}

We could then use this nickname read model to query all the nicknames for a given user by searching for read models that have a specific UserId.

Read store implementations

EventFlow has built-in support for several different read model stores.

In-memory

The in-memory read store is easy to use and easy to configure. All read models are stored in-memory, so if EventFlow is restarted all read models are lost.

To configure the in-memory read model store, simply call UseInMemoryReadStoreFor<> or UseInMemoryReadStoreFor<,> with your read model as the generic argument.

var resolver = EventFlowOptions.New
  ...
  .UseInMemoryReadStoreFor<UserReadModel>()
  .UseInMemoryReadStoreFor<UserNicknameReadModel,UserNicknameReadModelLocator>()
  ...
  .CreateResolver();

Microsoft SQL Server

To configure the MSSQL read model store, simply call UseMssqlReadModel<> or UseMssqlReadModel<,> with your read model as the generic argument.

var resolver = EventFlowOptions.New
  ...
  .UseMssqlReadModel<UserReadModel>()
  .UseMssqlReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
  ...
  .CreateResolver();

By convention, EventFlow uses the table named ReadModel-[CLASS NAME] as the table to store the read model rows in. If you need to change this, use the Table attribute from the System.ComponentModel.DataAnnotations.Schema namespace. So in the above example, the read model UserReadModel would be stored in a table called ReadModel-UserReadModel unless stated otherwise.

To allow EventFlow to find the read models stored, a single column is required to have the MsSqlReadModelIdentityColumn attribute. This will be used to store the read model ID.

You should also create an int column that has the MsSqlReadModelVersionColumn attribute to tell EventFlow which column the read model version is stored in.

Important

EventFlow expects the read model to exist, and thus any maintenance of the database schema for the read models must be handled before EventFlow is initialized. Or, at least before the read models are used in EventFlow.

Elasticsearch

To configure the Elasticsearch read model store, simply call UseElasticsearchReadModel<> or UseElasticsearchReadModel<,> with your read model as the generic argument.

var resolver = EventFlowOptions.New
  ...
  .ConfigureElasticsearch(new Uri("http://localhost:9200/"))
  ...
  .UseElasticsearchReadModel<UserReadModel>()
  .UseElasticsearchReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
  ...
  .CreateResolver();

Overloads of ConfigureElasticsearch(...) are available for alternative Elasticsearch configurations.

Important

Make sure to create any mapping the read model requires in Elasticsearch before using the read model in EventFlow.

If EventFlow receives a request to purge a specific read model, it does it by deleting the index. This means that a separate index should be created for each read model.

If you want to control the index a specific read model is stored in, create an implementation of IReadModelDescriptionProvider and register it in the EventFlow IoC.

Mongo DB

To configure the Mongo DB read model store, call UseMongoDbReadModel<> or UseMongoDbReadModel<,> with your read model as the generic argument.

var resolver = EventFlowOptions.New
  ...
  .UseMongoDbReadModel<UserReadModel>()
  .UseMongoDbReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
  ...
  .CreateResolver();

Microsoft SQL Server

To set up EventFlow Microsoft SQL Server integration, install the NuGet package EventFlow.MsSql and add this to your EventFlow setup.

IRootResolver rootResolver = EventFlowOptions.New
  .ConfigureMsSql(MsSqlConfiguration.New
    .SetConnectionString(@"Server=.\SQLEXPRESS;Database=MyApp;User Id=sa;Password=???"))
  ...
  .CreateResolver();

After setting up Microsoft SQL Server support in EventFlow, you can continue to configure it.

PostgreSql

To set up EventFlow PostgreSql integration, install the NuGet package EventFlow.PostgreSql (https://www.nuget.org/packages/EventFlow.PostgreSql) and add this to your EventFlow setup.

IRootResolver rootResolver = EventFlowOptions.New
  .ConfigurePostgreSql(PostgreSqlConfiguration.New
    .SetConnectionString(@"User ID=me;Password=???;Host=localhost;Port=5432;Database=MyApp"))
  .UsePostgreSqlEventStore()
  .UsePostgreSqlSnapshotStore()
  .UsePostgreSqlReadModel<UserReadModel>()
  .UsePostgreSqlReadModel<UserNicknameReadModel,UserNicknameReadModelLocator>()
  ...
  .CreateResolver();

This code block configures EventFlow to store events, snapshots and read models in PostgreSql. None of this is mandatory and you can mix and match, e.g. store events in PostgreSql and read models in Elasticsearch without using snapshots at all.

  • Event store. One big table EventFlow for all events for all aggregates.
  • Read model store. Table ReadModel-[ClassName] per read model type.
  • Snapshot store. One big table EventFlowSnapshots for all aggregates.

RabbitMQ

To set up EventFlow’s RabbitMQ integration, install the NuGet package EventFlow.RabbitMQ and add this to your EventFlow setup.

var uri = new Uri("amqp://localhost");

var resolver = EventFlowOptions.New
  .PublishToRabbitMq(RabbitMqConfiguration.With(uri))
  ...
  .CreateResolver();

After setting up RabbitMQ support in EventFlow, you can continue to configure it.

Configuration

EventFlow can be configured by invoking eventFlowOptions.Configure(c => ...), or by providing a custom implementation of IEventFlowConfiguration.

Each configuration option is described in its XML documentation. The default values should be good enough for most production setups.

IoC container

EventFlow has a custom minimal IoC container implementation, but before using EventFlow in a production environment, it’s recommended to change to Autofac or provide another.

Autofac

EventFlow provides the NuGet package EventFlow.Autofac that allows you to set the internal ContainerBuilder used during EventFlow initialization.

Pass the ContainerBuilder to EventFlow and call CreateContainer() when configuration is done to create the container.

var containerBuilder = new ContainerBuilder();

var container = EventFlowOptions.New
  .UseAutofacContainerBuilder(containerBuilder) // Must be the first line!
  ...
  .CreateContainer();

Log

The default log implementation of EventFlow logs to the console. To get different behavior, register an implementation of ILog. Use Log as a base class to make the implementation easier.

Snapshots

When working with long-lived aggregates, performance when loading aggregates, and thereby making changes to them, becomes a real concern. Consider aggregates that are comprised of several thousands of events, some of which need to go through a rigorous update process before they are applied to the aggregates.

EventFlow supports aggregate snapshots, which are basically a capture of the entire aggregate state taken every few events. So instead of loading the entire aggregate event history, the latest snapshot is loaded and applied to the aggregate, followed by the remaining events that were not captured in the snapshot.
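
The loading order described above can be shown with a tiny standalone sketch. CounterSketch is a hypothetical aggregate that just sums integers. Its snapshot is a (version, total) pair, and the event list position is assumed to match the aggregate sequence number. None of it is EventFlow API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical counter aggregate used only to illustrate snapshot-based loading.
public class CounterSketch
{
    public int Version { get; private set; }
    public int Total { get; private set; }

    public void LoadSnapshot(int snapshotVersion, int snapshotTotal)
    {
        Version = snapshotVersion;
        Total = snapshotTotal;
    }

    public void Apply(int amount)
    {
        Total += amount;
        Version++;
    }

    // events[i] is the event with aggregate sequence number i + 1.
    public static CounterSketch Load(
        IReadOnlyList<int> events,
        (int Version, int Total)? snapshot)
    {
        var aggregate = new CounterSketch();
        if (snapshot.HasValue)
        {
            aggregate.LoadSnapshot(snapshot.Value.Version, snapshot.Value.Total);
        }

        // Only events newer than the snapshot still need to be applied.
        foreach (var amount in events.Skip(aggregate.Version))
        {
            aggregate.Apply(amount);
        }

        return aggregate;
    }
}
```

Loading with or without a snapshot ends in the same state. The snapshot only reduces how many events have to be applied.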

To configure an aggregate root to support snapshots, inherit from SnapshotAggregateRoot<,,> and define a serializable snapshot type that is marked with the ISnapshot interface.

[SnapshotVersion("user", 1)]
public class UserSnapshot : ISnapshot
{
  ...
}

public class UserAggregate :
  SnapshotAggregateRoot<UserAggregate, UserId, UserSnapshot>
{
  protected override Task<UserSnapshot> CreateSnapshotAsync(
    CancellationToken cancellationToken)
  {
    // Create a UserSnapshot based on the current aggregate state
    ...
  }

  protected override Task LoadSnapshotAsync(
    UserSnapshot snapshot,
    ISnapshotMetadata metadata,
    CancellationToken cancellationToken)
  {
    // Load the UserSnapshot into the current aggregate
    ...
  }
}

When using aggregate snapshots there are several important details to remember

  • Aggregates must not make any assumptions regarding the existence of snapshots
  • Aggregates must not assume that snapshots are created with increasing aggregate sequence numbers
  • Snapshots must be created in such a way, that they represent the entire history up to the point of snapshot creation

Snapshot strategy

When implementing an aggregate root that inherits from SnapshotAggregateRoot<,,>, you need to pass the base class an implementation of ISnapshotStrategy. The strategy is used to determine when a snapshot should be created, e.g. every 100 events.

EventFlow ships with two strategies that should be enough for most purposes, as both can be configured.

  • SnapshotEveryFewVersionsStrategy: Snapshots are created after a predefined number of events, the default is 100, but another frequency can be specified
  • SnapshotRandomlyStrategy: Snapshots are created randomly with a predefined chance, the default is 1%, but another can be specified
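
The two decision rules can be sketched as plain functions. This is an illustration, not the shipped implementation: SnapshotStrategySketch and its method names are hypothetical. The interpretation "number of versions since the last snapshot" is an assumption, and the random source is injected as a delegate to keep the sketch deterministic.

```csharp
using System;

// Hypothetical stand-ins for the two strategies listed above.
public static class SnapshotStrategySketch
{
    // True once enough versions have passed since the last snapshot (default 100).
    public static bool EveryFewVersions(
        int aggregateVersion,
        int? lastSnapshotVersion,
        int snapshotAfterVersions = 100)
    {
        var versionsSinceSnapshot = aggregateVersion - (lastSnapshotVersion ?? 0);
        return versionsSinceSnapshot >= snapshotAfterVersions;
    }

    // True with the given chance (default 1%); nextSample returns a value in [0, 1).
    public static bool Randomly(Func<double> nextSample, double chance = 0.01)
    {
        return nextSample() < chance;
    }
}
```

In real use, nextSample could be backed by new Random().NextDouble(). The random rule is stateless, so it needs no knowledge of when the last snapshot was taken.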

Upgrading snapshots

As an application grows over time, the data required to be stored within a snapshot will change, either because some of it becomes obsolete or merely because a better way of storing the aggregate state is found. If this happens, the snapshots persisted in the snapshot store could potentially become useless as aggregates are unable to apply them. The easy solution would be to make change-by-addition and make sure that the old snapshots can be deserialized into the new version.

EventFlow provides an alternative solution, which is basically allowing developers to upgrade snapshots similar to how events are upgraded.

Let’s say we have an application that has developed three snapshot versions over time.

[SnapshotVersion("user", 1)]
public class UserSnapshotV1 : ISnapshot
{
  ...
}

[SnapshotVersion("user", 2)]
public class UserSnapshotV2 : ISnapshot
{
  ...
}

[SnapshotVersion("user", 3)]
public class UserSnapshot : ISnapshot
{
  ...
}

Note how version three of the UserAggregate snapshot is called UserSnapshot and not UserSnapshotV3. This is basically to help developers tell which snapshot version is the current one.

Remember to add the [SnapshotVersion] attribute as it enables control of the snapshot definition name. If left out, EventFlow will make a guess, which will be tied to the name of the class type.

The next step will be to implement upgraders, or mappers, that can upgrade one snapshot to another.

public class UserSnapshotV1ToV2Upgrader :
  ISnapshotUpgrader<UserSnapshotV1, UserSnapshotV2>
{
    public Task<UserSnapshotV2> UpgradeAsync(
      UserSnapshotV1 userSnapshotV1,
      CancellationToken cancellationToken)
    {
      // Map from V1 to V2 and return
    }
}

public class UserSnapshotV2ToV3Upgrader :
  ISnapshotUpgrader<UserSnapshotV2, UserSnapshot>
{
    public Task<UserSnapshot> UpgradeAsync(
      UserSnapshotV2 userSnapshotV2,
      CancellationToken cancellationToken)
    {
      // Map from V2 to V3 and return
    }
}

The snapshot types and upgraders then only need to be registered in EventFlow.

var resolver = EventFlowOptions.New
  ...
  .AddSnapshotUpgraders(myAssembly)
  .AddSnapshots(myAssembly)
  ...
  .CreateResolver();

Now, whenever a snapshot is loaded from the snapshot store, it is automatically upgraded to the latest version and the aggregate only needs to concern itself with the latest version.

Snapshot store implementations

EventFlow has built-in support for some snapshot stores (more will be implemented).

Null (or none)

The default implementation used by EventFlow does absolutely nothing besides logging a warning if used. It exists only to help developers to select a proper snapshot store. Making in-memory the default implementation could present problems if snapshots were configured, but the snapshot store configuration forgotten.

In-memory

For testing, or small applications, the in-memory snapshot store is configured by merely calling UseInMemorySnapshotStore().

var resolver = EventFlowOptions.New
  ...
  .UseInMemorySnapshotStore()
  ...
  .CreateResolver();

Microsoft SQL Server

To use the MSSQL snapshot store you need to install the NuGet package EventFlow.MsSql.

Configuration

Configure the MSSQL connection and snapshot store as shown here.

var rootResolver = EventFlowOptions.New
  ...
  .ConfigureMsSql(MsSqlConfiguration.New
    .SetConnectionString(@"Server=.\SQLEXPRESS;Database=MyApp;User Id=sa;Password=???"))
  .UseMsSqlSnapshotStore()
  ...
  .CreateResolver();

Note that if you already use MSSQL for event- or read model store, you only need to invoke the ConfigureMsSql extension once.

Create and migrate required MSSQL databases

Before you can use the MSSQL snapshot store, the required database and tables must be created. The database specified in your MSSQL connection will not be created automatically; you have to do this yourself.

To make EventFlow create the required tables, execute the following code.

var msSqlDatabaseMigrator = rootResolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowSnapshotStoresMsSql.MigrateDatabase(msSqlDatabaseMigrator);

You should do this either on application start or preferably upon application install or update, e.g., when the web site is installed.

Custom

If none of the above stores are adequate, a custom implementation is possible by implementing the interface ISnapshotPersistence. However, there are some rules that the snapshot persistence store must follow.

  • It’s valid to store snapshots in any order, e.g. first version 3 then 2
  • It’s valid to overwrite an existing snapshot version, e.g. storing version 3 then version 3 again
  • Fallback to old snapshots is allowed
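
One way to picture a store that satisfies these rules is the in-memory sketch below. It does not implement ISnapshotPersistence, whose members are not shown in this document. InMemorySnapshotPersistenceSketch, its method names and the isUsable callback are all hypothetical and exist only to illustrate the three rules.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory store illustrating the three rules above.
public sealed class InMemorySnapshotPersistenceSketch
{
    // aggregate id -> (snapshot version -> serialized snapshot)
    private readonly Dictionary<string, SortedDictionary<int, string>> _snapshots =
        new Dictionary<string, SortedDictionary<int, string>>();

    public void Set(string aggregateId, int version, string serializedSnapshot)
    {
        if (!_snapshots.TryGetValue(aggregateId, out var versions))
        {
            versions = new SortedDictionary<int, string>();
            _snapshots[aggregateId] = versions;
        }

        // Indexer assignment accepts any order and overwrites an existing version.
        versions[version] = serializedSnapshot;
    }

    // Returns the newest snapshot the caller can use, falling back to older ones.
    public (int Version, string Data)? GetLatest(
        string aggregateId,
        Func<string, bool> isUsable)
    {
        if (!_snapshots.TryGetValue(aggregateId, out var versions))
        {
            return null;
        }

        foreach (var pair in versions.Reverse())
        {
            if (isUsable(pair.Value))
            {
                return (pair.Key, pair.Value);
            }
        }

        return null;
    }
}
```

Storing version 3, then version 2, then version 3 again leaves two entries, with the second write of version 3 winning. If the newest entry is rejected by isUsable, the lookup falls back to version 2.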

Customize

If you are looking for how to configure EventFlow, look at the configuration documentation.

Whenever EventFlow doesn’t meet your needs, e.g. if you want to collect statistics on each command execution time, you can customize EventFlow.

Basically EventFlow relies on an IoC container to allow developers to customize the different parts of EventFlow.

Note: Read the section “Changing IoC container” for details on how to change the IoC container used if you have specific needs like e.g. integrating EventFlow into an Owin application.

You have two options for when you want to customize EventFlow

  • Decorate an implementation
  • Replace an implementation

Decorating implementations

In the case of collecting statistics, you might want to wrap the existing ICommandBus with a decorator class that can collect statistics on command execution times.

void ConfigureEventFlow()
{
  var resolver = EventFlowOptions.New
    .RegisterServices(DecorateCommandBus)
    ...
    .CreateResolver();
}

void DecorateCommandBus(IServiceRegistration sr)
{
  sr.Decorate<ICommandBus>((r, cb) => new StatsCommandBus(cb));
}

class StatsCommandBus : ICommandBus
{
  private readonly ICommandBus _internalCommandBus;

  public StatsCommandBus(ICommandBus commandBus)
  {
    _internalCommandBus = commandBus;
  }

  // Implementations of the ICommandBus members follow here; each calls
  // the internal command bus and logs statistics
  ...
}
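
As a rough illustration of what the elided decorator body could look like, here is a self-contained sketch. It assumes a simplified, hypothetical ISimpleCommandBus interface instead of the real ICommandBus, whose generic method signatures are not reproduced here; NoOpCommandBus and StatsCommandBusSketch are likewise made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for EventFlow's ICommandBus.
public interface ISimpleCommandBus
{
    Task PublishAsync(object command, CancellationToken cancellationToken);
}

// Trivial inner bus so the sketch can be run on its own.
public sealed class NoOpCommandBus : ISimpleCommandBus
{
    public Task PublishAsync(object command, CancellationToken cancellationToken)
        => Task.CompletedTask;
}

// Decorator: forwards every call to the inner bus and records how long
// the call took. Not thread-safe; a real implementation would likely
// send the timings to a metrics system instead of a list.
public sealed class StatsCommandBusSketch : ISimpleCommandBus
{
    private readonly ISimpleCommandBus _inner;
    private readonly List<(string Command, TimeSpan Elapsed)> _timings =
        new List<(string Command, TimeSpan Elapsed)>();

    public StatsCommandBusSketch(ISimpleCommandBus inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public IReadOnlyList<(string Command, TimeSpan Elapsed)> Timings => _timings;

    public async Task PublishAsync(object command, CancellationToken cancellationToken)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await _inner.PublishAsync(command, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Record the timing even if the inner bus throws.
            stopwatch.Stop();
            _timings.Add((command.GetType().Name, stopwatch.Elapsed));
        }
    }
}
```

The try/finally means failed commands are measured too, which is usually what you want when collecting execution statistics.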

Registering new implementations

The more drastic step is to completely replace an implementation. For this you use the Register(...) and related methods on IServiceRegistration instead of the Decorate(...) method.

Event serialization and value objects

One of the important parts of creating an event sourced application is to ensure that you can always read your event streams. It seems simple enough, but it is a problem, especially for large applications that undergo refactoring or domain changes.

The basic idea is to store events in a structure that’s easy to access and migrate if the need should arise. EventFlow, like many other event sourced systems, stores its events using JSON.

Making pretty and clean JSON

You might wonder “but, why?”, and the reason is somewhat similar to the reasoning behind semantic URLs.

Consider the following value object used to validate and contain usernames in an application.

public class Username
{
  public string Value { get; }

  public Username(string value)
  {
    if (string.IsNullOrEmpty(value) || value.Length <= 4)
    {
      throw DomainError.With($"Invalid username '{value}'");
    }

    Value = value;
  }
}

First we do some cleanup and rewrite it using EventFlow’s SingleValueObject<>.

public class Username : SingleValueObject<string>
{
  public Username(string value) : base(value)
  {
    if (string.IsNullOrEmpty(value) || value.Length <= 4)
    {
      throw DomainError.With($"Invalid username '{value}'");
    }
  }
}

Now it looks simple and we might think we can use this value object directly in our domain events. We could, but the resulting JSON will look like this.

{
  "Username" : {
    "Value": "my-awesome-username"
  }
}

This doesn’t look very good. First, that extra property doesn’t make it easier to read and it takes up more space when serializing and transmitting the event.

In addition, if you use the value object in a web API, people using the API will need to wrap the properties in their DTOs in a similar way. What we would like is to modify our serialized event to look like this instead and still use the value object in our events.

{
  "Username" : "my-awesome-username"
}

To do this, we apply SingleValueObjectConverter, the custom JSON converter EventFlow provides for single value objects, to our Username class like this.

[JsonConverter(typeof(SingleValueObjectConverter))] // Only this line added
public class Username : SingleValueObject<string>
{
  public Username(string value) : base(value)
  {
    if (string.IsNullOrEmpty(value) || value.Length <= 4)
    {
      throw DomainError.With($"Invalid username '{value}'");
    }
  }
}

The JSON converter understands the single value object and will serialize and deserialize it correctly.

Using this converter also enables you to replace e.g. raw string and int properties with value objects on existing events, as they will be “JSON compatible”.
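
The “JSON compatible” idea can be tried out without EventFlow. The sketch below is an assumption-laden stand-in: it uses System.Text.Json rather than the converter EventFlow ships, and UsernameSketch, UsernameSketchConverter, UserCreatedSketch and JsonCompatibilityDemo are hypothetical names. It only shows that a value object written as a bare string can read JSON that was produced when the property was still a plain string.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical stand-in for a single value object; this is not
// EventFlow's SingleValueObject<string>.
[JsonConverter(typeof(UsernameSketchConverter))]
public sealed class UsernameSketch
{
    public string Value { get; }

    public UsernameSketch(string value)
    {
        if (string.IsNullOrEmpty(value) || value.Length <= 4)
        {
            throw new ArgumentException($"Invalid username '{value}'");
        }

        Value = value;
    }
}

// Reads and writes the value object as a bare JSON string.
public sealed class UsernameSketchConverter : JsonConverter<UsernameSketch>
{
    public override UsernameSketch Read(
        ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
        => new UsernameSketch(reader.GetString());

    public override void Write(
        Utf8JsonWriter writer, UsernameSketch value, JsonSerializerOptions options)
        => writer.WriteStringValue(value.Value);
}

// Hypothetical event whose Username used to be a raw string property.
public sealed class UserCreatedSketch
{
    public UsernameSketch Username { get; set; }
}

public static class JsonCompatibilityDemo
{
    // Produces {"Username":"my-awesome-username"} with no nested "Value".
    public static string Serialize()
        => JsonSerializer.Serialize(
            new UserCreatedSketch { Username = new UsernameSketch("my-awesome-username") });

    // Reads JSON written back when Username was still a plain string.
    public static UserCreatedSketch ReadLegacy(string json)
        => JsonSerializer.Deserialize<UserCreatedSketch>(json);
}
```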

Note

Consider applying this to any classes that inherit from Identity<>.

Do’s and don’ts

When creating an application that uses CQRS+ES, there are several things you need to keep in mind to make development easier and minimize potential bugs. This guide gives you some details on typical problems and how EventFlow can help you minimize the risk.

Business rules

Specifications

Consider moving complex business rules to specifications. This improves readability, testability and re-use.

Events

Produce clean JSON

Make sure that when your aggregate events are JSON serialized, they produce clean JSON as it makes it easier to work with and enables easier deserialization of events in the future.

  • No type information
  • No hints of value objects (see value objects)

Here’s an example of good clean event JSON produced from a create user event.

{
  "Username": "root",
  "PasswordHash": "1234567890ABCDEF",
  "EMail": "root@example.org"
}

Keep old event types

Keep in mind that you need to keep the event types in your code for as long as these events are in the event store, which in most cases is forever, as storage is cheap and information, i.e., your domain events, is expensive.

However, you should still clean your code. Have a look at how you can upgrade and version your events for details on how EventFlow supports you in this.

Subscribers and out of order events

Be very careful if aggregates emit multiple events for a single command, as subscribers will almost certainly receive these out of order.

Specifications

EventFlow ships with an implementation of the specification pattern, which can be used, e.g., to make complex business rules easier to read and test.

To use the specification implementation shipped with EventFlow, simply create a class that inherits from Specification<T>.

public class BelowFiveSpecification : Specification<int>
{
    protected override IEnumerable<string> IsNotSatisfiedBecause(int i)
    {
        if (5 <= i)
        {
            yield return string.Format("{0} is not below five", i);
        }
    }
}

Note that instead of simply returning a bool to indicate whether or not the specification is satisfied, this implementation requires a reason (or reasons) why the specification is not satisfied.

The ISpecification<T> interface has two methods defined, the traditional IsSatisfiedBy as well as WhyIsNotSatisfiedBy, which returns an empty enumerable if the specification was indeed satisfied.

public interface ISpecification<in T>
{
    bool IsSatisfiedBy(T obj);

    IEnumerable<string> WhyIsNotSatisfiedBy(T obj);
}
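
How the two methods relate can be sketched with a stand-in base class. The code below is not EventFlow’s Specification<T>; ISpecificationSketch<T>, SpecificationSketch<T> and BelowFiveSketch are hypothetical names, and the sketch assumes, in line with the description above, that a specification counts as satisfied exactly when no reasons are produced.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical mirror of the interface described above.
public interface ISpecificationSketch<in T>
{
    bool IsSatisfiedBy(T obj);

    IEnumerable<string> WhyIsNotSatisfiedBy(T obj);
}

// Stand-in base class: subclasses only list the reasons for failure.
public abstract class SpecificationSketch<T> : ISpecificationSketch<T>
{
    // Satisfied means: no reason for failure was produced.
    public bool IsSatisfiedBy(T obj) => !IsNotSatisfiedBecause(obj).Any();

    public IEnumerable<string> WhyIsNotSatisfiedBy(T obj) => IsNotSatisfiedBecause(obj);

    protected abstract IEnumerable<string> IsNotSatisfiedBecause(T obj);
}

public class BelowFiveSketch : SpecificationSketch<int>
{
    protected override IEnumerable<string> IsNotSatisfiedBecause(int i)
    {
        if (5 <= i)
        {
            yield return $"{i} is not below five";
        }
    }
}
```

With this sketch, new BelowFiveSketch().IsSatisfiedBy(4) is true, while WhyIsNotSatisfiedBy(7) yields the single reason "7 is not below five".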

Specifications really become powerful when they are combined. EventFlow also ships with a series of extension methods for the ISpecification<T> interface that allow easy combination of implemented specifications.

// Throws a `DomainError` exception if obj doesn't satisfy the specification
spec.ThrowDomainErrorIfNotStatisfied(obj);

// Builds a new specification that requires all input specifications to be
// satisfied
var allSpec = specEnumerable.All();

// Builds a new specification that requires a predefined number of the
// input specifications to be satisfied
var atLeastSpec = specEnumerable.AtLeast(4);

// Builds a new specification that requires the two input specifications
// to be satisfied
var andSpec = spec1.And(spec2);

// Builds a new specification that requires one of the two input
// specifications to be satisfied
var orSpec = spec1.Or(spec2);

// Builds a new specification that requires the input specification
// not to be satisfied
var notSpec = spec.Not();
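
To make the combination idea concrete, here is a self-contained sketch of how And, Or and Not combinators could be built on top of a reasons-based specification. It is not EventFlow’s implementation: ISpec<T>, DelegateSpec<T> and SpecSketchExtensions are hypothetical, and the wording of the generated reasons is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal specification: only reports reasons for failure.
public interface ISpec<in T>
{
    IEnumerable<string> WhyIsNotSatisfiedBy(T obj);
}

// Wraps a function so specifications can be written inline.
public sealed class DelegateSpec<T> : ISpec<T>
{
    private readonly Func<T, IEnumerable<string>> _why;

    public DelegateSpec(Func<T, IEnumerable<string>> why)
    {
        _why = why ?? throw new ArgumentNullException(nameof(why));
    }

    public IEnumerable<string> WhyIsNotSatisfiedBy(T obj) => _why(obj);
}

public static class SpecSketchExtensions
{
    public static bool IsSatisfiedBy<T>(this ISpec<T> spec, T obj)
        => !spec.WhyIsNotSatisfiedBy(obj).Any();

    // And: fails with the reasons of every failing side.
    public static ISpec<T> And<T>(this ISpec<T> left, ISpec<T> right)
        => new DelegateSpec<T>(obj =>
            left.WhyIsNotSatisfiedBy(obj).Concat(right.WhyIsNotSatisfiedBy(obj)).ToList());

    // Or: satisfied if either side is; otherwise reports both sides' reasons.
    public static ISpec<T> Or<T>(this ISpec<T> left, ISpec<T> right)
        => new DelegateSpec<T>(obj =>
        {
            var l = left.WhyIsNotSatisfiedBy(obj).ToList();
            var r = right.WhyIsNotSatisfiedBy(obj).ToList();
            return l.Count == 0 || r.Count == 0 ? new List<string>() : l.Concat(r).ToList();
        });

    // Not: satisfied only when the inner specification is not.
    public static ISpec<T> Not<T>(this ISpec<T> spec)
        => new DelegateSpec<T>(obj => spec.IsSatisfiedBy(obj)
            ? new[] { "Inner specification must not be satisfied" }
            : Array.Empty<string>());
}
```

Because every combinator returns another specification, the results can themselves be combined further, which is where the pattern gets its strength.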

If you need a simple expression to combine with other more complex specifications you can use the bundled ExpressionSpecification<T>, which is a specification wrapper for an expression.

var spec = new ExpressionSpecification<int>(i => 1 < i && i < 3);

// 'str' will contain the value "i => ((1 < i) && (i < 3))"
var str = spec.ToString();

If the specification isn’t satisfied, a string representation of the expression is returned.

FAQ - frequently asked questions

How can I ensure that only specific users can execute commands?

You should implement a decorator for the ICommandBus that performs the authorization check. Have a look at the decorator documentation to see how this can be achieved.

Why isn’t there a “global sequence number” on domain events?

While this is easy to support in some event stores like MSSQL, it doesn’t really make sense from a domain perspective. Greg Young also has this to say on the subject:

“Order is only assured per a handler within an aggregate root boundary. There is no assurance of order between handlers or between aggregates. Trying to provide those things leads to the dark side.” (Greg Young)

Why doesn’t EventFlow have a unit of work concept?

Short answer: you shouldn’t need it. But Mike has a way better answer:

“In the Domain, everything flows in one direction: forward. When something bad happens, a correction is applied. The Domain doesn’t care about the database and UoW is very coupled to the db. In my opinion, it’s a pattern which is usable only with data access objects, and in probably 99% of the cases you won’t be needing it. As with the Singleton, there are better ways but everything depends on proper domain design.” (Mike Mogosanu)

If your case falls within the remaining 1%, write a decorator for the ICommandBus that starts a transaction, use MSSQL as the event store and make sure your read models are stored in MSSQL as well.

Why are subscribers receiving events out of order?

It might be that your aggregates are emitting multiple events. Read about subscribers and out of order events.

Indices and tables