Event Sourcing in Python¶
A library for event sourcing in Python.
Overview¶
What is event sourcing? One definition suggests the state of an event sourced application is determined by a sequence of events. Another definition has event sourcing as a persistence mechanism for domain driven design. In any case, it is common for the state of a software application to be distributed or partitioned across a set of entities or aggregates in a domain model.
Therefore, this library provides mechanisms useful in event sourced applications: a style for coding entity behaviours that emit events; and a way for the events of an entity to be stored and replayed to obtain the entities on demand.
This documentation provides instructions for installing the package, highlights the main features of the library, describes the design of the software, covers the infrastructure, domain model, and application layers, and gives some background information about the project.
This project is hosted on GitHub. Please register any issues, questions, and requests on GitHub.
Background¶
Although the event sourcing patterns are each quite simple, and can be reproduced in code for each project, they do suggest cohesive mechanisms: applying and publishing the events generated within domain entities; storing and retrieving selections of those events in a highly scalable manner; replaying the stored events for a particular entity to obtain its current state; and projecting views of the event stream to be persisted in other models.
Therefore, quoting from Eric Evans’ book about domain-driven design:
“Partition a conceptually COHESIVE MECHANISM into a separate lightweight framework. Particularly watch for formalisms for well-documented categories of algorithms. Expose the capabilities of the framework with an INTENTION-REVEALING INTERFACE. Now the other elements of the domain can focus on expressing the problem (‘what’), delegating the intricacies of the solution (‘how’) to the framework.”
Inspiration:
- Martin Fowler’s article on event sourcing
- Greg Young’s discussions about event sourcing, and EventStore system
- Robert Smallshire’s brilliant example on Bitbucket
- Various professional projects that called for this approach, for which I didn’t want to rewrite the same things each time
See also:
- Evaluation of using NoSQL databases in an event sourcing system by Johan Rothsberg
- Object-relational impedance mismatch page on Wikipedia
- An introduction to event storming by Steven Lowe, principal consultant developer at ThoughtWorks.
Quick start¶
This section shows how to make a simple event sourced application using classes from the library. It shows the general story, which is elaborated over the following pages.
Please use pip to install the library with the ‘sqlalchemy’ option.
$ pip install eventsourcing[sqlalchemy]
Define model¶
Define a domain model aggregate.
The class World defined below is a subclass of AggregateRoot.
The World aggregate has a property called history. It also has an event-sourced attribute called ruler.
It has a command method called make_it_so, which triggers a domain event of type SomethingHappened, defined as a nested class. The domain event class SomethingHappened has a mutate() method, which happens to append triggered events to the history.
from eventsourcing.domain.model.aggregate import AggregateRoot
from eventsourcing.domain.model.decorators import attribute


class World(AggregateRoot):

    def __init__(self, ruler=None, **kwargs):
        super(World, self).__init__(**kwargs)
        self._history = []
        self._ruler = ruler

    @property
    def history(self):
        return tuple(self._history)

    @attribute
    def ruler(self):
        """A mutable event-sourced attribute."""

    def make_it_so(self, something):
        self.__trigger_event__(World.SomethingHappened, what=something)

    class SomethingHappened(AggregateRoot.Event):
        def mutate(self, obj):
            obj._history.append(self)
This class can be used and completely tested without any infrastructure.
Although every aggregate is a “little world”, developing a more realistic domain model would involve defining attributes, command methods, and domain events particular to a concrete domain.
Basically, you can understand everything if you understand that command methods, such as make_it_so() in the example above, should not update the state of the aggregate directly with the results of their work, but should instead trigger events using domain event classes whose mutate() methods update the state of the aggregate using the values given to the event when it was triggered. By refactoring the updating of aggregate state out of the command method and into a domain event class, triggered events can be stored and replayed to obtain persistent aggregates.
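The essence of this refactoring can be sketched without the library: replaying is simply folding stored events through their mutate() methods. The minimal World and SomethingHappened classes below are illustrative stand-ins, not the library's API.

```python
class SomethingHappened:
    def __init__(self, what):
        self.what = what

    def mutate(self, obj):
        # The event, not the command method, updates aggregate state.
        obj.history.append(self.what)


class World:
    def __init__(self):
        self.history = []
        self.pending_events = []

    def make_it_so(self, something):
        event = SomethingHappened(what=something)
        event.mutate(self)                  # apply to current state
        self.pending_events.append(event)   # keep for storage


def replay(events):
    # Reconstruct an aggregate by folding stored events through mutate().
    world = World()
    for event in events:
        event.mutate(world)
    return world


world = World()
world.make_it_so('dinosaurs')
world.make_it_so('trucks')

copy = replay(world.pending_events)
assert copy.history == ['dinosaurs', 'trucks']
```

Because the command method's only lasting effect is the events it triggers, the replayed copy necessarily ends up in the same state as the original.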
Configure environment¶
Generate cipher key (optional).
from eventsourcing.utils.random import encode_random_bytes
# Keep this safe.
cipher_key = encode_random_bytes(num_bytes=32)
Configure environment variables.
import os
# Optional cipher key (random bytes encoded with Base64).
os.environ['CIPHER_KEY'] = cipher_key
# SQLAlchemy-style database connection string.
os.environ['DB_URI'] = 'sqlite:///:memory:'
Run application¶
With the SimpleApplication from the library, you can create, read, update, and delete World aggregates that are persisted in the database identified above.
The code below demonstrates many of the features of the library, such as optimistic concurrency control, data integrity, and application-level encryption.
from eventsourcing.application.simple import SimpleApplication
from eventsourcing.exceptions import ConcurrencyError

# Construct simple application (used here as a context manager).
with SimpleApplication() as app:

    # Call library factory method.
    world = World.__create__(ruler='gods')

    # Execute commands.
    world.make_it_so('dinosaurs')
    world.make_it_so('trucks')

    version = world.__version__  # note version at this stage
    world.make_it_so('internet')

    # Assign to event-sourced attribute.
    world.ruler = 'money'

    # View current state of aggregate.
    assert world.ruler == 'money'
    assert world.history[2].what == 'internet'
    assert world.history[1].what == 'trucks'
    assert world.history[0].what == 'dinosaurs'

    # Publish pending events (to persistence subscriber).
    world.__save__()

    # Retrieve aggregate (replay stored events).
    copy = app.repository[world.id]
    assert isinstance(copy, World)

    # View retrieved state.
    assert copy.ruler == 'money'
    assert copy.history[2].what == 'internet'
    assert copy.history[1].what == 'trucks'
    assert copy.history[0].what == 'dinosaurs'

    # Verify retrieved state (cryptographically).
    assert copy.__head__ == world.__head__

    # Discard aggregate.
    world.__discard__()

    # Repository raises key error (when aggregate not found).
    assert world.id not in app.repository
    try:
        app.repository[world.id]
    except KeyError:
        pass
    else:
        raise Exception("Shouldn't get here")

    # Get historical state (at version from above).
    old = app.repository.get_entity(world.id, at=version)
    assert old.history[-1].what == 'trucks'  # internet not happened
    assert len(old.history) == 2
    assert old.ruler == 'gods'

    # Optimistic concurrency control (no branches).
    old.make_it_so('future')
    try:
        old.__save__()
    except ConcurrencyError:
        pass
    else:
        raise Exception("Shouldn't get here")

    # Check domain event data integrity (happens also during replay).
    events = app.event_store.get_domain_events(world.id)
    last_hash = ''
    for event in events:
        event.__check_hash__()
        assert event.__previous_hash__ == last_hash
        last_hash = event.__event_hash__

    # Verify sequence of events (cryptographically).
    assert last_hash == world.__head__

    # Check records are encrypted (values not visible in database).
    active_record_strategy = app.event_store.active_record_strategy
    items = active_record_strategy.get_items(world.id)
    for item in items:
        assert item.originator_id == world.id
        assert 'dinosaurs' not in item.state
        assert 'trucks' not in item.state
        assert 'internet' not in item.state
Installation¶
Use pip to install the library from the Python Package Index.
$ pip install eventsourcing
If you want to use SQLAlchemy, then install the library with the ‘sqlalchemy’ option. Also install your chosen database driver.
$ pip install eventsourcing[sqlalchemy]
$ pip install psycopg2
Similarly, if you want to use Apache Cassandra, then please install with the ‘cassandra’ option.
$ pip install eventsourcing[cassandra]
Running the install command again with different options will simply install the extra dependencies associated with those options. If you installed without any options, you can easily install optional dependencies later by running the install command again with the options you want.
Features¶
Event store — appends and retrieves domain events. Uses a sequenced item mapper with an active record strategy to map domain events to databases in ways that can be easily extended and replaced.
Data integrity — stored events can be hashed to check data integrity of individual records, so you cannot lose information in transit or get database corruption without being able to detect it. Sequences of events can be hash-chained, and the entire sequence of events checked for integrity, so if the last hash can be independently validated, then so can the entire sequence.
Optimistic concurrency control — can be used to ensure a distributed or horizontally scaled application doesn’t become inconsistent due to concurrent method execution. Leverages any optimistic concurrency controls in the database adapted by the active record strategy.
Application-level encryption — encrypts and decrypts stored events, using a cipher strategy passed as an option to the sequenced item mapper. Can be used to encrypt some events, or all events, or not applied at all (the default).
Snapshotting — avoids replaying an entire event stream to obtain the state of an entity. A snapshot strategy is included which reuses the capabilities of this library by implementing snapshots as events.
Abstract base classes — suggest how to structure an event sourced application. The library has base classes for application objects, domain entities, entity repositories, domain events of various types, mapping strategies, snapshotting strategies, cipher strategies, etc. They are well factored, relatively simple, and can be easily extended for your own purposes. If you wanted to create a domain model that is entirely stand-alone (recommended by purists for maximum longevity), you might start by replicating the library classes.
Worked examples — a simple example application, with an example entity class, example domain events, and an example database table. Plus lots of examples in the documentation.
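The hash-chaining mentioned above can be sketched in a few lines of plain Python: each event records the hash of its predecessor, so independently validating the last hash validates the whole sequence. This is an illustration of the idea only; the library's actual hashing scheme may differ in detail.

```python
import hashlib

def event_hash(previous_hash, state):
    # Chain each event's hash to its predecessor's.
    return hashlib.sha256((previous_hash + state).encode()).hexdigest()

# Build a chain of three events.
events = []
last_hash = ''
for state in ['dinosaurs', 'trucks', 'internet']:
    h = event_hash(last_hash, state)
    events.append({'state': state, 'previous_hash': last_hash, 'event_hash': h})
    last_hash = h

# Verify the whole sequence: recompute each hash and check the links.
check = ''
for event in events:
    assert event['previous_hash'] == check
    check = event_hash(check, event['state'])
assert check == last_hash  # validating the head validates the entire chain

# Tampering with any record breaks the chain and is detected.
events[1]['state'] = 'tampered'
try:
    check = ''
    for event in events:
        assert event_hash(check, event['state']) == event['event_hash']
        check = event['event_hash']
except AssertionError:
    pass
else:
    raise Exception("Tampering should have been detected")
```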
Design¶
The design of the library follows the layered architecture: interfaces, application, domain, and infrastructure.
The infrastructure layer encapsulates infrastructural services required by an event sourced application, in particular an event store.
The domain layer contains independent domain model classes. Nothing in the domain layer depends on anything in the infrastructure layer.
The application layer is responsible for binding domain and infrastructure, and has policies such as the persistence policy, which stores domain events whenever they are published by the model.
The example application has an example repository, from which example entities can be retrieved. It also has a factory method to create new example entities. Each repository has an event player, which all share an event store with the persistence policy. The persistence policy uses the event store to store domain events. Event players use the event store to retrieve the stored events, and the model mutator functions to project entities from sequences of events.
Functionality such as mapping events to a database, or snapshotting, is implemented as strategy objects, and injected into dependents by constructor parameter, making it easy to substitute custom classes for defaults.
The sequenced item persistence model allows domain events to be stored in wide variety of database services, and optionally makes use of any optimistic concurrency controls the database system may afford.
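The constructor-injection approach can be sketched generically: a component receives its collaborator strategies as constructor arguments, so a custom strategy can be substituted for a default without changing the component. The classes below are hypothetical illustrations, not the library's API.

```python
class JSONMapper:
    # Default mapping strategy.
    def to_record(self, event):
        return {'topic': type(event).__name__, 'data': vars(event)}


class UpperCaseTopicMapper(JSONMapper):
    # A custom strategy substituted for the default.
    def to_record(self, event):
        record = super().to_record(event)
        record['topic'] = record['topic'].upper()
        return record


class EventStore:
    def __init__(self, mapper):
        # The strategy is injected by constructor parameter,
        # so the store never hard-codes a particular mapper class.
        self.mapper = mapper


class Created:
    def __init__(self, name):
        self.name = name


store = EventStore(mapper=UpperCaseTopicMapper())
record = store.mapper.to_record(Created(name='example'))
assert record['topic'] == 'CREATED'
```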

Infrastructure¶
The library’s infrastructure layer provides a cohesive mechanism for storing events as sequences of items.
The entire mechanism is encapsulated by the library’s EventStore class. The event store uses a “sequenced item mapper” and an “active record strategy”. The sequenced item mapper and the active record strategy share a common “sequenced item” type. The sequenced item mapper can convert objects such as domain events to sequenced items, and the active record strategy can write sequenced items to a database.
Sequenced items¶
A sequenced item type provides a common persistence model across the components of the mechanism. The sequenced item type is normally declared as a namedtuple.
from collections import namedtuple
SequencedItem = namedtuple('SequencedItem', ['sequence_id', 'position', 'topic', 'data'])
The names of the fields are arbitrary. However, the first field of a sequenced item namedtuple represents the identity of a sequence to which an item belongs, the second field represents the position of the item in its sequence, the third field represents a topic to which the item pertains (dimension of concern), and the fourth field represents the data associated with the item.
SequencedItem namedtuple¶
The library provides a sequenced item namedtuple called SequencedItem.
from eventsourcing.infrastructure.sequenceditem import SequencedItem
The attributes of SequencedItem are sequence_id, position, topic, and data.
The sequence_id identifies the sequence to which the item belongs.
The position identifies the position of the item in its sequence.
The topic identifies the dimension of concern to which the item pertains.
The data holds the values of the item, perhaps serialized to JSON, and optionally encrypted.
from uuid import uuid4

sequence1 = uuid4()

sequenced_item1 = SequencedItem(
    sequence_id=sequence1,
    position=0,
    topic='eventsourcing.domain.model.events#DomainEvent',
    data='{"foo":"bar"}',
)
assert sequenced_item1.sequence_id == sequence1
assert sequenced_item1.position == 0
assert sequenced_item1.topic == 'eventsourcing.domain.model.events#DomainEvent'
assert sequenced_item1.data == '{"foo":"bar"}'
StoredEvent namedtuple¶
As an alternative, the library also provides a sequenced item namedtuple called StoredEvent. The attributes of the StoredEvent namedtuple are originator_id, originator_version, event_type, and state.
The originator_id is the ID of the aggregate that published the event, and is equivalent to sequence_id above.
The originator_version is the version of the aggregate that published the event, and is equivalent to position above.
The event_type identifies the class of the domain event that is stored, and is equivalent to topic above.
The state holds the state of the domain event, and is equivalent to data above.
from eventsourcing.infrastructure.sequenceditem import StoredEvent

aggregate1 = uuid4()

stored_event1 = StoredEvent(
    originator_id=aggregate1,
    originator_version=0,
    event_type='eventsourcing.domain.model.events#DomainEvent',
    state='{"foo":"bar"}',
)
assert stored_event1.originator_id == aggregate1
assert stored_event1.originator_version == 0
assert stored_event1.event_type == 'eventsourcing.domain.model.events#DomainEvent'
assert stored_event1.state == '{"foo":"bar"}'
Active record strategies¶
An active record strategy writes sequenced items to database records.
The library has an abstract base class AbstractActiveRecordStrategy, with abstract methods append() and get_items(), which concrete implementations provide in order to read and write sequenced items in a database.
An active record strategy is constructed with a sequenced_item_class and a matching active_record_class. The field names of a suitable active record class will match the field names of the sequenced item namedtuple.
SQLAlchemy¶
To run the examples below, please install the library with the ‘sqlalchemy’ option.
$ pip install eventsourcing[sqlalchemy]
The library has a concrete active record strategy for SQLAlchemy provided by the object class SQLAlchemyActiveRecordStrategy.
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy
The library also provides active record classes for SQLAlchemy, such as IntegerSequencedItemRecord and StoredEventRecord. The IntegerSequencedItemRecord class matches the default SequencedItem namedtuple. The StoredEventRecord class matches the alternative StoredEvent namedtuple.
The code below uses the namedtuple StoredEvent and the active record StoredEventRecord.
from eventsourcing.infrastructure.sqlalchemy.activerecords import StoredEventRecord
Database settings can be configured using SQLAlchemySettings, which is constructed with a uri connection string. The code below uses an in-memory SQLite database.
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings
settings = SQLAlchemySettings(uri='sqlite:///:memory:')
To help set up a database connection and tables, the library has the object class SQLAlchemyDatastore.
The SQLAlchemyDatastore is constructed with the settings object, and a tuple of active record classes passed using the tables arg.
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore

datastore = SQLAlchemyDatastore(
    settings=settings,
    tables=(StoredEventRecord,),
)
Please note, if you have declared your own SQLAlchemy model Base class, you may wish to define your own active record classes which inherit from your Base class. If so, it may help to refer to the library active record classes to see how SQLAlchemy ORM columns and indexes can be used to persist sequenced items.
The methods setup_connection() and setup_tables() of the datastore object can be used to set up the database connection and the tables.
datastore.setup_connection()
datastore.setup_tables()
As well as a sequenced_item_class and a matching active_record_class, the SQLAlchemyActiveRecordStrategy requires a scoped session object, passed using the constructor arg session. For convenience, the SQLAlchemyDatastore has a thread-scoped session facade set as its session attribute. You may wish to use a different scoped session facade, such as a request-scoped session object provided by a Web framework.

active_record_strategy = SQLAlchemyActiveRecordStrategy(
    sequenced_item_class=StoredEvent,
    active_record_class=StoredEventRecord,
    session=datastore.session,
)
Sequenced items (or “stored events” in this example) can be appended to the database using the append() method of the active record strategy.
active_record_strategy.append(stored_event1)
(Please note, since the position is given by the sequenced item itself, the word “append” means here “to add something extra” rather than the perhaps more common but stricter meaning “to add to the end of a document”. That is, the database is deliberately not responsible for positioning a new item at the end of a sequence. So perhaps “save” would be a better name for this operation.)
All the previously appended items of a sequence can be retrieved by using the get_items() method.
results = active_record_strategy.get_items(aggregate1)
Since only one item has been stored so far, there is only one item in the results.
assert len(results) == 1
assert results[0] == stored_event1
MySQL¶
For MySQL, the Python package mysqlclient can be used.
$ pip install mysqlclient
The uri for MySQL would look something like this.
mysql://username:password@localhost/eventsourcing
PostgreSQL¶
For PostgreSQL, the Python package psycopg2 can be used.
$ pip install psycopg2
The uri for PostgreSQL would look something like this.
postgresql://username:password@localhost:5432/eventsourcing
Apache Cassandra¶
To run the examples below, please install the library with the ‘cassandra’ option.
$ pip install eventsourcing[cassandra]
The library also has a concrete active record strategy for Apache Cassandra, provided by the CassandraActiveRecordStrategy class.
Similarly, for the CassandraActiveRecordStrategy, the IntegerSequencedItemRecord from eventsourcing.infrastructure.cassandra.activerecords matches the SequencedItem namedtuple. The StoredEventRecord from the same module matches the StoredEvent namedtuple.
from eventsourcing.infrastructure.cassandra.datastore import CassandraDatastore, CassandraSettings
from eventsourcing.infrastructure.cassandra.activerecords import CassandraActiveRecordStrategy, StoredEventRecord
cassandra_datastore = CassandraDatastore(
    settings=CassandraSettings(),
    tables=(StoredEventRecord,),
)
cassandra_datastore.setup_connection()
cassandra_datastore.setup_tables()

cassandra_active_record_strategy = CassandraActiveRecordStrategy(
    active_record_class=StoredEventRecord,
    sequenced_item_class=StoredEvent,
)
results = cassandra_active_record_strategy.get_items(aggregate1)
assert len(results) == 0
cassandra_active_record_strategy.append(stored_event1)
results = cassandra_active_record_strategy.get_items(aggregate1)
assert results[0] == stored_event1
cassandra_datastore.drop_tables()
cassandra_datastore.close_connection()
The CassandraDatastore and CassandraSettings are used in the same way as SQLAlchemyDatastore and SQLAlchemySettings above. Please investigate the library class CassandraSettings for information about configuring away from the default settings.
Sequenced item conflicts¶
It is a feature of the active record strategy that it isn’t possible to append two items at the same position in the same sequence. If such an attempt is made, a SequencedItemConflict will be raised by the active record strategy.
from eventsourcing.exceptions import SequencedItemConflict
# Fail to append an item at the same position in the same sequence as a previous item.
try:
    active_record_strategy.append(stored_event1)
except SequencedItemConflict:
    pass
else:
    raise Exception("SequencedItemConflict not raised")
This feature is implemented using optimistic concurrency control features of the underlying database. With SQLAlchemy, the primary key constraint involves both the sequence and the position columns. With Cassandra the position is the primary key in the sequence partition, and the “IF NOT EXISTS” feature is applied.
The Cassandra database management system, which implements the Paxos protocol, can accomplish linearly-scalable distributed optimistic concurrency control, guaranteeing sequential consistency of the events of an entity despite the database being distributed. It is also possible to serialize calls to the methods of an entity, but that is out of the scope of this package — if you wish to do that, perhaps something like Zookeeper might help.
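The relational variant of this control can be sketched with the standard library's sqlite3 module: a compound primary key over the sequence ID and position columns makes a second insert at an occupied position violate the constraint, which can be caught and surfaced as a conflict. This is an illustration of the idea, not the library's SQLAlchemy implementation.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('''
    CREATE TABLE stored_events (
        sequence_id TEXT,
        position INTEGER,
        state TEXT,
        PRIMARY KEY (sequence_id, position)
    )
''')

def append(sequence_id, position, state):
    try:
        conn.execute(
            'INSERT INTO stored_events VALUES (?, ?, ?)',
            (sequence_id, position, state),
        )
    except sqlite3.IntegrityError:
        # Another writer got to this position first: surface as a conflict.
        raise RuntimeError('SequencedItemConflict') from None

append('aggregate1', 0, '{"foo":"bar"}')
try:
    append('aggregate1', 0, '{"foo":"baz"}')  # same position: conflict
except RuntimeError:
    pass
else:
    raise Exception('Conflict not raised')
```

The database never chooses the position; the writer states it, and the constraint simply refuses a duplicate.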
Sequenced item mapper¶
A sequenced item mapper is used by the event store to map between sequenced item namedtuple objects and application-level objects such as domain events.
The library provides a sequenced item mapper object class called SequencedItemMapper
.
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
The SequencedItemMapper has a constructor arg sequenced_item_class, which defaults to the library’s sequenced item namedtuple SequencedItem.
sequenced_item_mapper = SequencedItemMapper()
The method from_sequenced_item() can be used to convert sequenced item objects to application-level objects.
domain_event = sequenced_item_mapper.from_sequenced_item(sequenced_item1)
assert domain_event.foo == 'bar'
The method to_sequenced_item() can be used to convert application-level objects to sequenced item namedtuples.
assert sequenced_item_mapper.to_sequenced_item(domain_event).data == sequenced_item1.data
If the names of the first two fields of the sequenced item namedtuple (e.g. sequence_id and position) do not match the names of the attributes of the application-level object which identify a sequence and a position (e.g. originator_id and originator_version), then the attribute names can be given to the sequenced item mapper using the constructor args sequence_id_attr_name and position_attr_name.
from eventsourcing.domain.model.events import DomainEvent

domain_event1 = DomainEvent(
    originator_id=aggregate1,
    originator_version=1,
    foo='baz',
)

sequenced_item_mapper = SequencedItemMapper(
    sequence_id_attr_name='originator_id',
    position_attr_name='originator_version'
)
assert domain_event1.foo == 'baz'
assert sequenced_item_mapper.to_sequenced_item(domain_event1).sequence_id == aggregate1
Alternatively, the constructor arg sequenced_item_class can be set with a sequenced item namedtuple type that is different from the default SequencedItem namedtuple, such as the library’s StoredEvent namedtuple.
sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent
)
domain_event1 = sequenced_item_mapper.from_sequenced_item(stored_event1)
assert domain_event1.foo == 'bar', domain_event1
Since the alternative StoredEvent namedtuple can be used instead of the default SequencedItem namedtuple, it is also possible to use a custom namedtuple. Which alternative you use for your project depends on your preferences for the names in your domain events and your persistence model.
Please note, it is required of these application-level objects that the “topic” generated by get_topic() from the object class is resolved by resolve_topic() back to the same object class.
from eventsourcing.domain.model.events import Created
from eventsourcing.utils.topic import get_topic, resolve_topic
topic = get_topic(Created)
assert resolve_topic(topic) == Created
assert topic == 'eventsourcing.domain.model.events#Created'
Custom JSON transcoding¶
The SequencedItemMapper can be constructed with the optional args json_encoder_class and json_decoder_class. The defaults are the library’s ObjectJSONEncoder and ObjectJSONDecoder, which can be extended to support types of value objects that are not currently supported by the library.
The code below extends the JSON transcoding to support sets.
from eventsourcing.utils.transcoding import ObjectJSONEncoder, ObjectJSONDecoder


class CustomObjectJSONEncoder(ObjectJSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return {'__set__': list(obj)}
        else:
            return super(CustomObjectJSONEncoder, self).default(obj)


class CustomObjectJSONDecoder(ObjectJSONDecoder):
    @classmethod
    def from_jsonable(cls, d):
        if '__set__' in d:
            return cls._decode_set(d)
        else:
            return ObjectJSONDecoder.from_jsonable(d)

    @staticmethod
    def _decode_set(d):
        return set(d['__set__'])


customized_sequenced_item_mapper = SequencedItemMapper(
    json_encoder_class=CustomObjectJSONEncoder,
    json_decoder_class=CustomObjectJSONDecoder
)

domain_event = customized_sequenced_item_mapper.from_sequenced_item(
    SequencedItem(
        sequence_id=sequence1,
        position=0,
        topic='eventsourcing.domain.model.events#DomainEvent',
        data='{"foo":{"__set__":["bar","baz"]}}',
    )
)
assert domain_event.foo == set(["bar", "baz"])
sequenced_item = customized_sequenced_item_mapper.to_sequenced_item(domain_event)
assert sequenced_item.data.startswith('{"foo":{"__set__":["ba')
Application-level encryption¶
The SequencedItemMapper can be constructed with a symmetric cipher. If a cipher is given, then the state field of every sequenced item will be encrypted before being sent to the database. The data retrieved from the database will be decrypted and verified, which protects against tampering.
The library provides an AES cipher object class called AESCipher. It uses the AES cipher from the Python Cryptography Toolkit, as forked by the actively maintained PyCryptodome project.
The AESCipher class uses AES in GCM mode, which is a padding-less, authenticated encryption mode. Unlike CBC, GCM doesn’t need padding, so it avoids potential padding oracle attacks. GCM will be faster than EAX on x86 architectures, especially those with AES opcodes. The other AES modes aren’t supported by this class at the moment.
The AESCipher constructor arg cipher_key is required. The key must be either 16, 24, or 32 random bytes (128, 192, or 256 bits). Longer keys take more time to encrypt plaintext, but produce more secure ciphertext.
Generating and storing a secure key requires functionality beyond the scope of this library. However, the utils package does contain a function encode_random_bytes() that may help to generate a unicode key string, representing random bytes encoded with Base64. A companion function decode_random_bytes() decodes the unicode key string into a sequence of bytes.
from eventsourcing.utils.cipher.aes import AESCipher
from eventsourcing.utils.random import encode_random_bytes, decode_random_bytes
# Unicode string representing 256 random bits encoded with Base64.
cipher_key = encode_random_bytes(num_bytes=32)
# Construct AES-256 cipher.
cipher = AESCipher(cipher_key=decode_random_bytes(cipher_key))
# Encrypt some plaintext (using nonce arguments).
ciphertext = cipher.encrypt('plaintext')
assert ciphertext != 'plaintext'
# Decrypt some ciphertext.
plaintext = cipher.decrypt(ciphertext)
assert plaintext == 'plaintext'
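Functionally, this pair of helpers amounts to Base64 encoding over random bytes, which can be sketched with the standard library. This is an illustration of the idea, not necessarily the library's exact implementation.

```python
import base64
import os

def encode_random_bytes(num_bytes):
    # Unicode string of Base64-encoded random bytes, easy to keep in config.
    return base64.b64encode(os.urandom(num_bytes)).decode('utf-8')

def decode_random_bytes(s):
    # Recover the raw key bytes from the encoded string.
    return base64.b64decode(s.encode('utf-8'))

key_string = encode_random_bytes(num_bytes=32)
key_bytes = decode_random_bytes(key_string)
assert len(key_bytes) == 32  # suitable as an AES-256 key
```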
The SequencedItemMapper has a constructor arg cipher, which can be used to pass in a cipher object, and thereby enable encryption.

# Construct sequenced item mapper to always encrypt domain events.
ciphered_sequenced_item_mapper = SequencedItemMapper(
    sequenced_item_class=StoredEvent,
    cipher=cipher,
)
# Domain event attribute ``foo`` has value ``'bar'``.
assert domain_event1.foo == 'bar'
# Map the domain event to an encrypted stored event namedtuple.
stored_event = ciphered_sequenced_item_mapper.to_sequenced_item(domain_event1)
# Attribute names and values of the domain event are not visible in the encrypted ``state`` field.
assert 'foo' not in stored_event.state
assert 'bar' not in stored_event.state
# Recover the domain event from the encrypted state.
domain_event = ciphered_sequenced_item_mapper.from_sequenced_item(stored_event)
# Domain event has decrypted attributes.
assert domain_event.foo == 'bar'
Please note, the sequence ID and position values are not necessarily encrypted. However, by encrypting the state of the item within the application, potentially sensitive information, for example personally identifiable information, will be encrypted in transit to the database, at rest in the database, and in all backups and other copies.
Event store¶
The library’s EventStore provides an interface to the library’s cohesive mechanism for storing events as sequences of items, and can be used directly within an event sourced application to append and retrieve its domain events.
The EventStore is constructed with a sequenced item mapper and an active record strategy, both discussed in detail in the sections above.
from eventsourcing.infrastructure.eventstore import EventStore
event_store = EventStore(
    sequenced_item_mapper=sequenced_item_mapper,
    active_record_strategy=active_record_strategy,
)
The event store’s append() method can append a domain event to its sequence. The event store uses the sequenced_item_mapper to obtain a sequenced item namedtuple from a domain event, and it uses the active_record_strategy to write the sequenced item to a database.
In the code below, a DomainEvent is appended to sequence aggregate1 at position 1.
event_store.append(
    DomainEvent(
        originator_id=aggregate1,
        originator_version=1,
        foo='baz',
    )
)
The event store’s method get_domain_events() is used to retrieve events that have previously been appended. The event store uses the active_record_strategy to read the sequenced items from a database, and it uses the sequenced_item_mapper to obtain domain events from the sequenced items.
results = event_store.get_domain_events(aggregate1)
Since two domain events have been stored by now, there are two domain events in the results.
assert len(results) == 2
assert results[0].foo == 'bar'
assert results[1].foo == 'baz'
The optional arguments of get_domain_events() can be used to select some of the items in the sequence.
The lt arg is used to select items below the given position in the sequence.
The lte arg is used to select items below and at the given position in the sequence.
The gte arg is used to select items at and above the given position in the sequence.
The gt arg is used to select items above the given position in the sequence.
The limit arg is used to limit the number of items selected from the sequence.
The is_ascending arg is used when selecting items. It affects how any limit is applied, and determines the order of the results. Hence, it can affect both the content of the results and the performance of the method.
# Get events below and at position 0.
result = event_store.get_domain_events(aggregate1, lte=0)
assert len(result) == 1, result
assert result[0].foo == 'bar'
# Get events at and above position 1.
result = event_store.get_domain_events(aggregate1, gte=1)
assert len(result) == 1, result
assert result[0].foo == 'baz'
# Get the first event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1)
assert len(result) == 1, result
assert result[0].foo == 'bar'
# Get the last event in the sequence.
result = event_store.get_domain_events(aggregate1, limit=1, is_ascending=False)
assert len(result) == 1, result
assert result[0].foo == 'baz'
Optimistic concurrency control¶
It is a feature of the event store that two events cannot be successfully appended at the same position in the same sequence. This condition is coded as a ConcurrencyError, since a correct program running in a single thread wouldn’t attempt to append an event that it had already successfully appended.
from eventsourcing.exceptions import ConcurrencyError
# Fail to append an event at the same position in the same sequence as a previous event.
try:
event_store.append(
DomainEvent(
originator_id=aggregate1,
originator_version=1,
foo='baz',
)
)
except ConcurrencyError:
pass
else:
raise Exception("ConcurrencyError not raised")
This feature depends on the behaviour of the active record strategy’s append() method: the event store will raise a ConcurrencyError if a SequencedItemConflict is raised by its active record strategy.
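The conflict-detection idea can be pictured with a minimal stand-alone sketch, not the library’s implementation: a record store keyed by sequence ID and position that refuses to overwrite an existing key. The class names here are hypothetical stand-ins for an active record strategy.

```python
# Sketch: a record store that enforces uniqueness of (sequence_id, position).
# Illustrative stand-in for an active record strategy, not the library's code.

class SequencedItemConflict(Exception):
    pass

class InMemoryRecordStrategy:
    def __init__(self):
        self._records = {}  # (sequence_id, position) -> stored item

    def append(self, sequence_id, position, item):
        key = (sequence_id, position)
        if key in self._records:
            # Another writer already stored an item at this position.
            raise SequencedItemConflict(key)
        self._records[key] = item

store = InMemoryRecordStrategy()
store.append('aggregate1', 0, {'foo': 'bar'})

# Appending at an occupied position raises a conflict.
conflicted = False
try:
    store.append('aggregate1', 0, {'foo': 'baz'})
except SequencedItemConflict:
    conflicted = True
assert conflicted
assert store._records[('aggregate1', 0)] == {'foo': 'bar'}
```

In a relational database, the same effect is obtained with a unique constraint on the sequence ID and position columns.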
If a command fails due to a concurrency error, the command can be retried with the latest state. The @retry decorator can help code retries on commands.
from eventsourcing.domain.model.decorators import retry
errors = []
@retry(ConcurrencyError, max_retries=5)
def set_password():
exc = ConcurrencyError()
errors.append(exc)
raise exc
try:
set_password()
except ConcurrencyError:
pass
else:
raise Exception("Shouldn't get here")
assert len(errors) == 5
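The behaviour of such a retry decorator can be pictured with a minimal stand-alone version. This is a conceptual sketch, not the library’s implementation:

```python
import functools

# Sketch of a retry decorator, for illustration (not the library's code).
def retry(exc_class, max_retries=3):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exc_class:
                    if attempt == max_retries - 1:
                        raise  # attempts exhausted: re-raise the error
        return wrapper
    return decorator

class ConflictError(Exception):
    pass

attempts = []

@retry(ConflictError, max_retries=3)
def flaky_command():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConflictError()  # the first two attempts conflict
    return 'ok'

assert flaky_command() == 'ok'
assert len(attempts) == 3  # failed twice, succeeded on the third attempt
```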
Event store factory¶
As a convenience, the library function construct_sqlalchemy_eventstore() can be used to construct an event store that uses the library’s SQLAlchemy classes.
from eventsourcing.infrastructure.sqlalchemy import factory
event_store = factory.construct_sqlalchemy_eventstore(session=datastore.session)
By default, the event store is constructed with the StoredEvent sequenced item namedtuple, and the active record class StoredEventRecord. The optional args sequenced_item_class and active_record_class can be used to construct different kinds of event store.
Timestamped event store¶
The examples so far have used an integer-sequenced event store, where the items are sequenced by integer version. The example below constructs an event store for timestamp-sequenced domain events, using the library active record class TimestampSequencedItemRecord.
from uuid import uuid4
from eventsourcing.infrastructure.sqlalchemy.activerecords import TimestampSequencedItemRecord
from eventsourcing.utils.times import decimaltimestamp
# Setup database table for timestamped sequenced items.
datastore.setup_table(TimestampSequencedItemRecord)
# Construct event store for timestamp sequenced events.
timestamped_event_store = factory.construct_sqlalchemy_eventstore(
sequenced_item_class=SequencedItem,
active_record_class=TimestampSequencedItemRecord,
sequence_id_attr_name='originator_id',
position_attr_name='timestamp',
session=datastore.session,
)
# Construct an event.
aggregate_id = uuid4()
event = DomainEvent(
originator_id=aggregate_id,
timestamp=decimaltimestamp(),
)
# Store the event.
timestamped_event_store.append(event)
# Check the event was stored.
events = timestamped_event_store.get_domain_events(aggregate_id)
assert len(events) == 1
assert events[0].originator_id == aggregate_id
assert events[0].timestamp < decimaltimestamp()
Please note, optimistic concurrency control doesn’t work to maintain entity consistency here, because each event is likely to have a unique timestamp, so conflicts are very unlikely to arise when concurrent operations append to the same sequence. For this reason, although domain events can be timestamped, it is not a very good idea to store the events of an entity or aggregate as timestamp-sequenced items. Timestamp-sequenced items are useful for storing events that are logically independent of others, such as messages in a log: things that do not risk causing a consistency error due to concurrent operations.
Domain model¶
The library’s domain layer has base classes for domain events and entities. These classes show how to write a domain model that uses the library’s event sourcing infrastructure. They can also be used to develop an event-sourced application as a domain driven design.
Domain events¶
The purpose of a domain event is to be published when something happens, normally as a result of the work of a command. The library has a base class for domain events called DomainEvent.
Domain events can be freely constructed from the DomainEvent class. Attributes are set directly from the constructor keyword arguments.
from eventsourcing.domain.model.events import DomainEvent
domain_event = DomainEvent(a=1)
assert domain_event.a == 1
The attributes of domain events are read-only. New values cannot be assigned to existing objects. Domain events are immutable in that sense.
# Fail to set attribute of already-existing domain event.
try:
domain_event.a = 2
except AttributeError:
pass
else:
raise Exception("Shouldn't get here")
Domain events can be compared for equality as value objects: instances are equal if they have the same type and the same attributes.
assert DomainEvent(a=1) == DomainEvent(a=1)
assert DomainEvent(a=1) != DomainEvent(a=2)
assert DomainEvent(a=1) != DomainEvent(b=1)
Publish-subscribe¶
Domain events can be published, using the library’s publish-subscribe mechanism.
The publish() function is used to publish events. The event arg is required.
from eventsourcing.domain.model.events import publish
publish(event=domain_event)
The subscribe() function is used to subscribe a handler that will receive events. The optional predicate arg can be used to provide a function that decides whether or not the subscribed handler will actually be called when an event is published.
from eventsourcing.domain.model.events import subscribe
received_events = []
def receive_event(event):
received_events.append(event)
def is_domain_event(event):
return isinstance(event, DomainEvent)
subscribe(handler=receive_event, predicate=is_domain_event)
# Publish the domain event.
publish(domain_event)
assert len(received_events) == 1
assert received_events[0] == domain_event
The unsubscribe() function can be used to stop the handler receiving further events.
from eventsourcing.domain.model.events import unsubscribe
unsubscribe(handler=receive_event, predicate=is_domain_event)
# Clean up.
del received_events[:] # received_events.clear()
Event library¶
The library has a small collection of domain event subclasses, such as EventWithOriginatorID, EventWithOriginatorVersion, EventWithTimestamp, EventWithTimeuuid, Created, AttributeChanged, and Discarded.
Some of these classes provide useful defaults for particular attributes, such as a timestamp. Timestamps can be used to sequence events.
from eventsourcing.domain.model.events import EventWithTimestamp
from eventsourcing.domain.model.events import EventWithTimeuuid
from decimal import Decimal
from uuid import UUID
# Automatic timestamp.
assert isinstance(EventWithTimestamp().timestamp, Decimal)
# Automatic UUIDv1.
assert isinstance(EventWithTimeuuid().event_id, UUID)
Some classes require particular arguments when constructed. The originator_id can be used to identify the sequence to which an event belongs. The originator_version can be used to position the event in a sequence.
from eventsourcing.domain.model.events import EventWithOriginatorVersion
from eventsourcing.domain.model.events import EventWithOriginatorID
from uuid import uuid4
# Requires originator_id.
EventWithOriginatorID(originator_id=uuid4())
# Requires originator_version.
EventWithOriginatorVersion(originator_version=0)
Some are just useful for their distinct type, for example in subscription predicates.
from eventsourcing.domain.model.events import Created, AttributeChanged, Discarded
def is_created(event):
return isinstance(event, Created)
def is_attribute_changed(event):
return isinstance(event, AttributeChanged)
def is_discarded(event):
return isinstance(event, Discarded)
assert is_created(Created()) is True
assert is_created(Discarded()) is False
assert is_created(DomainEvent()) is False
assert is_discarded(Created()) is False
assert is_discarded(Discarded()) is True
assert is_discarded(DomainEvent()) is False
assert is_domain_event(Created()) is True
assert is_domain_event(Discarded()) is True
assert is_domain_event(DomainEvent()) is True
Custom events¶
Custom domain events can be coded by subclassing the library’s domain event classes.
Domain events are normally named using the past participle of a common verb, for example a regular past participle such as “started”, “paused”, “stopped”, or an irregular past participle such as “chosen”, “done”, “found”, “paid”, “quit”, “seen”.
class SomethingHappened(DomainEvent):
"""
Published whenever something happens.
"""
It is possible to code domain events as inner or nested classes.
class Job(object):
class Seen(EventWithTimestamp):
"""
Published when the job is seen.
"""
class Done(EventWithTimestamp):
"""
Published when the job is done.
"""
Inner or nested classes can be used, and are used in the library, to define the domain events of a domain entity on the entity class itself.
seen = Job.Seen(job_id='#1')
done = Job.Done(job_id='#1')
assert done.timestamp > seen.timestamp
So long as the entity event classes ultimately inherit from the library class QualnameABC, which DomainEvent does, the utility functions get_topic() and resolve_topic() can work with domain events defined as inner or nested classes in all versions of Python. These functions are used in the DomainEntity.Created event class, and in the infrastructure class SequencedItemMapper. The requirement to inherit from QualnameABC actually only applies when using nested classes in Python 2.7 with the utility functions get_topic() and resolve_topic(). Event classes that are not nested, or that will not be run with Python 2.7, do not need to inherit from QualnameABC in order to work with these two functions (and hence with the library domain and infrastructure classes which use those functions).
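The idea behind these two functions can be sketched without the library: a topic is a string that locates a class by its module path and qualified name, and can be resolved back to the class. The sketch below is illustrative, not the library’s implementation.

```python
import importlib

# Sketch of topic round-tripping (illustrative, not the library's code).
def get_topic(cls):
    # A topic locates a class as '<module path>#<qualified class name>'.
    return cls.__module__ + '#' + cls.__qualname__

def resolve_topic(topic):
    module_name, _, qualname = topic.partition('#')
    obj = importlib.import_module(module_name)
    for name in qualname.split('.'):  # walks into nested classes too
        obj = getattr(obj, name)
    return obj

from fractions import Fraction

topic = get_topic(Fraction)
assert topic == 'fractions#Fraction'
assert resolve_topic(topic) is Fraction
```

The __qualname__ attribute is what allows nested classes such as Job.Seen to be located; Python 2.7 classes lack it, which is why the library’s QualnameABC provides an equivalent there.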
Domain entities¶
A domain entity is an object that is not defined by its attributes, but rather by a thread of continuity and its identity. The attributes of a domain entity can change, directly by assignment, or indirectly by calling a method of the object.
The library has a base class for domain entities called DomainEntity, which has an id attribute.
from eventsourcing.domain.model.entity import DomainEntity
entity_id = uuid4()
entity = DomainEntity(id=entity_id)
assert entity.id == entity_id
Entity library¶
The library also has a domain entity class called VersionedEntity, which extends the DomainEntity class with a __version__ attribute.
from eventsourcing.domain.model.entity import VersionedEntity
entity = VersionedEntity(id=entity_id, __version__=1)
assert entity.id == entity_id
assert entity.__version__ == 1
The library also has a domain entity class called TimestampedEntity, which extends the DomainEntity class with the attributes __created_on__ and __last_modified__.
from eventsourcing.domain.model.entity import TimestampedEntity
entity = TimestampedEntity(id=entity_id, __created_on__=123)
assert entity.id == entity_id
assert entity.__created_on__ == 123
assert entity.__last_modified__ == 123
There is also a TimestampedVersionedEntity that has id, __version__, __created_on__, and __last_modified__ attributes.
from eventsourcing.domain.model.entity import TimestampedVersionedEntity
entity = TimestampedVersionedEntity(id=entity_id, __version__=1, __created_on__=123)
assert entity.id == entity_id
assert entity.__created_on__ == 123
assert entity.__last_modified__ == 123
assert entity.__version__ == 1
A timestamped, versioned entity is both a timestamped entity and a versioned entity.
assert isinstance(entity, TimestampedEntity)
assert isinstance(entity, VersionedEntity)
Entity events¶
The library’s domain entity classes have domain events defined as inner classes: Event, Created, AttributeChanged, and Discarded.
DomainEntity.Event
DomainEntity.Created
DomainEntity.AttributeChanged
DomainEntity.Discarded
The domain event class DomainEntity.Event is a super type of the others. The others also inherit from the library base classes Created, AttributeChanged, and Discarded. All these domain event classes are subclasses of DomainEvent.
assert issubclass(DomainEntity.Created, DomainEntity.Event)
assert issubclass(DomainEntity.AttributeChanged, DomainEntity.Event)
assert issubclass(DomainEntity.Discarded, DomainEntity.Event)
assert issubclass(DomainEntity.Created, Created)
assert issubclass(DomainEntity.AttributeChanged, AttributeChanged)
assert issubclass(DomainEntity.Discarded, Discarded)
assert issubclass(DomainEntity.Event, DomainEvent)
These entity event classes can be freely constructed, with suitable arguments. All events need an originator_id. Events of versioned entities also need an originator_version. Events of timestamped entities generate a current timestamp value, unless one is given. Created events also need an originator_topic. The other events need a __previous_hash__. AttributeChanged events also need name and value.
All the events of DomainEntity use SHA-256 to generate an event_hash from the event attribute values when constructed for the first time. Events can be chained together by constructing each subsequent event to have its __previous_hash__ as the event_hash of the previous event.
from eventsourcing.utils.topic import get_topic
entity_id = UUID('b81d160d-d7ef-45ab-a629-c7278082a845')
created = VersionedEntity.Created(
originator_version=0,
originator_id=entity_id,
originator_topic=get_topic(VersionedEntity)
)
attribute_a_changed = VersionedEntity.AttributeChanged(
name='a',
value=1,
originator_version=1,
originator_id=entity_id,
__previous_hash__=created.__event_hash__,
)
attribute_b_changed = VersionedEntity.AttributeChanged(
name='b',
value=2,
originator_version=2,
originator_id=entity_id,
__previous_hash__=attribute_a_changed.__event_hash__,
)
entity_discarded = VersionedEntity.Discarded(
originator_version=3,
originator_id=entity_id,
__previous_hash__=attribute_b_changed.__event_hash__,
)
The events have a __mutate__() function, which can be used to mutate the state of a given object appropriately. For example, the DomainEntity.Created event mutates to an entity instance. The class that is instantiated is determined by the originator_topic attribute of the DomainEntity.Created event.
A domain event’s __mutate__() method normally requires an obj argument, but that is not required for DomainEntity.Created events. The default is None, but if a value is provided it must be a callable that returns an object, such as a domain entity class. If a domain entity class is provided, the originator_topic will be ignored.
entity = created.__mutate__()
assert entity.id == entity_id
As another example, when a versioned entity is mutated by an event of the VersionedEntity class, the entity version number is set to the event’s originator_version.
assert entity.__version__ == 0
entity = attribute_a_changed.__mutate__(entity)
assert entity.__version__ == 1
assert entity.a == 1
entity = attribute_b_changed.__mutate__(entity)
assert entity.__version__ == 2
assert entity.b == 2
Similarly, when a timestamped entity is mutated by an event of the TimestampedEntity class, the __last_modified__ attribute of the entity is set to the event’s timestamp value.
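Replaying a sequence of events through mutate functions amounts to a left fold over the events. The following stand-alone sketch illustrates the idea, using plain dicts for entity state and events; it is not the library’s replay code.

```python
from functools import reduce

# Sketch: entity state as a left fold of a mutate function over events.
# Plain dicts stand in for entities and events, for illustration only.
def mutate(entity, event):
    entity = dict(entity)  # copy, so earlier states are not destroyed
    entity[event['name']] = event['value']
    entity['version'] = event['version']
    return entity

events = [
    {'name': 'a', 'value': 1, 'version': 1},
    {'name': 'b', 'value': 2, 'version': 2},
]

# Current state is obtained by folding mutate over the stored events.
entity = reduce(mutate, events, {'version': 0})
assert entity == {'version': 2, 'a': 1, 'b': 2}
```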
Factory method¶
The DomainEntity class has a class method __create__() which can return new entity objects. When called, it constructs the Created event of the concrete class with suitable arguments, such as a unique ID and a topic representing the concrete entity class, and then it projects that event into an entity object using the event’s __mutate__() method. Then it publishes the event, and returns the new entity to the caller. This technique works correctly for subclasses of both the entity and the event class.
entity = DomainEntity.__create__()
assert entity.id
assert entity.__class__ is DomainEntity
entity = VersionedEntity.__create__()
assert entity.id
assert entity.__version__ == 0
assert entity.__class__ is VersionedEntity
entity = TimestampedEntity.__create__()
assert entity.id
assert entity.__created_on__
assert entity.__last_modified__
assert entity.__class__ is TimestampedEntity
entity = TimestampedVersionedEntity.__create__()
assert entity.id
assert entity.__created_on__
assert entity.__last_modified__
assert entity.__version__ == 0
assert entity.__class__ is TimestampedVersionedEntity
Triggering events¶
Command methods will construct, apply, and publish events, using the results from working on command arguments. The events need to be constructed with suitable arguments.
To help trigger events in an extensible manner, the DomainEntity class has a method called __trigger_event__(), which is extended by subclasses in the library, and which can be used in command methods to construct, apply, and publish events with suitable arguments. The events’ __mutate__() methods update the entity appropriately.
For example, triggering an AttributeChanged event on a timestamped, versioned entity will cause the attribute value to be updated, but it will also cause the version number to increase, and it will update the last modified time.
entity = TimestampedVersionedEntity.__create__()
assert entity.__version__ == 0
assert entity.__created_on__ == entity.__last_modified__
# Trigger domain event.
entity.__trigger_event__(entity.AttributeChanged, name='c', value=3)
# Check the event was applied.
assert entity.c == 3
assert entity.__version__ == 1
assert entity.__last_modified__ > entity.__created_on__
The command method __change_attribute__() triggers an AttributeChanged event. In the code below, the attribute full_name is set to ‘Mr Boots’. A subscriber receives the event.
subscribe(handler=receive_event, predicate=is_domain_event)
assert len(received_events) == 0
entity = VersionedEntity.__create__(entity_id)
# Change an attribute.
entity.__change_attribute__(name='full_name', value='Mr Boots')
# Check the event was applied.
assert entity.full_name == 'Mr Boots'
# Check two events were published.
assert len(received_events) == 2
first_event = received_events[0]
assert first_event.__class__ == VersionedEntity.Created
assert first_event.originator_id == entity_id
assert first_event.originator_version == 0
last_event = received_events[1]
assert last_event.__class__ == VersionedEntity.AttributeChanged
assert last_event.name == 'full_name'
assert last_event.value == 'Mr Boots'
assert last_event.originator_version == 1
# Check the event hash is the current entity head.
assert last_event.__event_hash__ == entity.__head__
# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:] # received_events.clear()
Data integrity¶
Domain events that are triggered in this way are hash-chained together by default. The state of each event, including the hash of the last event, is hashed using SHA-256. Before an event is applied to an entity, it is validated in itself (the event hash represents the state of the event) and as part of the chain (the previous event hash is included in the next event’s state). If the sequence of events is accidentally damaged in any way, then a DataIntegrityError will almost certainly be raised from the domain layer when the sequence is replayed.
The hash of the last event applied to an entity is available as an attribute called __head__.
# Entity's head hash is determined exclusively
# by the entire sequence of events and SHA-256.
assert entity.__head__ == 'ae7688000c38b2bd504b3eb3cd8e015144dd9a3c4992951c87cef9cce047f86c'
# Entity's head hash is simply the event hash
# of the last event that mutated the entity.
assert entity.__head__ == last_event.__event_hash__
A different sequence of events will almost certainly result in a different head hash. So the entire history of an entity can be verified by checking the head hash. This feature could be used to protect against tampering.
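The hash-chaining scheme itself can be sketched independently of the library, using hashlib. This is a conceptual sketch of the idea, not the library’s actual serialisation or hashing of event state.

```python
import hashlib
import json

# Sketch of hash-chained events (illustrative, not the library's code).
def event_hash(state):
    # Hash the whole event state, which includes the previous hash.
    serialized = json.dumps(state, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()

def chain(states):
    # Give each event the hash of the previous event, then hash it.
    previous_hash, hashed = '', []
    for state in states:
        state = dict(state, __previous_hash__=previous_hash)
        previous_hash = event_hash(state)
        hashed.append((state, previous_hash))
    return hashed

def verify(hashed):
    # Recompute every hash; any damage breaks the chain.
    previous_hash = ''
    for state, digest in hashed:
        if state['__previous_hash__'] != previous_hash:
            return False
        if event_hash(state) != digest:
            return False
        previous_hash = digest
    return True

hashed = chain([{'what': 'dinosaurs'}, {'what': 'trucks'}, {'what': 'internet'}])
assert verify(hashed)

# Tampering with an earlier event invalidates the whole chain.
hashed[1][0]['what'] = 'tampered'
assert not verify(hashed)
```

The head hash of the final event fixes the entire preceding sequence, which is why checking it verifies the whole history.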
The hashes can be salted by setting the environment variable SALT_FOR_DATA_INTEGRITY, perhaps with random bytes encoded as Base64.
from eventsourcing.utils.random import encode_random_bytes
# Keep this safe.
salt = encode_random_bytes(num_bytes=32)
# Configure environment (before importing library).
import os
os.environ['SALT_FOR_DATA_INTEGRITY'] = salt
Discarding entities¶
The entity method __discard__() can be used to discard the entity, by triggering a Discarded event, after which the entity is unavailable for further changes.
from eventsourcing.exceptions import EntityIsDiscarded
entity.__discard__()
# Fail to change an attribute after entity was discarded.
try:
entity.__change_attribute__('full_name', 'Mr Boots')
except EntityIsDiscarded:
pass
else:
raise Exception("Shouldn't get here")
Custom entities¶
The library entity classes can be subclassed.
class User(VersionedEntity):
def __init__(self, full_name, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self.full_name = full_name
Subclasses can extend the entity base classes, by adding event-based properties and methods.
Custom attributes¶
The library’s @attribute decorator provides a property getter and setter, which triggers an AttributeChanged event when the property is assigned. Simple mutable attributes can be coded as decorated functions without a body, such as the full_name function of User below.
from eventsourcing.domain.model.decorators import attribute
class User(VersionedEntity):
def __init__(self, full_name, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self._full_name = full_name
@attribute
def full_name(self):
"""Full name of the user."""
In the code below, after the entity has been created, assigning to the full_name attribute causes the entity to be updated. An AttributeChanged event is published. Both the Created and AttributeChanged events are received by a subscriber.
assert len(received_events) == 0
subscribe(handler=receive_event, predicate=is_domain_event)
# Publish a Created event.
user = User.__create__(full_name='Mrs Boots')
# Publish an AttributeChanged event.
user.full_name = 'Mr Boots'
assert len(received_events) == 2
assert received_events[0].__class__ == VersionedEntity.Created
assert received_events[0].full_name == 'Mrs Boots'
assert received_events[0].originator_version == 0
assert received_events[0].originator_id == user.id
assert received_events[1].__class__ == VersionedEntity.AttributeChanged
assert received_events[1].value == 'Mr Boots'
assert received_events[1].name == '_full_name'
assert received_events[1].originator_version == 1
assert received_events[1].originator_id == user.id
# Clean up.
unsubscribe(handler=receive_event, predicate=is_domain_event)
del received_events[:] # received_events.clear()
Custom commands¶
The entity base classes can be extended with custom command methods. In general, the arguments of a command will be used to perform some work. Then, the result of the work will be used to trigger a domain event that represents what happened. Please note, command methods normally have no return value.
For example, the set_password() method of the User entity below is given a raw password. It creates an encoded string from the raw password, and then uses the __change_attribute__() method to trigger an AttributeChanged event for the _password attribute with the encoded password.
class User(VersionedEntity):
def __init__(self, *args, **kwargs):
super(User, self).__init__(*args, **kwargs)
self._password = None
def set_password(self, raw_password):
# Do some work using the arguments of a command.
password = self._encode_password(raw_password)
# Change private _password attribute.
self.__change_attribute__('_password', password)
def check_password(self, raw_password):
password = self._encode_password(raw_password)
return self._password == password
def _encode_password(self, password):
return ''.join(reversed(password))
user = User(id='1', __version__=0)
user.set_password('password')
assert user.check_password('password')
Custom events¶
Custom events can be defined as inner or nested classes of the custom entity class.
In the code below, the entity class World has a custom event called SomethingHappened.
Custom event classes can extend the __mutate__() method, so that it affects entities in a way that is specific to that type of event. More conveniently, event classes can implement a mutate() method, which avoids the need to call the super method and return the obj. For example, the SomethingHappened event class has a mutate() method which simply appends the event object to the entity’s history attribute.
Custom events are normally triggered by custom commands. In the example below, the command method make_it_so() triggers the custom event SomethingHappened.
class World(VersionedEntity):
def __init__(self, *args, **kwargs):
super(World, self).__init__(*args, **kwargs)
self.history = []
def make_it_so(self, something):
# Do some work using the arguments of a command.
what_happened = something
# Trigger event with the results of the work.
self.__trigger_event__(World.SomethingHappened, what=what_happened)
class SomethingHappened(VersionedEntity.Event):
"""Published when something happens in the world."""
def mutate(self, obj):
obj.history.append(self)
A new world can now be created, using the __create__() method. The command make_it_so() can be used to make things happen in this world. When something happens, the history of the world is augmented with the new event.
world = World.__create__()
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'
Aggregate root¶
Eric Evans’ book Domain Driven Design describes an abstraction called “aggregate”:
“An aggregate is a cluster of associated objects that we treat as a unit for the purpose of data changes. Each aggregate has a root and a boundary.”
Therefore,
“Cluster the entities and value objects into aggregates and define boundaries around each. Choose one entity to be the root of each aggregate, and control all access to the objects inside the boundary through the root. Allow external objects to hold references to the root only.”
In this situation, one aggregate command may result in many events. We need to prevent the situation where other threads pick up only some of the events, but not all of them, which could present the aggregate in an inconsistent, unusual, and perhaps unworkable state.
In other words, we need to avoid the situation where some of the events have been stored successfully but others have not been. If the events from a command were stored in a series of independent database transactions, then events could be lost due to an inconvenient database connection problem. Later events in the series could fall into conflict because another thread has started appending events to the same sequence, potentially causing an incoherent state that would be difficult to repair.
Therefore, all the events from a command on an aggregate must be appended to the event store in a single atomic transaction, so that if some of the events resulting from executing a command cannot be stored then none of them will be stored. If all the events from an aggregate are to be written to a database as a single atomic operation, they must also be published all together as a single list.
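The all-or-nothing requirement can be pictured with a minimal stand-alone sketch. This is illustrative only; the library relies on a database transaction for the same guarantee. The idea: check that every position is free before writing any record, so that a conflict on one event means none are stored.

```python
# Sketch of an all-or-nothing batch append (illustrative, not the library's
# code, which uses a single atomic database transaction instead).
class ConcurrencyError(Exception):
    pass

class BatchStore:
    def __init__(self):
        self._records = {}  # (originator_id, originator_version) -> event

    def append_batch(self, events):
        keys = [(e['originator_id'], e['originator_version']) for e in events]
        # Check every position first, so a conflict stores nothing at all.
        if any(key in self._records for key in keys):
            raise ConcurrencyError(keys)
        for key, event in zip(keys, events):
            self._records[key] = event

store = BatchStore()
store.append_batch([
    {'originator_id': 'world1', 'originator_version': 0},
    {'originator_id': 'world1', 'originator_version': 1},
])

# A batch that conflicts at version 1 stores nothing, not even version 2.
try:
    store.append_batch([
        {'originator_id': 'world1', 'originator_version': 1},
        {'originator_id': 'world1', 'originator_version': 2},
    ])
except ConcurrencyError:
    pass
assert ('world1', 2) not in store._records
assert len(store._records) == 2
```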
The library has a domain entity class called AggregateRoot that can be useful in a domain driven design, especially where a single command can cause many events to be published. The AggregateRoot entity class extends TimestampedVersionedEntity. It overrides the __publish__() method of the base class, so that triggered events are published only to a private list of pending events, rather than directly to the publish-subscribe mechanism. It also adds a method called __save__(), which publishes all pending events to the publish-subscribe mechanism as a single list.
It can be subclassed by custom aggregate root entities. In the example below, the entity class World inherits from AggregateRoot.
from eventsourcing.domain.model.aggregate import AggregateRoot
class World(AggregateRoot):
"""
Example domain entity, with mutator function on domain event.
"""
def __init__(self, *args, **kwargs):
super(World, self).__init__(*args, **kwargs)
self.history = []
def make_things_so(self, *somethings):
for something in somethings:
self.__trigger_event__(World.SomethingHappened, what=something)
class SomethingHappened(AggregateRoot.Event):
def mutate(self, obj):
obj.history.append(self)
The World aggregate root has a command method make_things_so() which publishes SomethingHappened events. The mutate() method of the SomethingHappened class simply appends the event (self) to the aggregate object obj.
We can see the events that are published by subscribing to the handler receive_event().
assert len(received_events) == 0
subscribe(handler=receive_event)
# Create new world.
world = World.__create__()
assert isinstance(world, World)
# Command that publishes many events.
world.make_things_so('dinosaurs', 'trucks', 'internet')
# State of aggregate object has changed
# but no events have been published yet.
assert len(received_events) == 0
assert world.history[0].what == 'dinosaurs'
assert world.history[1].what == 'trucks'
assert world.history[2].what == 'internet'
Events are pending, and will not be published until the __save__() method is called.
# Has pending events.
assert len(world.__pending_events__) == 4
# Publish pending events.
world.__save__()
# Pending events published as a list.
assert len(received_events) == 1
assert len(received_events[0]) == 4
# No longer any pending events.
assert len(world.__pending_events__) == 0
# Clean up.
unsubscribe(handler=receive_event)
del received_events[:] # received_events.clear()
Application¶
The application layer combines objects from the domain and infrastructure layers.
Overview¶
An application object normally has repositories and policies. A repository allows aggregates to be retrieved by ID, using a dictionary-like interface. Whereas aggregates implement commands that publish events, policies conversely subscribe to events and then execute commands as events are received. An application can be well understood by understanding its policies, aggregates, commands, and events.
An application object can have methods (“application services”) which provide a relatively simple interface for client operations, hiding the complexity and usage of the application’s domain and infrastructure layers.
Application services can be developed outside-in, with a test- or behaviour-driven development approach. A test suite can be imagined as an interface that uses the application. Interfaces are outside the scope of the application layer.
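The relationship between events and policies can be pictured with a minimal stand-alone sketch, using plain Python rather than the library’s API: a policy is simply a handler subscribed to published events, which reacts by doing further work.

```python
# Sketch: a policy is a handler subscribed to published events
# (illustrative plain Python, not the library's API).
handlers = []

def publish(event):
    for handler in handlers:
        handler(event)

audit_log = []

def audit_policy(event):
    # A policy reacts to received events, e.g. by executing further commands.
    audit_log.append(('received', event['type']))

handlers.append(audit_policy)

publish({'type': 'Created'})
publish({'type': 'AttributeChanged'})
assert audit_log == [('received', 'Created'), ('received', 'AttributeChanged')]
```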
To run the examples below, please install the library with the ‘sqlalchemy’ option.
$ pip install eventsourcing[sqlalchemy]
Simple application¶
The library provides a simple application class SimpleApplication which can be constructed directly.
Its uri attribute is an SQLAlchemy-style database connection string. An SQLAlchemy thread-scoped session facade will be set up using the uri value.
uri = 'sqlite:///:memory:'
As you can see, this example uses SQLite to manage an in-memory relational database. You can change uri to any valid connection string.
Here are some example connection strings: for an SQLite file; for a PostgreSQL database; and for a MySQL database. See SQLAlchemy’s create_engine() documentation for details. You may need to install drivers for your database management system (such as psycopg2 or mysqlclient).
sqlite:////tmp/mydatabase
postgresql://scott:tiger@localhost:5432/mydatabase
mysql://scott:tiger@hostname/dbname
Encryption is optionally enabled in SimpleApplication with a suitable AES key (16, 24, or 32 random bytes encoded with Base64).
from eventsourcing.utils.random import encode_random_bytes
# Keep this safe (random bytes encoded with Base64).
cipher_key = encode_random_bytes(num_bytes=32)
An application object can be constructed with these values as constructor arguments. The uri value can alternatively be set with the environment variable DB_URI, and the cipher_key value with the environment variable CIPHER_KEY.
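For example, the same configuration could be supplied through the environment before the application is constructed. This sketch uses only the standard library, producing a key equivalent to what encode_random_bytes(num_bytes=32) returns (32 random bytes encoded with Base64):

```python
import base64
import os

# Equivalent to encode_random_bytes(num_bytes=32): 32 random bytes
# encoded with Base64.
cipher_key = base64.b64encode(os.urandom(32)).decode('utf8')

# Set the environment variables described above, so the application
# can be constructed without explicit uri or cipher_key arguments.
os.environ['DB_URI'] = 'sqlite:///:memory:'
os.environ['CIPHER_KEY'] = cipher_key
```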
from eventsourcing.application.simple import SimpleApplication
app = SimpleApplication(
    uri='sqlite:///:memory:',
    cipher_key=cipher_key
)
As an alternative to the uri argument, an existing SQLAlchemy session can be passed in with the session argument, for example a session object provided by a framework such as Flask-SQLAlchemy.
Once constructed, the SimpleApplication will have an event store, provided by the library’s EventStore class, for which it uses the library’s infrastructure classes for SQLAlchemy.
app.event_store
The SimpleApplication uses the library function construct_sqlalchemy_eventstore() to construct its event store, for integer-sequenced items with SQLAlchemy.
To use different infrastructure for storing events, subclass the SimpleApplication class and override the method setup_event_store(). You can read about the available alternatives in the infrastructure layer documentation.
The SimpleApplication also has a persistence policy, provided by the library’s PersistencePolicy class.
app.persistence_policy
The persistence policy appends domain events to its event store whenever they are published.
The SimpleApplication also has a repository, an instance of the library’s EventSourcedRepository class.
app.repository
Both the repository and persistence policy use the event store.
The aggregate repository is generic, and can retrieve all aggregates in an application, regardless of their class.
The SimpleApplication can be used as a context manager. The example below uses the AggregateRoot class directly to create a new aggregate object that is available in the application’s repository.
from eventsourcing.domain.model.aggregate import AggregateRoot
with app:
    obj = AggregateRoot.__create__()
    obj.__change_attribute__(name='a', value=1)
    assert obj.a == 1
    obj.__save__()

    # Check the repository has the latest values.
    copy = app.repository[obj.id]
    assert copy.a == 1

    # Check the aggregate can be discarded.
    copy.__discard__()
    assert copy.id not in app.repository

    # Check optimistic concurrency control is working ok.
    from eventsourcing.exceptions import ConcurrencyError
    try:
        obj.__change_attribute__(name='a', value=2)
        obj.__save__()
    except ConcurrencyError:
        pass
    else:
        raise Exception("Shouldn't get here")
Because of the unique constraint on the sequenced item table, it isn’t possible to branch the evolution of an entity and store two events at the same version. Hence, if the entity you are working on has been updated elsewhere, an attempt to update your object will cause a ConcurrencyError exception to be raised.
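A common response to a ConcurrencyError is to retry the command against a freshly retrieved copy of the aggregate. The sketch below shows the retry loop only; ConcurrencyError is redefined locally so the snippet is self-contained (the real class lives in eventsourcing.exceptions), and retry_on_concurrency_error() is an illustrative helper, not a library function:

```python
class ConcurrencyError(Exception):
    """Local stand-in for eventsourcing.exceptions.ConcurrencyError."""


def retry_on_concurrency_error(operation, max_attempts=3):
    """Call operation(); retry if a ConcurrencyError is raised."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ConcurrencyError:
            if attempt == max_attempts - 1:
                raise
            # In a real application: re-get the aggregate from the
            # repository here, so the command runs against the
            # latest version before it is retried.


# Demonstration with a stand-in operation that fails once.
attempts = []

def operation():
    attempts.append(1)
    if len(attempts) < 2:
        raise ConcurrencyError()
    return 'ok'

assert retry_on_concurrency_error(operation) == 'ok'
```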
Custom application¶
The SimpleApplication class can be extended. The example below shows a custom application class MyApplication that extends SimpleApplication with an application service create_aggregate() that can create new CustomAggregate entities.
class MyApplication(SimpleApplication):
    def create_aggregate(self, a):
        return CustomAggregate.__create__(a=a)
The application code above depends on an entity class called CustomAggregate, which is defined below. It extends the library’s AggregateRoot entity with an event sourced, mutable attribute a.
from eventsourcing.domain.model.decorators import attribute
class CustomAggregate(AggregateRoot):
    def __init__(self, a, **kwargs):
        super(CustomAggregate, self).__init__(**kwargs)
        self._a = a

    @attribute
    def a(self):
        """Mutable attribute a."""
For more sophisticated domain models, please read about the custom entities, commands, and domain events that can be developed using classes from the library’s domain model layer.
Run the code¶
The custom application object can be constructed.
# Construct application object.
app = MyApplication(uri='sqlite:///:memory:')
The application service aggregate factory method create_aggregate() can be called.
# Create aggregate using application service, and save it.
aggregate = app.create_aggregate(a=1)
aggregate.__save__()
Existing aggregates can be retrieved by ID using the repository’s dictionary-like interface.
# Aggregate is in the repository.
assert aggregate.id in app.repository
# Get aggregate using dictionary-like interface.
aggregate = app.repository[aggregate.id]
assert aggregate.a == 1
Changes to the aggregate’s attribute a are visible in the repository once pending events have been published.
# Change attribute value.
aggregate.a = 2
aggregate.a = 3
# Don't forget to save!
aggregate.__save__()
# Retrieve again from repository.
aggregate = app.repository[aggregate.id]
# Check attribute has new value.
assert aggregate.a == 3
The aggregate can be discarded. After being saved, a discarded aggregate will no longer be available in the repository.
# Discard the aggregate.
aggregate.__discard__()
# Check discarded aggregate no longer exists in repository.
assert aggregate.id not in app.repository
Attempts to retrieve an aggregate that does not exist will cause a KeyError to be raised.
# Fail to get aggregate from dictionary-like interface.
try:
    app.repository[aggregate.id]
except KeyError:
    pass
else:
    raise Exception("Shouldn't get here")
Stored events¶
It is always possible to get the domain events for an aggregate, by using the application’s event store method get_domain_events().
events = app.event_store.get_domain_events(originator_id=aggregate.id)
assert len(events) == 4
assert events[0].originator_id == aggregate.id
assert isinstance(events[0], CustomAggregate.Created)
assert events[0].a == 1
assert events[1].originator_id == aggregate.id
assert isinstance(events[1], CustomAggregate.AttributeChanged)
assert events[1].name == '_a'
assert events[1].value == 2
assert events[2].originator_id == aggregate.id
assert isinstance(events[2], CustomAggregate.AttributeChanged)
assert events[2].name == '_a'
assert events[2].value == 3
assert events[3].originator_id == aggregate.id
assert isinstance(events[3], CustomAggregate.Discarded)
Sequenced items¶
It is also possible to get the sequenced item namedtuples for an aggregate, by using the event store’s active record strategy method list_items().
items = app.event_store.active_record_strategy.list_items(aggregate.id)
assert len(items) == 4
assert items[0].originator_id == aggregate.id
assert items[0].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.Created'
assert '"a":1' in items[0].state, items[0].state
assert '"timestamp":' in items[0].state
assert items[1].originator_id == aggregate.id
assert items[1].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.AttributeChanged'
assert '"name":"_a"' in items[1].state
assert '"timestamp":' in items[1].state
assert items[2].originator_id == aggregate.id
assert items[2].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.AttributeChanged'
assert '"name":"_a"' in items[2].state
assert '"timestamp":' in items[2].state
assert items[3].originator_id == aggregate.id
assert items[3].event_type == 'eventsourcing.domain.model.aggregate#AggregateRoot.Discarded'
assert '"timestamp":' in items[3].state
In this example, the cipher_key was not set, so the stored data is visible.
Database records¶
Of course, it is also possible to just use the active record class directly to obtain records. After all, it’s just an SQLAlchemy ORM object.
app.event_store.active_record_strategy.active_record_class
The query property of the SQLAlchemy active record strategy is a convenient way to get a query object for the active record class from the session.
active_records = app.event_store.active_record_strategy.query.all()
assert len(active_records) == 4
Close¶
If the application isn’t being used as a context manager, it is useful to unsubscribe any handlers subscribed by the policies. This avoids dangling handlers being called inappropriately, if the process isn’t going to terminate immediately (for example, when this documentation is tested as part of the library’s test suite).
# Clean up.
app.close()
Snapshotting¶
Snapshots provide a fast path for obtaining the state of an entity or aggregate that skips replaying some or all of the entity’s events.
If a repository is constructed with a snapshot strategy object, it will try to get the closest snapshot to the required version of a requested entity, and then replay only those events that will take the snapshot up to the state at that version.
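The replay step this describes can be sketched with plain Python; dictionary states and a toy mutator stand in for the library’s snapshot and event objects:

```python
def replay(initial_state, events, mutate):
    """Apply each event in turn to an evolving state."""
    state = initial_state
    for event in events:
        state = mutate(state, event)
    return state


# Toy stand-ins: states and events are plain dicts here.
def toy_mutate(state, event):
    state = dict(state or {})
    state.update(event)
    return state


snapshot_state = {'foo': 'bar2', 'version': 2}  # closest snapshot
events_after = [{'foo': 'bar3', 'version': 3}]  # only later events replayed
entity_state = replay(snapshot_state, events_after, toy_mutate)
assert entity_state == {'foo': 'bar3', 'version': 3}
```

Without a snapshot, the same replay() call starts from None and folds over the whole event sequence.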
It is recommended not to co-mingle saved snapshots with the entity event sequence.
Snapshots can be taken manually. To automatically generate snapshots, a snapshotting policy can take snapshots whenever a particular condition occurs, for example after every ten events.
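A snapshotting policy of that kind might be sketched as follows; the class and its take_snapshot callable are illustrative, not the library’s API:

```python
class SnapshottingPolicy(object):
    """Take a snapshot every `period` events (an illustrative sketch)."""

    def __init__(self, period, take_snapshot):
        self.period = period
        self.take_snapshot = take_snapshot

    def on_event(self, originator_id, originator_version):
        # Versions count from 0, so with period=2 snapshots are taken
        # after the 2nd, 4th, 6th... event of each entity.
        if (originator_version + 1) % self.period == 0:
            self.take_snapshot(originator_id, originator_version)


# Demonstration with a recording stand-in for take_snapshot.
taken = []
policy = SnapshottingPolicy(period=2, take_snapshot=lambda i, v: taken.append(v))
for version in range(6):
    policy.on_event('entity-1', version)
assert taken == [1, 3, 5]
```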
Domain¶
To avoid duplicating code from the previous sections, let’s use the example entity class Example and its factory function create_new_example() from the library.
from eventsourcing.example.domainmodel import Example, create_new_example
Application¶
The library class SnapshottingApplication extends SimpleApplication by setting up infrastructure for snapshotting, such as a snapshot store, a dedicated table for snapshots, and a policy to take snapshots every so many events.
from eventsourcing.application.simple import SnapshottingApplication
Run the code¶
In the example below, snapshots of entities are taken every period number of events.
with SnapshottingApplication(period=2) as app:

    # Create an entity.
    entity = create_new_example(foo='bar1')

    # Check there's no snapshot, only one event so far.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot is None

    # Change an attribute, generates a second event.
    entity.foo = 'bar2'

    # Check the snapshot.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar2'

    # Check can recover entity using snapshot.
    assert entity.id in app.repository
    assert app.repository[entity.id].foo == 'bar2'

    # Check snapshot after five events.
    entity.foo = 'bar3'
    entity.foo = 'bar4'
    entity.foo = 'bar5'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar4'

    # Check snapshot after seven events.
    entity.foo = 'bar6'
    entity.foo = 'bar7'
    assert app.repository[entity.id].foo == 'bar7'
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state['_foo'] == 'bar6'

    # Check snapshot state is None after discarding the entity on the eighth event.
    entity.__discard__()
    assert entity.id not in app.repository
    snapshot = app.snapshot_strategy.get_snapshot(entity.id)
    assert snapshot.state is None

    try:
        app.repository[entity.id]
    except KeyError:
        pass
    else:
        raise Exception('KeyError was not raised')

    # Get historical snapshots.
    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=2)
    assert snapshot.state['___version__'] == 1  # one behind
    assert snapshot.state['_foo'] == 'bar2'

    snapshot = app.snapshot_strategy.get_snapshot(entity.id, lte=3)
    assert snapshot.state['___version__'] == 3
    assert snapshot.state['_foo'] == 'bar4'

    # Get historical entities.
    entity = app.repository.get_entity(entity.id, at=0)
    assert entity.__version__ == 0
    assert entity.foo == 'bar1', entity.foo

    entity = app.repository.get_entity(entity.id, at=1)
    assert entity.__version__ == 1
    assert entity.foo == 'bar2', entity.foo

    entity = app.repository.get_entity(entity.id, at=2)
    assert entity.__version__ == 2
    assert entity.foo == 'bar3', entity.foo

    entity = app.repository.get_entity(entity.id, at=3)
    assert entity.__version__ == 3
    assert entity.foo == 'bar4', entity.foo
Stand-alone example¶
In this section, an event sourced application is developed that has minimal dependencies on the library.
A stand-alone domain model is developed without library classes, which shows how event sourcing in Python can work. The stand-alone code examples here are simplified versions of the library classes. Infrastructure classes from the library are used explicitly to show the different components involved, so you can understand how to make variations.
Domain¶
Let’s start with the domain model. If the state of an event sourced application is determined by a sequence of events, then we need to define some events.
Domain events¶
You may wish to use a technique such as “event storming” to identify or decide what happens in your domain. In this example, for the sake of general familiarity let’s assume we have a domain in which things can be “created”, “changed”, and “discarded”. With that in mind, we can begin to write some domain event classes.
In the example below, there are three domain event classes: Created, AttributeChanged, and Discarded. The common aspects of the domain event classes have been pulled up to a layer supertype, DomainEvent.
class DomainEvent(object):
    """
    Supertype for domain event objects.
    """
    def __init__(self, originator_id, originator_version, **kwargs):
        self.originator_id = originator_id
        self.originator_version = originator_version
        self.__dict__.update(kwargs)


class Created(DomainEvent):
    """
    Published when an entity is created.
    """
    def __init__(self, **kwargs):
        super(Created, self).__init__(originator_version=0, **kwargs)


class AttributeChanged(DomainEvent):
    """
    Published when an attribute value is changed.
    """
    def __init__(self, name, value, **kwargs):
        super(AttributeChanged, self).__init__(**kwargs)
        self.name = name
        self.value = value


class Discarded(DomainEvent):
    """
    Published when an entity is discarded.
    """
Please note, the domain event classes above do not depend on the library. The library does, however, contain a collection of different kinds of domain event classes that you can use in your models, for example Created, AttributeChanged, and Discarded.
Publish-subscribe¶
Since we are dealing with events, let’s define a simple publish-subscribe mechanism for them.
subscribers = []


def publish(event):
    for subscriber in subscribers:
        subscriber(event)


def subscribe(subscriber):
    subscribers.append(subscriber)


def unsubscribe(subscriber):
    subscribers.remove(subscriber)
Domain entity¶
Now, let’s define a domain entity that publishes the event classes defined above.
The entity class Example below has an ID and a version number. It also has a property foo with a “setter” method, and a method discard() to use when the entity is no longer needed.
The entity methods follow a similar pattern. At some point, each constructs an event that represents the result of the operation. Then each uses a “mutator function” mutate() (see below) to apply the event to the entity. Finally, each publishes the event for the benefit of any subscribers, by using the function publish().
import uuid


class Example(object):
    """
    Example domain entity.
    """
    def __init__(self, originator_id, originator_version=0, foo=''):
        self._id = originator_id
        self.___version__ = originator_version
        self._is_discarded = False
        self._foo = foo

    @property
    def id(self):
        return self._id

    @property
    def __version__(self):
        return self.___version__

    @property
    def foo(self):
        return self._foo

    @foo.setter
    def foo(self, value):
        assert not self._is_discarded

        # Construct an 'AttributeChanged' event object.
        event = AttributeChanged(
            originator_id=self.id,
            originator_version=self.__version__,
            name='foo',
            value=value,
        )

        # Apply the event to self.
        mutate(self, event)

        # Publish the event for others.
        publish(event)

    def discard(self):
        assert not self._is_discarded

        # Construct a 'Discarded' event object.
        event = Discarded(
            originator_id=self.id,
            originator_version=self.__version__
        )

        # Apply the event to self.
        mutate(self, event)

        # Publish the event for others.
        publish(event)
A factory can be used to create new “example” entities. The function create_new_example() below works in a similar way to the entity methods, creating new entities by firstly constructing a Created event, then using the function mutate() (see below) to construct the entity object, and finally publishing the event for others before returning the new entity object to the caller.
def create_new_example(foo):
    """
    Factory for Example entities.
    """
    # Construct an entity ID.
    entity_id = uuid.uuid4()

    # Construct a 'Created' event object.
    event = Created(
        originator_id=entity_id,
        foo=foo
    )

    # Use the mutator function to construct the entity object.
    entity = mutate(None, event)

    # Publish the event for others.
    publish(event=event)

    # Return the new entity.
    return entity
The example entity class does not depend on the library. In particular, it doesn’t inherit from a “magical” entity base class that makes everything work. The example here just publishes events that it has applied to itself. The library does, however, contain domain entity classes that you can use to build your domain model, for example the class AggregateRoot. The library classes are more developed than the examples here.
Mutator function¶
The mutator function mutate() below handles Created events by constructing an object. It handles AttributeChanged events by setting an attribute value, and it handles Discarded events by marking the entity as discarded. Each handler increases the version of the entity, so that the version of the entity is always one plus the originator version of the last event that was applied.
When replaying a sequence of events, for example when reconstructing an entity from its domain events, the mutator function is called many times in order to apply each event in the sequence to an evolving initial state.
def mutate(entity, event):
    """
    Mutator function for Example entities.
    """
    # Handle "created" events by constructing the entity object.
    if isinstance(event, Created):
        entity = Example(**event.__dict__)
        entity.___version__ += 1
        return entity

    # Handle "value changed" events by setting the named value.
    elif isinstance(event, AttributeChanged):
        assert not entity._is_discarded
        setattr(entity, '_' + event.name, event.value)
        entity.___version__ += 1
        return entity

    # Handle "discarded" events by returning 'None'.
    elif isinstance(event, Discarded):
        assert not entity._is_discarded
        entity.___version__ += 1
        entity._is_discarded = True
        return None

    else:
        raise NotImplementedError(type(event))
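Replaying a sequence of events is, in other words, a left fold of the mutator function over the events, starting from None. A sketch with a toy mutator over plain dicts (the real mutate() above works the same way):

```python
from functools import reduce


# Toy mutator: each event sets one named value on a dict state.
def toy_mutate(state, event):
    state = dict(state or {})
    state[event['name']] = event['value']
    return state


events = [
    {'name': 'foo', 'value': 'bar'},
    {'name': 'foo', 'value': 'baz'},
]

# Fold the mutator over the event sequence to rebuild the state.
entity_state = reduce(toy_mutate, events, None)
assert entity_state == {'foo': 'baz'}
```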
For the sake of simplicity in this example, an if-else block is used to structure the mutator function. The library has a function decorator mutator() that allows a default mutator function to register handlers for different types of event, much like singledispatch.
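For comparison, here is the same dispatch style sketched with functools.singledispatch itself, rather than the library’s mutator() decorator. Note that singledispatch dispatches on the first argument, so the event comes first here, unlike mutate(entity, event) above; the event classes are local stand-ins:

```python
from functools import singledispatch


# Local stand-ins for the event classes defined earlier.
class Created(object):
    pass


class Discarded(object):
    pass


@singledispatch
def toy_mutate(event, entity):
    # Default handler for unregistered event types.
    raise NotImplementedError(type(event))


@toy_mutate.register(Created)
def _(event, entity):
    return {'is_discarded': False}


@toy_mutate.register(Discarded)
def _(event, entity):
    return None


assert toy_mutate(Created(), None) == {'is_discarded': False}
assert toy_mutate(Discarded(), {'is_discarded': False}) is None
```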
Run the code¶
Let’s firstly subscribe to receive the events that will be published, so we can see what happened.
# A list of received events.
received_events = []
# Subscribe to receive published events.
subscribe(lambda e: received_events.append(e))
With this stand-alone code, we can create a new example entity object. We can update its property foo, and we can discard the entity using the discard() method.
# Create a new entity using the factory.
entity = create_new_example(foo='bar')
# Check the entity has an ID.
assert entity.id
# Check the entity has a version number.
assert entity.__version__ == 1
# Check the received events.
assert len(received_events) == 1, received_events
assert isinstance(received_events[0], Created)
assert received_events[0].originator_id == entity.id
assert received_events[0].originator_version == 0
assert received_events[0].foo == 'bar'
# Check the value of property 'foo'.
assert entity.foo == 'bar'
# Update property 'foo'.
entity.foo = 'baz'
# Check the new value of 'foo'.
assert entity.foo == 'baz'
# Check the version number has increased.
assert entity.__version__ == 2
# Check the received events.
assert len(received_events) == 2, received_events
assert isinstance(received_events[1], AttributeChanged)
assert received_events[1].originator_version == 1
assert received_events[1].name == 'foo'
assert received_events[1].value == 'baz'
Infrastructure¶
Since the application state is determined by a sequence of events, the application must somehow be able both to persist the events, and then recover the entities.
Please note, storing and replaying events to persist and to reconstruct the state of an application is the primary capability of this library. The domain, application, and interface capabilities are offered as a supplement to the infrastructural capabilities. They have been added to the library partly as a way of shaping and validating the infrastructure, partly to demonstrate how the core capabilities may be applied, and partly as a convenient way of reusing foundational code, so that attention can remain on the problem domain.
To run the code in this section, please install the library with the ‘sqlalchemy’ option.
$ pip install eventsourcing[sqlalchemy]
Database table¶
Let’s start by setting up a simple database table that can store sequences of items. We can use SQLAlchemy directly to define a database table that stores items in sequences, with a single identity for each sequence, and with each item positioned in its sequence by an integer index number.
from sqlalchemy.ext.declarative.api import declarative_base
from sqlalchemy.sql.schema import Column, Sequence, Index
from sqlalchemy.sql.sqltypes import BigInteger, Integer, String, Text
from sqlalchemy_utils import UUIDType
ActiveRecord = declarative_base()
class SequencedItemRecord(ActiveRecord):
    __tablename__ = 'sequenced_items'

    id = Column(BigInteger().with_variant(Integer, "sqlite"), primary_key=True)

    # Sequence ID (e.g. an entity or aggregate ID).
    sequence_id = Column(UUIDType(), nullable=False)

    # Position (index) of item in sequence.
    position = Column(BigInteger(), nullable=False)

    # Topic of the item (e.g. path to domain event class).
    topic = Column(String(255))

    # State of the item (serialized dict, possibly encrypted).
    data = Column(Text())

    __table_args__ = Index('index', 'sequence_id', 'position', unique=True),
The library has a class IntegerSequencedItemRecord which is very similar to the above.
Next, create the database table. For convenience, the SQLAlchemy objects can be adapted with the class SQLAlchemyDatastore, which provides a simple interface for the two operations we require: setup_connection() and setup_table().
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemySettings, SQLAlchemyDatastore
datastore = SQLAlchemyDatastore(
    base=ActiveRecord,
    settings=SQLAlchemySettings(uri='sqlite:///:memory:'),
)
datastore.setup_connection()
datastore.setup_table(SequencedItemRecord)
As you can see from the uri argument above, this example uses SQLite to manage an in-memory relational database. You can change uri to any valid connection string. Here are some example connection strings: for an SQLite file; for a PostgreSQL database; and for a MySQL database. See SQLAlchemy’s create_engine() documentation for details. You may need to install drivers for your database management system.
sqlite:////tmp/mydatabase
postgresql://scott:tiger@localhost:5432/mydatabase
mysql://scott:tiger@hostname/dbname
Event store¶
To support different kinds of sequences in the domain model, and to allow for different database schemas, the library has an event store class EventStore that uses a “sequenced item mapper” for mapping domain events to “sequenced items” - this library’s archetypal persistence model for storing events. The sequenced item mapper derives the values of sequenced item fields from the attributes of domain events.
The event store then uses an “active record strategy” to persist the sequenced items into a particular database management system. The active record strategy uses an active record class to manipulate records in a particular database table.
Hence you can use a different database table by substituting an alternative active record class. You can use a different database management system by substituting an alternative active record strategy.
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
active_record_strategy = SQLAlchemyActiveRecordStrategy(
    session=datastore.session,
    active_record_class=SequencedItemRecord,
)

sequenced_item_mapper = SequencedItemMapper(
    sequence_id_attr_name='originator_id',
    position_attr_name='originator_version'
)

event_store = EventStore(
    active_record_strategy=active_record_strategy,
    sequenced_item_mapper=sequenced_item_mapper
)
In the code above, the sequence_id_attr_name value given to the sequenced item mapper is the name of the domain event attribute that will be used as the ID of the mapped sequenced item. The position_attr_name argument informs the sequenced item mapper which event attribute should be used to position the item in the sequence. The values originator_id and originator_version correspond to attributes of the domain event classes we defined in the domain model section above.
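The mapping can be sketched as follows; to_sequenced_item() and the local event class are illustrative simplifications, since the real mapper also resolves full class-path topics and optionally encrypts the state:

```python
import json


def to_sequenced_item(event, sequence_id_attr_name='originator_id',
                      position_attr_name='originator_version'):
    """Simplified sketch of a sequenced item mapper: pick the sequence
    ID and position from named event attributes, derive a topic from
    the event's class, and serialise the remaining state."""
    state = dict(event.__dict__)
    sequence_id = state.pop(sequence_id_attr_name)
    position = state.pop(position_attr_name)
    topic = type(event).__name__  # the real topic is a full class path
    return (sequence_id, position, topic, json.dumps(state, sort_keys=True))


# Local stand-in for a domain event class.
class AttributeChanged(object):
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


event = AttributeChanged(
    originator_id='entity-1', originator_version=1, name='foo', value='baz')
item = to_sequenced_item(event)
assert item == ('entity-1', 1, 'AttributeChanged',
                '{"name": "foo", "value": "baz"}')
```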
Entity repository¶
It is common to retrieve entities from a repository. An event sourced repository for the example entity class can be constructed directly using the library class EventSourcedRepository. In this example, the repository is given an event store object. The repository is also given the mutator function mutate() defined above.
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
example_repository = EventSourcedRepository(
    event_store=event_store,
    mutator_func=mutate
)
Run the code¶
Now, let’s firstly write the events we received earlier into the event store.
# Put each received event into the event store.
for event in received_events:
    event_store.append(event)
# Check the events exist in the event store.
stored_events = event_store.get_domain_events(entity.id)
assert len(stored_events) == 2, (received_events, stored_events)
The entity can now be retrieved from the repository, using its dictionary-like interface.
retrieved_entity = example_repository[entity.id]
assert retrieved_entity.foo == 'baz'
Sequenced items¶
Remember that we can always get the sequenced items directly from the active record strategy. A sequenced item is a tuple containing a serialised representation of the domain event. The library class SequencedItem is a Python namedtuple with four fields: sequence_id, position, topic, and data.
In this example, an event’s originator_id attribute is mapped to the sequence_id field, and the event’s originator_version attribute is mapped to the position field. The topic field of a sequenced item is used to identify the event class, and the data field represents the state of the event (normally a JSON string).
sequenced_items = event_store.active_record_strategy.list_items(entity.id)
assert len(sequenced_items) == 2
assert sequenced_items[0].sequence_id == entity.id
assert sequenced_items[0].position == 0
assert 'Created' in sequenced_items[0].topic
assert 'bar' in sequenced_items[0].data
assert sequenced_items[1].sequence_id == entity.id
assert sequenced_items[1].position == 1
assert 'AttributeChanged' in sequenced_items[1].topic
assert 'baz' in sequenced_items[1].data
Application¶
Although we can do everything at the module level, an application object brings it all together. In the example below, the class ExampleApplication has an event store and an entity repository. The application also has a persistence policy.
Persistence policy¶
The persistence policy below subscribes to receive events whenever they are published. It uses an event store to store events whenever they are received.
class PersistencePolicy(object):
    def __init__(self, event_store):
        self.event_store = event_store
        subscribe(self.store_event)

    def close(self):
        unsubscribe(self.store_event)

    def store_event(self, event):
        self.event_store.append(event)
A slightly more developed class PersistencePolicy is included in the library.
Application object¶
As a convenience, it is useful to make the application function as a Python context manager, so that the application can close the persistence policy, and unsubscribe from receiving further domain events.
class ExampleApplication(object):
    def __init__(self, session):
        # Construct event store.
        self.event_store = EventStore(
            active_record_strategy=SQLAlchemyActiveRecordStrategy(
                active_record_class=SequencedItemRecord,
                session=session,
            ),
            sequenced_item_mapper=SequencedItemMapper(
                sequence_id_attr_name='originator_id',
                position_attr_name='originator_version'
            )
        )
        # Construct persistence policy.
        self.persistence_policy = PersistencePolicy(
            event_store=self.event_store
        )
        # Construct example repository.
        self.example_repository = EventSourcedRepository(
            event_store=self.event_store,
            mutator_func=mutate
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.persistence_policy.close()
A more developed class ExampleApplication can be found in the library. It is used in later sections of this guide.
Run the code¶
With the application object, we can create more example entities and expect they will be available immediately in the repository.
Please note, an entity that has been discarded by using its discard() method cannot subsequently be retrieved from the repository using its ID. In particular, the repository’s dictionary-like interface will raise a Python KeyError exception instead of returning an entity.
with ExampleApplication(datastore.session) as app:

    # Create a new entity.
    example = create_new_example(foo='bar')

    # Read.
    assert example.id in app.example_repository
    assert app.example_repository[example.id].foo == 'bar'

    # Update.
    example.foo = 'baz'
    assert app.example_repository[example.id].foo == 'baz'

    # Delete.
    example.discard()
    assert example.id not in app.example_repository
Projections and notifications¶
If a projection is just another mutator function that operates on a sequence of events, and a persistent projection is a snapshot of the resulting state, then the new thing we need for projections of the application state is a sequence of all the events of the application. This section introduces the notification log, and assumes your projections and your persistent projections can be coded using techniques for coding mutator functions and snapshots introduced in previous sections.
Synchronous update¶
In a simple situation, you may wish to update a view of an aggregate synchronously whenever there are changes. If each view model depends only on one aggregate, you may wish simply to subscribe to the events of the aggregate. Then, whenever an event occurs, the projection can be updated.
The library has a decorator function subscribe_to() that can be used for this purpose.
@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()
The view model could be saved as a normal record, or stored in a sequence that follows the event originator version numbers, perhaps as snapshots, so that concurrent handling of events will not lead to a later state being overwritten by an earlier state. Older versions of the view could be deleted later.
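That version-guarded style of update can be sketched as follows; the names and record shape are illustrative:

```python
# The view keeps the version of the last event applied to it, and
# ignores events that are not newer, so a late-arriving earlier state
# never overwrites a later one.
view = {'version': -1, 'title': None}


def apply_to_view(view, originator_version, title):
    if originator_version <= view['version']:
        return view  # stale update: ignore
    return {'version': originator_version, 'title': title}


view = apply_to_view(view, 0, 'first')
view = apply_to_view(view, 2, 'third')
view = apply_to_view(view, 1, 'second')  # arrives late, ignored
assert view == {'version': 2, 'title': 'third'}
```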
If the view fails to update after the domain event has been stored, then the view will become inconsistent. Since it is not desirable to delete the event once it has been stored, the command must return normally despite the view update failing, so that the command is not retried. The failure to update will need to be logged, or otherwise handled, in a similar way to failures of asynchronous updates.
The big issue with this approach is that if the first event of an aggregate is not processed, there is no way of knowing the aggregate exists, and so there is nothing that can be used to check for updates to that aggregate.
Asynchronous update¶
The fundamental concern is to accomplish high fidelity when propagating a stream of events, so that events are neither missed nor duplicated. Once the stream of events has been propagated faithfully, it can be republished and subscribers can execute commands as above.
As Vaughn Vernon suggests in his book Implementing Domain Driven Design:
“at least two mechanisms in a messaging solution must always be consistent with each other: the persistence store used by the domain model, and the persistence store backing the messaging infrastructure used to forward the Events published by the model. This is required to ensure that when the model’s changes are persisted, Event delivery is also guaranteed, and that if an Event is delivered through messaging, it indicates a true situation reflected by the model that published it. If either of these is out of lockstep with the other, it will lead to incorrect states in one or more interdependent models.”
There are three options, he continues. The first option is to have the messaging infrastructure and the domain model share the same persistence store, so changes to the model and insertion of new messages commit in the same local transaction. The second option is to have separate datastores for domain model and messaging but have a two phase commit, or global transaction, across the two.
The third option is to have the bounded context control notifications. In his book, Vaughn Vernon relies on the simple logic of an ascending sequence of integers to allow others to progress along the event stream. That is the approach taken here.
A pull mechanism that allows remote components to fetch events they don't yet have can be used to let them catch up. The same mechanism can be used if a component is developed after the application has been deployed, and so must be initialised from an established application stream, or otherwise needs to be reconstructed from scratch.
As we will see below, updates can be triggered by pushing the notifications to messaging infrastructure, and having the remote components subscribe. If anything goes wrong with messaging infrastructure, such that a notification is not received, remote components can detect they have missed a notification and pull the notifications they have missed.
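The detection itself can be as simple as comparing the position of an arriving notification with the last position seen. The function below is a sketch with illustrative names, assuming notifications occupy the contiguous integer positions described in the next section.

```python
def missed_positions(last_seen, received):
    """Return the positions to pull if the received notification
    implies a gap; an empty list means nothing was missed."""
    if received > last_seen + 1:
        return list(range(last_seen + 1, received))
    return []
```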
Application log¶
In order to update a projection of more than one aggregate, or the application state as a whole, we need a single sequence to log all the events of the application.
We want an application log that follows an increasing sequence of integers. The application log must also be capable of storing a very large sequence of events, without either swamping an individual database partition or scattering events across partitions in no particular order, which would make iterating through the sequence slow and expensive. We also want the application log to have effectively constant time read and write operations.
The library class BigArray satisfies these requirements quite well. It is a tree of arrays: a root array stores a reference to the current apex, and the apex contains references to arrays, which either contain references to lower arrays or contain the items assigned to the big array. Each array uses one database partition, and is limited in size (the array size) to ensure the partition is never too large. The identity of each array can be calculated directly from the index number, so arrays can be identified directly without traversing the tree to discover entity IDs. The capacity of the base arrays is the array size raised to the power of the array size. For a reasonable array size, it isn't really possible to fill up the base of such an array tree, but the slow-growing properties of this tree mean that, for all imaginable scenarios, performance will be approximately constant as items are appended to the big array.
Items can be appended to a big array using the append() method, which identifies the next available index in the array, and then assigns the item to that index. A ConcurrencyError will be raised if the position is already taken.
The performance of the append()
method is proportional to the log of the
index in the array, to the base of the array size used in the big array, rounded
up to the nearest integer, plus one (because of the root sequence that tracks
the apex). For example, if the sub-array size is 10,000, then it will take only 50%
longer to append the 100,000,000th item to the big array than the 1st one. By
the time the 1,000,000,000,000th index position is assigned to a big array, the
append()
method will take only twice as long as the 1st.
That’s because the performance of the append()
method is dominated by the
need to walk down the big array’s tree of arrays to find the highest assigned
index. Once the index of the next position is known, the item can be assigned
directly to an array.
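The worked numbers above can be restated as a back-of-envelope estimate, assuming one array operation per tree level plus one for the root; this is a sketch of the cost model described in the text, not library code.

```python
import math

def append_cost(index, array_size):
    """Approximate number of array operations for append(): the log of
    the index to the base of the array size, rounded up (at least one
    level), plus one for the root array that tracks the apex."""
    levels = max(math.ceil(math.log(index, array_size)), 1)
    return levels + 1
```

With an array size of 10,000, the cost grows from 2 for the first item to 3 once the index passes 10,000, which is where the "50% longer" figure above comes from.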
from uuid import uuid4
from eventsourcing.domain.model.array import BigArray, ItemAssigned
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy
from eventsourcing.infrastructure.sqlalchemy.activerecords import StoredEventRecord
from eventsourcing.infrastructure.sqlalchemy.datastore import SQLAlchemyDatastore, SQLAlchemySettings
from eventsourcing.infrastructure.eventstore import EventStore
from eventsourcing.infrastructure.repositories.array import BigArrayRepository
from eventsourcing.application.policies import PersistencePolicy
from eventsourcing.infrastructure.sequenceditem import StoredEvent
from eventsourcing.infrastructure.sequenceditemmapper import SequencedItemMapper
datastore = SQLAlchemyDatastore(
    settings=SQLAlchemySettings(),
    tables=[StoredEventRecord],
)
datastore.setup_connection()
datastore.setup_tables()

event_store = EventStore(
    active_record_strategy=SQLAlchemyActiveRecordStrategy(
        session=datastore.session,
        active_record_class=StoredEventRecord,
        sequenced_item_class=StoredEvent,
    ),
    sequenced_item_mapper=SequencedItemMapper(
        sequenced_item_class=StoredEvent,
    )
)

persistence_policy = PersistencePolicy(
    event_store=event_store,
    event_type=ItemAssigned,
)

array_id = uuid4()

repo = BigArrayRepository(
    event_store=event_store,
    array_size=10000
)

application_log = repo[array_id]
application_log.append('event0')
application_log.append('event1')
application_log.append('event2')
application_log.append('event3')
Because there is a small duration of time between checking for the next
position and using it, another thread could jump in and use the position
first. If that happens, a ConcurrencyError
will be raised by the BigArray
object. In such a case, another attempt can be made to append the item.
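The retry can be wrapped in a small helper. The sketch below is not part of the library; the ConcurrencyError class here is a stand-in for eventsourcing.exceptions.ConcurrencyError, so the retry logic can be shown in isolation.

```python
import time

class ConcurrencyError(Exception):
    """Stand-in for eventsourcing.exceptions.ConcurrencyError."""

def append_with_retries(big_array, item, max_attempts=3, backoff_seconds=0.0):
    """Try to append an item, retrying if another writer took the position."""
    for attempt in range(max_attempts):
        try:
            return big_array.append(item)
        except ConcurrencyError:
            if attempt + 1 == max_attempts:
                raise  # give up after the last attempt
            time.sleep(backoff_seconds)  # optionally back off before retrying
```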
Items can be assigned directly to a big array using an index number. If an item has already been assigned to the same position, a concurrency error will be raised, and the original item will remain in place. Items cannot be unassigned from an array, hence each position in the array can be assigned once only.
The average performance of assigning an item is constant time. The worst case is the log of the index with base equal to the array size, which occurs when containing arrays are added, so that the last highest assigned index can be discovered. The probability of departing from average performance is inversely proportional to the array size, since the larger the array size, the less often the base arrays fill up. For a decent array size, the probability of needing to build the tree is very low. And when the tree does need building, it doesn't take very long (and most of it probably already exists).
from eventsourcing.exceptions import ConcurrencyError
assert application_log.get_next_position() == 4
application_log[4] = 'event4'
try:
    application_log[4] = 'event4a'
except ConcurrencyError:
    pass
else:
    raise AssertionError("ConcurrencyError was not raised")
If the next available position in the array must be identified
each time an item is assigned, the amount of contention will increase
as the number of threads increases. Using the append()
method alone
will work if the time period of appending events is greater than the
time it takes to identify the next available index and assign to it.
At that rate, any contention will not lead to congestion. Different
nodes can take their chances assigning to what they believe is an
unassigned index, and if another has already taken that position,
the operation can be retried.
However, there will be an upper limit to the rate at which events can be appended, and contention will eventually lead to congestion that will cause requests to back up or be spilled.
The rate of assigning items to the big array can be greatly increased
by centralizing the generation of the sequence of integers. Instead of
discovering the next position from the array each time an item is assigned,
an integer sequence generator can be used to generate a contiguous sequence
of integers. This technique eliminates contention around assigning items to
the big array entirely. In consequence, the bandwidth of assigning to a big
array using an integer sequence generator is much greater than using the
append()
method.
If the application is executed in only one process, the number generator can
be a simple Python object. The library class
SimpleIntegerSequenceGenerator
generates a contiguous sequence of integers that can be shared across multiple
threads in the same process.
from eventsourcing.infrastructure.integersequencegenerators.base import SimpleIntegerSequenceGenerator
integers = SimpleIntegerSequenceGenerator()
generated = []
for i in integers:
    if i >= 5:
        break
    generated.append(i)
expected = list(range(5))
assert generated == expected, (generated, expected)
If the application is deployed across many nodes, an external integer sequence generator can be used. There are many possible solutions. The library class RedisIncr uses Redis' INCR command to generate a contiguous sequence of integers that can be shared by processes running on different nodes.
Using Redis doesn’t necessarily create a single point of failure. Redundancy can be obtained using clustered Redis. Although there aren’t synchronous updates between nodes, so that the INCR command may issue the same numbers more than once, these numbers can only ever be used once. As failures are retried, the position will eventually reach an unassigned index position. Arrangements can be made to set the value from the highest assigned index. With care, the worst case will be an occasional slight delay in storing events, caused by switching to a new Redis node and catching up with the current index number. Please note, there is currently no code in the library to update or resync the Redis key used in the Redis INCR integer sequence generator.
from eventsourcing.infrastructure.integersequencegenerators.redisincr import RedisIncr
integers = RedisIncr()
generated = []
for i in integers:
    generated.append(i)
    if i >= 4:
        break
expected = list(range(5))
assert generated == expected, (generated, expected)
The integer sequence generator can be used when assigning items to the application log.
application_log[next(integers)] = 'event5'
application_log[next(integers)] = 'event6'
assert application_log.get_next_position() == 7
Items can be read from the application log using an index or a slice.
The performance of reading an item at a given index is always constant time with respect to the number of the index. The base array ID, and the index of the item in the base array, can be calculated from the number of the index.
The performance of reading a slice of items is proportional to the size of the slice. Consecutive items in a base array are stored consecutively in the same database partition, and if the slice spans more than one base array, the iteration proceeds to the next partition.
assert application_log[0] == 'event0'
assert list(application_log[5:7]) == ['event5', 'event6']
The application log can be written to by a persistence policy. References to events can be assigned to the application log before the domain event is written to the aggregate’s own sequence, so that it isn’t possible to store an event in the aggregate’s sequence that is not already in the application log.
Commands that fail to write to the aggregate's sequence (due to an operational error or a concurrency error) after the event has been logged in the application log should probably raise an exception, so that the command is seen to have failed and may be retried. This leaves an item in the notification log, but no domain event in the aggregate stream (a dangling reference, which may be satisfied later). If the command failed due to an operational error, the same event may be published again, and so would appear twice in the application log. So whilst events in the application log that aren't in the aggregate sequence can perhaps be ignored by consumers of the application log, care should be taken to deduplicate events.
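On the consumer side, deduplication can be a matter of tracking which event IDs have already been processed, assuming each notification carries a unique event ID. This is a sketch with illustrative names, not library API.

```python
def deduplicate(notifications, seen=None):
    """Yield each (event_id, payload) pair once, skipping IDs already seen.

    In practice the set of seen IDs would be persisted along with the
    projection, so that deduplication survives restarts.
    """
    seen = set() if seen is None else seen
    for event_id, payload in notifications:
        if event_id in seen:
            continue
        seen.add(event_id)
        yield event_id, payload
```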
If writing the event to its aggregate sequence is successful, then it is possible to push a notification about the event to a message queue. Failing to push the notification perhaps should not prevent the command returning normally. Push notifications could also be generated by another process, something that pulls from the application log, and pushes notifications for events that have not already been sent.
Notification log¶
As described in Implementing Domain Driven Design, a notification log is presented in linked sections. The “current section” is returned by default, and contains the very latest notification and some of the preceding notifications. There are also archived sections that contain all the earlier notifications. When the current section is full, it is considered to be an archived section that links to the new current section.
Readers can navigate the linked sections from the current section backwards until the archived section is reached that contains the last notification seen by the client. If the client has not yet seen any notifications, it will navigate back to the first section. Readers can then navigate forwards, revealing all existing notifications that have not yet been seen.
The library class NotificationLog
encapsulates the application log and presents linked sections. The library class
NotificationLogReader
is an iterator
that yields notifications. It navigates the sections of the notification log, and
maintains position so that it can continue when there are further notifications.
The position can be set directly with the seek()
method. The position is set
indirectly when a slice is taken with a start index. The position is set to zero
when the reader is constructed.
The notification log uses a big array object. In this example, the big array object is directly the application log above. It is possible to project the application log into a custom notification log, perhaps to deduplicate domain events, or to anonymise data, or to send messages to messaging infrastructure with more stateful control.
from eventsourcing.interface.notificationlog import NotificationLog, NotificationLogReader
# Construct notification log.
notification_log = NotificationLog(application_log, section_size=10)
# Get the "current" section from the notification log (numbering follows Vaughn Vernon's book).
section = notification_log['current']
assert section.section_id == '1,10'
assert len(section.items) == 7, section.items
assert section.previous_id is None
assert section.next_id is None
# Construct log reader.
reader = NotificationLogReader(notification_log)
# The position is zero by default.
assert reader.position == 0
# The position can be set directly.
reader.seek(10)
assert reader.position == 10
# Reset the position.
reader.seek(0)
# Read all existing notifications.
all_notifications = list(reader)
assert all_notifications == ['event0', 'event1', 'event2', 'event3', 'event4', 'event5', 'event6']
# Check the position has advanced.
assert reader.position == 7
# Read all subsequent notifications (should be none).
subsequent_notifications = list(reader)
assert subsequent_notifications == []
# Assign more events to the application log.
application_log[next(integers)] = 'event7'
application_log[next(integers)] = 'event8'
# Read all subsequent notifications (should be two).
subsequent_notifications = list(reader)
assert subsequent_notifications == ['event7', 'event8']
# Check the position has advanced.
assert reader.position == 9
# Read all subsequent notifications (should be none).
subsequent_notifications = list(reader)
assert subsequent_notifications == []
# Assign more events to the application log.
application_log[next(integers)] = 'event9'
application_log[next(integers)] = 'event10'
application_log[next(integers)] = 'event11'
# Read all subsequent notifications (should be three).
subsequent_notifications = list(reader)
assert subsequent_notifications == ['event9', 'event10', 'event11']
# Check the position has advanced.
assert reader.position == 12
# Read all subsequent notifications (should be none).
subsequent_notifications = list(reader)
assert subsequent_notifications == []
# Get the "current" section from the notification log (numbering follows Vaughn Vernon's book).
section = notification_log['current']
assert section.section_id == '11,20'
assert section.previous_id == '1,10'
assert section.next_id is None
assert len(section.items) == 2, len(section.items)
# Get the first section from the notification log (numbering follows Vaughn Vernon's book).
section = notification_log['1,10']
assert section.section_id == '1,10'
assert section.previous_id is None
assert section.next_id == '11,20'
assert len(section.items) == 10, section.items
The RESTful API design in Implementing Domain Driven Design suggests a good way to present the notification log, a way that is simple and can scale using established HTTP technology.
The library function present_section()
serializes sections from the notification log for use in a view.
import json
from eventsourcing.interface.notificationlog import present_section
content = present_section(application_log, '1,10', 10)
expected = {
    "items": [
        "event0",
        "event1",
        "event2",
        "event3",
        "event4",
        "event5",
        "event6",
        "event7",
        "event8",
        "event9"
    ],
    "next_id": "11,20",
    "previous_id": None,
    "section_id": "1,10"
}
assert json.loads(content) == expected
A Web application view can pick the notification log ID and the section ID out of the request path, and return an HTTP response with the JSON content that results from calling present_section().
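For example, a view might pick the section ID out of the path with plain string handling. In the sketch below, a stand-in takes the place of the library's present_section() so the routing logic can be shown in isolation; the function names are illustrative.

```python
import json

def present_section(big_array, section_id, section_size):
    """Stand-in for eventsourcing.interface.notificationlog.present_section."""
    return json.dumps({"section_id": section_id, "items": []})

def notification_section_view(path, big_array, section_size=10):
    """Pick the section ID from a path like '/notificationlog/1,10' and
    return the JSON content for the HTTP response body."""
    section_id = path.rstrip('/').rsplit('/', 1)[-1]
    return present_section(big_array, section_id, section_size)
```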
The library class RemoteNotificationLog issues HTTP requests to a RESTful API that presents sections from the notification log. It has the same interface as NotificationLog, and so can be used by NotificationLogReader to obtain unseen notifications progressively.
Deployment¶
This section gives an overview of the concerns that arise when using an eventsourcing application in Web applications and task queue workers. There are many combinations of frameworks, databases, and process models. The complicated aspect is setting up the database configuration to work well with the framework. Your event sourcing application can be constructed just after the database is configured, and before requests are handled.
Please note, unlike the code snippets in the other examples, the snippets of code in this section are merely suggestive, and do not form a complete working program. For a working example using Flask and SQLAlchemy, please refer to the library module eventsourcing.example.interface.flaskapp, which is tested both stand-alone and with uWSGI.
Application object¶
In general you need one, and only one, instance of your application object in each process. If your eventsourcing application object has any policies, for example a persistence policy that persists events whenever they are published, then constructing more than one instance of the application causes the policy event handlers to be subscribed more than once, so that, for example, more than one attempt will be made to save each event, which won't work.
To make sure there is only one instance of your application object in each process, one possible arrangement (see below) is to have a module with two functions and a variable. The first function constructs an application object and assigns it to the variable, and can perhaps be called when a module is imported, or from a suitable hook or signal designed for setting things up before any requests are handled. A second function returns the application object assigned to the variable, and can be called by any views, or request or task handlers, that depend on the application’s services.
Although the first function below must be called only once, the second function can be called many times. The example functions below are written relatively strictly, so that init_application() will raise an exception if it has already been called, and get_application() will raise an exception if init_application() has not already been called.
# Your eventsourcing application.
class ExampleApplication(object):
    def __init__(self, *args, **kwargs):
        pass

def construct_application(**kwargs):
    return ExampleApplication(**kwargs)

_application = None

def init_application(**kwargs):
    global _application
    if _application is not None:
        raise AssertionError("init_application() has already been called")
    _application = construct_application(**kwargs)

def get_application():
    if _application is None:
        raise AssertionError("init_application() must be called first")
    return _application
As an aside, if you will use these functions in your test suite, and your test suite needs to set up the application more than once, you will also need a close_application() function that closes the application object, unsubscribing any handlers, and resetting the module-level variable so that init_application() can be called again. It doesn't really matter if you don't close your application at the end of the process lifetime, but you may wish to close any database or other connections to network services.
def close_application():
    global _application
    if _application is not None:
        _application.close()
    _application = None
Lazy initialization¶
An alternative to having separate "init" and "get" functions is one "get" function that lazily initializes the application object when first requested. With lazy initialization, the getter first checks whether the object has been constructed, and returns it if so. If the object hasn't been constructed, the getter constructs it before returning it. Use a lock around the construction of the object, to make sure it happens only once. After the lock is obtained, and before the object is constructed, check again that the object wasn't constructed by another thread while the lock was being acquired.
import threading

application = None
lock = threading.Lock()

def get_application():
    global application
    if application is None:
        with lock:
            # Check again to avoid a TOCTOU bug.
            if application is None:
                application = construct_application()
    return application
Database connection¶
Typically, your eventsourcing application object will be constructed after its database connection has been configured, and before any requests are handled. Views or tasks can then safely use the already constructed application object.
If your eventsourcing application depends on receiving a database session object when it is constructed, for example if you are using the SQLAlchemy classes in this library, then you will need to create a correctly scoped session object first and use it to construct the application object.
On the other hand, if your eventsourcing application does not depend on receiving a database session object when it is constructed, for example if you are using the Cassandra classes in this library, then you may construct the application object before configuring the database connection - just be careful not to use the application object before the database connection is configured otherwise your queries just won’t work.
Setting up connections to databases is out of scope of the eventsourcing application classes, and should be set up in a “normal” way. The documentation for your Web or worker framework may describe when to set up database connections, and your database documentation may also have some suggestions. It is recommended to make use of any hooks or decorators or signals intended for the purpose of setting up the database connection also to construct the application once for the process. See below for some suggestions.
SQLAlchemy¶
SQLAlchemy has very good documentation about constructing sessions. If you are an SQLAlchemy user, it is well worth reading the documentation about sessions in full. Here’s a small quote:
Some web frameworks include infrastructure to assist in the task of aligning the lifespan of a Session with that of a web request. This includes products such as Flask-SQLAlchemy for usage in conjunction with the Flask web framework, and Zope-SQLAlchemy, typically used with the Pyramid framework. SQLAlchemy recommends that these products be used as available.
In those situations where the integration libraries are not provided or are insufficient, SQLAlchemy includes its own “helper” class known as scoped_session. A tutorial on the usage of this object is at Contextual/Thread-local Sessions. It provides both a quick way to associate a Session with the current thread, as well as patterns to associate Session objects with other kinds of scopes.
The important thing is to use a scoped session, and it is better to have the session scoped to the request or task, rather than the thread, but scoping to the thread is ok.
As soon as you have a scoped session object, you can construct your eventsourcing application.
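To illustrate what "scoped to the thread" means, here is a minimal thread-local factory using only the standard library. SQLAlchemy's scoped_session does this (and more) for real Session objects, so this class is purely illustrative.

```python
import threading

class ThreadScopedFactory:
    """Return one object per thread, constructing it on first use."""
    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()  # per-thread storage

    def __call__(self):
        if not hasattr(self._local, 'obj'):
            self._local.obj = self._factory()
        return self._local.obj
```

Scoping to the request or task works the same way, except the stored object is keyed by the current request or task instead of the current thread.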
Web interfaces¶
uWSGI¶
If you are running uWSGI in prefork mode, and not using a Web application framework, please note that uWSGI has a postfork decorator which may help.
Your “wsgi.py” file can have a module-level function decorated with the @postfork
decorator that initialises your eventsourcing application for the Web application process
after child workers have been forked.
from uwsgidecorators import postfork

@postfork
def init_process():
    # Set up database connection.
    database = {}
    # Construct eventsourcing application.
    init_application()
Other decorators are available.
Flask with Cassandra¶
The Cassandra Driver FAQ has a code snippet about establishing the connection with the uWSGI postfork decorator, when running in a forked mode.
from flask import Flask
from uwsgidecorators import postfork
from cassandra.cluster import Cluster

session = None
prepared = None

@postfork
def connect():
    global session, prepared
    session = Cluster().connect()
    prepared = session.prepare("SELECT release_version FROM system.local WHERE key=?")

app = Flask(__name__)

@app.route('/')
def server_version():
    row = session.execute(prepared, ('local',))[0]
    return row.release_version
Flask-Cassandra¶
The Flask-Cassandra project serves a similar function to Flask-SQLAlchemy.
Flask-SQLAlchemy¶
If you wish to use eventsourcing with Flask and SQLAlchemy, then you may wish to use Flask-SQLAlchemy. You just need to define your active record class using the model classes from that library, and then use it instead of the library classes in your eventsourcing application object, along with the session object it provides.
The docs snippet below shows that it can work simply to construct the eventsourcing application in the same place as the Flask application object.
The Flask-SQLAlchemy class SQLAlchemy is used to set up a session object that is scoped to the request.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_utils.types.uuid import UUIDType
from eventsourcing.infrastructure.sqlalchemy.activerecords import SQLAlchemyActiveRecordStrategy

# Construct Flask application.
application = Flask(__name__)

# Construct Flask-SQLAlchemy object.
db = SQLAlchemy(application)

# Define database table using Flask-SQLAlchemy library.
class IntegerSequencedItem(db.Model):
    __tablename__ = 'integer_sequenced_items'

    id = db.Column(db.BigInteger().with_variant(db.Integer, "sqlite"), primary_key=True)

    # Sequence ID (e.g. an entity or aggregate ID).
    sequence_id = db.Column(UUIDType(), nullable=False)

    # Position (index) of item in sequence.
    position = db.Column(db.BigInteger(), nullable=False)

    # Topic of the item (e.g. path to domain event class).
    topic = db.Column(db.String(255))

    # State of the item (serialized dict, possibly encrypted).
    data = db.Column(db.Text())

    # Index.
    __table_args__ = db.Index('index', 'sequence_id', 'position', unique=True),

# Construct eventsourcing application with db table and session.
init_application(
    entity_active_record_strategy=SQLAlchemyActiveRecordStrategy(
        active_record_class=IntegerSequencedItem,
        session=db.session,
    )
)
For a working example using Flask and SQLAlchemy, please refer to the library module eventsourcing.example.interface.flaskapp, which is tested both stand-alone and with uWSGI.
The Flask application method before_first_request is used to decorate an application object constructor, so that the constructor is called just before the first request is handled. This allows the module to be imported by the test suite without immediately constructing the application.
Django-Cassandra¶
If you wish to use eventsourcing with Django and Cassandra, you may wish to use Django-Cassandra.
It’s also possible to use this library directly with Django and Cassandra. You just need to configure the connection and initialise the application before handling requests in a way that is correct for your configuration.
Django ORM¶
The excellent project djangoevents by Applause is a Django app that provides a neat way of taking an event sourcing approach in a Django project. It allows this library to be used seamlessly with Django, by using the Django ORM to store events. Using djangoevents is well documented in the README file. It adds some nice enhancements to the capabilities of this library, and shows how various components can be extended or replaced. Please note, the djangoevents project currently works with a previous version of this library.
Zope-SQLAlchemy¶
The Zope-SQLAlchemy project serves a similar function to Flask-SQLAlchemy.
Task queues¶
This section contains suggestions about using an eventsourcing application in task queue workers.
Celery¶
Celery has a worker_process_init signal decorator, which may be appropriate if you are running Celery workers in prefork mode. Other decorators are available.
Your Celery tasks or config module can have a module-level function decorated with @worker_process_init.connect that initialises your eventsourcing application for the Celery worker process.
from celery.signals import worker_process_init

@worker_process_init.connect
def init_process(sender=None, conf=None, **kwargs):
    # Set up database connection.
    database = {}
    # Construct eventsourcing application.
    init_application()
As an alternative, it may work to use the @task_prerun decorator with a getter that supports lazy initialization.
from celery.signals import task_prerun

@task_prerun.connect
def init_process(*args, **kwargs):
    get_application(lazy_init=True)
Once the application has been safely initialized once in the process, your Celery tasks can use the function get_application() to complete their work. Of course, you could just call a getter with lazy initialization from the tasks.
from celery import Celery

celery_app = Celery()

# Use the Celery app to route the task to the worker.
@celery_app.task
def hello_world():
    # Use the eventsourcing app to complete the task.
    app = get_application()
    return "Hello World, {}".format(id(app))
Again, the most important thing is configuring the database, and making things work across all modes of execution, including your test suite.
Redis Queue¶
Redis queue workers are
quite similar to Celery workers. You can call get_application()
from within a job function. To fit with the style in the RQ
documentation, you could perhaps use your eventsourcing application
as a context manager, just like the Redis connection example.
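For example, a minimal sketch of the context-manager approach, assuming a hypothetical ExampleApplication class (the init and close details here are illustrative, not library API):

```python
# Hypothetical sketch: wrapping an eventsourcing application in a
# context manager, so an RQ job function can open and close it cleanly.

class ExampleApplication(object):
    def __init__(self):
        self.is_open = True  # stands in for setting up the database connection

    def close(self):
        self.is_open = False  # stands in for releasing the connection

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def count_words_job(text):
    # An RQ worker would import and call this job function.
    with ExampleApplication() as app:
        assert app.is_open
        return len(text.split())
```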
Release notes¶
It is the aim of the project that releases with the same major version number are backwards compatible, within the scope of the documented examples. New major versions indicate that backwards-incompatible changes have been introduced since the previous major version.
Version 4.x series was released after quite a lot of refactoring made things backwards-incompatible. Object namespaces for entity and event classes were cleaned up, by moving library names to double-underscore prefixed and postfixed names. Domain events can be hashed, and also hash-chained together, allowing entity state to be verified. Created events were changed to have originator_topic, which allowed other things such as mutators and repositories to be greatly simplified. Mutators are now by default expected to be implemented on entity event classes. Event timestamps were changed from floats to decimal objects, an exact number type. The cipher was changed to use AES-GCM to allow verification of encrypted data retrieved from a database.
Also, the active record classes for SQLAlchemy were changed to have an auto-incrementing ID, to make it easy to follow the events of an application, for example when updating view models, without additional complication of a separate application log. This change makes the SQLAlchemy library classes ultimately less “scalable” than the Cassandra classes, because an auto-incrementing ID must operate from a single thread. Overall, it seems like a good trade-off for early-stage development. Later, when the auto-incrementing ID bottleneck would otherwise throttle performance, “scaling-up” could involve switching application infrastructure to use a separate application log.
Version 3.x series was released after quite a lot of refactoring that made things backwards-incompatible. Documentation was greatly improved, in particular with pages reflecting the architectural layers of the library (infrastructure, domain, application).
Version 2.x series was a major rewrite that implemented two distinct kinds of sequences: events sequenced by integer version numbers and events sequenced in time, with an archetypal “sequenced item” persistence model for storing events.
Version 1.x series was an extension of the version 0.x series, and attempted to bridge between sequencing events with both timestamps and version numbers.
Version 0.x series was the initial cut of the code, all events were sequenced by timestamps, or TimeUUIDs in Cassandra, because the project originally emerged whilst working with Cassandra.
Module docs¶
This document describes the packages, modules, classes, functions and other code details of the library.
eventsourcing¶
The eventsourcing package contains packages for the application layer, the domain layer, the infrastructure layer, and the interface layer. There is also a module for exceptions, an example package, and a utils module.
application¶
The application layer brings together the domain and infrastructure layers.
base¶
-
class
eventsourcing.application.base.
ApplicationWithEventStores
(entity_active_record_strategy=None, log_active_record_strategy=None, snapshot_active_record_strategy=None, always_encrypt=False, cipher=None)[source]¶ Bases:
object
Event sourced application object class.
Can construct event stores using given active records. Supports three different event stores: for log events, for entity events, and for snapshot events.
-
construct_event_store
(event_sequence_id_attr, event_position_attr, active_record_strategy, always_encrypt=False, cipher=None)[source]¶
-
construct_sequenced_item_mapper
(sequenced_item_class, event_sequence_id_attr, event_position_attr, json_encoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONDecoder'>, always_encrypt=False, cipher=None)[source]¶
-
-
class
eventsourcing.application.base.
ApplicationWithPersistencePolicies
(**kwargs)[source]¶ Bases:
eventsourcing.application.base.ApplicationWithEventStores
policies¶
domain.model¶
The domain layer contains a domain model, and optionally services that work across different entities or aggregates.
The domain model package contains classes and functions that can help develop an event sourced domain model.
aggregate¶
Base classes for aggregates in a domain driven design.
-
class
eventsourcing.domain.model.aggregate.
AggregateRoot
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
Root entity for an aggregate in a domain driven design.
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when an AggregateRoot is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.Created
Published when an AggregateRoot is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.aggregate.Event
,eventsourcing.domain.model.entity.Discarded
Published when an AggregateRoot is discarded.
-
class
array¶
A kind of collection, indexed by integer. Doesn’t need to replay all events to exist.
-
class
eventsourcing.domain.model.array.
AbstractArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
Repository for sequence objects.
-
class
eventsourcing.domain.model.array.
AbstractBigArrayRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
Repository for compound sequence objects.
-
subrepo
¶ Sub-sequence repository.
-
-
class
eventsourcing.domain.model.array.
Array
(array_id, repo)[source]¶ Bases:
object
-
class
eventsourcing.domain.model.array.
BigArray
(array_id, repo)[source]¶ Bases:
eventsourcing.domain.model.array.Array
A virtual array holding items in indexed positions, across a number of Array instances.
Getting and setting items at index position is supported. Slices are supported, and operate across the underlying arrays. Appending is also supported.
BigArray is designed to overcome the concern of needing a single large sequence that may not be suitably stored in any single partition. In simple terms, if the events of an aggregate can fit in a partition, we can use the same size of partition to make a tree of arrays that will certainly be capable of sequencing all the events of the application in a single stream.
With normal size base arrays, enterprise applications can expect read and write time to be approximately constant with respect to the number of items in the array.
The array is composed of a tree of arrays, which gives a capacity equal to the size of each array raised to the power of that size. If the arrays are limited to roughly the maximum size of an aggregate event stream (a large number, but not so many that any one partition would hold too much data; say thousands, to be safe), then such a large number of aggregates would fit in the corresponding BigArray that we can be confident it would never be filled.
Write access time in the worst case, and the time to identify the index of the last item in the big array, is proportional to the log of the highest assigned index, to the base of the underlying array size. Write time on average, and read time given an index, is constant with respect to the number of items in a BigArray.
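The tree arithmetic can be illustrated with a small sketch (hypothetical helper, not library code): with base arrays of size N and a tree of depth N, capacity is N ** N, and the path from the root to any index takes at most N steps.

```python
# Illustrative sketch (not library code): locating an item in a tree of
# fixed-size arrays. With base arrays of size 10 and depth 3, capacity
# is 10 ** 3; the path to any index is found in 3 integer divisions.

def tree_path(index, array_size, depth):
    """Return the child offsets, root first, that lead to `index`."""
    path = []
    for level in reversed(range(depth)):
        span = array_size ** level  # items covered by one child at this level
        path.append(index // span)
        index %= span
    return path
```

Each step narrows the search by a factor of the array size, which is why access time grows with the log of the highest assigned index rather than with the number of items.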
Items can be appended in log time in a single thread. However, the time between reading the current last index and claiming the next position leads to contention and retries when there are lots of threads of execution all attempting to append items, which inherently limits throughput.
Todo: Not possible in Cassandra, but maybe do it in a transaction in SQLAlchemy?
An alternative to reading the last item before writing the next is to use an integer sequence generator to generate a stream of integers. Items can be assigned to index positions in a big array, according to the integers that are issued. Throughput will then be much better, and will be limited only by the rate at which the database can have events written to it (unless the number generator is quite slow).
An external integer sequence generator, such as Redis’ INCR command, or an auto-incrementing database column, may constitute a single point of failure.
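As an illustration of the approach, a minimal thread-safe in-process integer sequence generator might look like this (a sketch for demonstration, not the library's classes):

```python
import itertools
import threading

# Minimal in-process sketch of an integer sequence generator (an
# assumption for illustration, not the library's RedisIncr): hands out
# consecutive integers safely across threads, so each caller can claim
# a distinct index position without reading the last item first.

class CountingSequenceGenerator(object):
    def __init__(self, start=0):
        self._counter = itertools.count(start)
        self._lock = threading.Lock()

    def __next__(self):
        with self._lock:
            return next(self._counter)

    next = __next__  # Python 2 compatibility
```

Unlike Redis' INCR or an auto-incrementing column, this generator lives in a single process, so it avoids the single point of failure at the cost of only working for one writer process.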
collection¶
decorators¶
Decorators useful in domain models based on the classes in this library.
-
eventsourcing.domain.model.decorators.
attribute
(getter)[source]¶ When used as a method decorator, returns a property object with the method as the getter and a setter defined to call instance method change_attribute(), which publishes an AttributeChanged event.
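The mechanism can be sketched as follows; the Page class and the change_attribute() body are illustrative stand-ins, since in the library change_attribute() triggers and applies an AttributeChanged event:

```python
# Sketch of the mechanism behind the `attribute` decorator: the decorated
# getter becomes a property whose setter delegates to change_attribute().

def attribute(getter):
    def setter(self, value):
        name = '_' + getter.__name__
        self.change_attribute(name=name, value=value)
    return property(fget=getter, fset=setter, doc=getter.__doc__)

class Page(object):
    def __init__(self, title):
        self._title = title
        self.events = []

    @attribute
    def title(self):
        """Page title, mutable via attribute-changed events."""
        return self._title

    def change_attribute(self, name, value):
        # In the library this triggers an AttributeChanged event; here we
        # just record the change and apply it directly, for illustration.
        self.events.append((name, value))
        setattr(self, name, value)
```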
-
eventsourcing.domain.model.decorators.
mutator
(arg=None)[source]¶ Structures mutator functions by allowing handlers to be registered for different types of event. When the decorated function is called with an initial value and an event, it will call the handler that has been registered for that type of event.
It works like singledispatch, which it uses. The difference is that when the decorated function is called, this decorator dispatches according to the type of last call arg, which fits better with reduce(). The builtin Python function reduce() is used by the library to replay a sequence of events against an initial state. If a mutator function is given to reduce(), along with a list of events and an initializer, reduce() will call the mutator function once for each event in the list, but the initializer will be the first value, and the event will be the last argument, and we want to dispatch according to the type of the event. It happens that singledispatch is coded to switch on the type of the first argument, which makes it unsuitable for structuring a mutator function without the modifications introduced here.
The other aspect introduced by this decorator function is the option to set the type of the handled entity in the decorator. When an entity is replayed from scratch, in other words when all its events are replayed, the initial state is None. The handler which handles the first event in the sequence will probably construct an object instance. It is possible to write the type into the handler, but that makes the entity more difficult to subclass because you will also need to write a handler for it. If the decorator is invoked with the type, when the initial value passed as a call arg to the mutator function is None, the handler will instead receive the type of the entity, which it can use to construct the entity object.
class Entity(object):
    class Created(object):
        pass

@mutator(Entity)
def mutate(initial, event):
    raise NotImplementedError(type(event))

@mutate.register(Entity.Created)
def _(initial, event):
    return initial(**event.__dict__)

entity = mutate(None, Entity.Created())
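The replay behaviour described above can also be sketched with plain reduce(); the Created and ValueChanged classes and the hand-rolled isinstance dispatch below are illustrative stand-ins for the decorated mutator:

```python
from functools import reduce

# Sketch of replaying a sequence of events with reduce(): the mutator is
# called once per event, with the accumulated entity state first and the
# event last, dispatching on the type of the last argument.

class Created(object):
    def __init__(self, value):
        self.value = value

class ValueChanged(object):
    def __init__(self, value):
        self.value = value

def mutate(entity, event):
    if isinstance(event, Created):
        return {'value': event.value}   # initial state is None, so construct
    elif isinstance(event, ValueChanged):
        entity['value'] = event.value   # apply the change to existing state
        return entity
    raise NotImplementedError(type(event))

events = [Created(1), ValueChanged(2), ValueChanged(3)]
entity = reduce(mutate, events, None)
```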
-
eventsourcing.domain.model.decorators.
random
() → x in the interval [0, 1).¶
-
eventsourcing.domain.model.decorators.
retry
(exc=<class 'Exception'>, max_retries=1, wait=0)[source]¶
-
eventsourcing.domain.model.decorators.
subscribe_to
(event_class)[source]¶ Decorator for making a custom event handler function subscribe to a certain event type.
event_class: the DomainEvent class, or one of its child classes, that the handler function should subscribe to
The following example shows a custom handler that reacts to Todo.Created event and saves a projection of a Todo model object.
@subscribe_to(Todo.Created)
def new_todo_projection(event):
    todo = TodoProjection(id=event.originator_id, title=event.title)
    todo.save()
entity¶
Base classes for domain entities of different kinds.
The entity module provides base classes for domain entities.
-
class
eventsourcing.domain.model.entity.
AbstractEntityRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEventPlayer
-
event_store
¶ Returns event store object used by this repository.
-
-
class
eventsourcing.domain.model.entity.
DomainEntity
(id)[source]¶ Bases:
eventsourcing.domain.model.events.QualnameABC
Base class for domain entities.
-
class
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.AttributeChanged
Published when a DomainEntity is changed.
-
class
Created
(originator_topic, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.Created
Published when an entity is created.
-
originator_topic
¶
-
-
class
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.Discarded
,eventsourcing.domain.model.entity.Event
Published when a DomainEntity is discarded.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.DomainEvent
Supertype for events of domain entities.
-
__change_attribute__
(name, value)[source]¶ Changes named attribute with the given value, by triggering an AttributeChanged event.
-
__publish__
(event)[source]¶ Publishes given event for subscribers in the application.
Parameters: event – domain event or list of events
-
__publish_to_subscribers__
(event)[source]¶ Actually dispatches given event to publish-subscribe mechanism.
Parameters: event – domain event or list of events
-
__trigger_event__
(event_class, **kwargs)[source]¶ Constructs, applies, and publishes a domain event.
-
id
¶ Entity ID allows an entity instance to be referenced and distinguished from others, even though its state may change over time.
-
class
-
class
eventsourcing.domain.model.entity.
TimestampedEntity
(__created_on__, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when a TimestampedEntity is changed.
-
class
Created
(originator_topic, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedEntity is created.
-
class
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Discarded
Published when a TimestampedEntity is discarded.
-
class
Event
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.events.EventWithTimestamp
Supertype for events of timestamped entities.
-
class
-
class
eventsourcing.domain.model.entity.
TimestampedVersionedEntity
(__created_on__, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedEntity
,eventsourcing.domain.model.entity.VersionedEntity
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
,eventsourcing.domain.model.entity.AttributeChanged
Published when a TimestampedVersionedEntity is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a TimestampedVersionedEntity is created.
-
class
-
class
eventsourcing.domain.model.entity.
TimeuuidedVersionedEntity
(event_id, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimeuuidedEntity
,eventsourcing.domain.model.entity.VersionedEntity
-
class
eventsourcing.domain.model.entity.
VersionedEntity
(__version__=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.DomainEntity
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when a VersionedEntity is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.entity.Event
Published when a VersionedEntity is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
,eventsourcing.domain.model.entity.Discarded
Published when a VersionedEntity is discarded.
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.entity.Event
Supertype for events of versioned entities.
-
class
events¶
Base classes for domain events of different kinds.
-
class
eventsourcing.domain.model.events.
AttributeChanged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Can be published when an attribute of an entity is changed.
-
name
¶
-
value
¶
-
-
class
eventsourcing.domain.model.events.
Created
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Can be published when an entity is created.
-
class
eventsourcing.domain.model.events.
Discarded
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Published when something is discarded.
-
class
eventsourcing.domain.model.events.
DomainEvent
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.QualnameABC
Base class for domain events.
Implements methods to make instances read-only, comparable for equality, have recognisable representations, and hashable.
-
__hash__
()[source]¶ Computes a Python integer hash for an event, using its event hash string if available.
Supports equality and inequality comparisons.
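A minimal sketch of such a read-only, comparable, hashable event object (an illustration of the behaviour described above, not the library's implementation):

```python
# Sketch of an immutable event: attributes are set once via __dict__,
# further assignment is blocked, and equality/hashing derive from state.

class FrozenEvent(object):
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)  # bypasses __setattr__ deliberately

    def __setattr__(self, name, value):
        raise AttributeError('event attributes are read-only')

    def __eq__(self, other):
        return type(self) == type(other) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Hash over sorted attribute items, so equal events hash equally.
        return hash(tuple(sorted(self.__dict__.items())))
```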
-
__mutate__
(obj)[source]¶ Update obj with values from self.
Can be extended, but subclasses must call super method, and return an object.
Parameters: obj – object to be mutated Returns: mutated object
-
mutate
(obj)[source]¶ Convenience for use in custom models, to update obj with values from self without needing to call super method and return obj (two extra lines).
Can be overridden by subclasses. Any value returned by this method will be ignored.
Please note, subclasses that extend mutate() might not have fully completed that method before this method is called. To ensure all base classes have completed their mutate behaviour before mutating an event in a concrete class, extend mutate() instead of overriding this method.
Parameters: obj – object to be mutated
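The extension pattern described above can be sketched as follows; the class names and method bodies are illustrative, not the library's code:

```python
# Sketch of the __mutate__ / mutate() split: __mutate__ runs the base
# mutation and returns the object; mutate() is the convenience hook,
# called last, whose return value is ignored.

class BaseEvent(object):
    def __mutate__(self, obj):
        obj = self.apply_base(obj)  # stands in for base-class mutation
        self.mutate(obj)            # convenience hook; return value ignored
        return obj

    def apply_base(self, obj):
        obj['base_applied'] = True
        return obj

    def mutate(self, obj):
        pass

class Renamed(BaseEvent):
    def __init__(self, name):
        self.name = name

    def mutate(self, obj):
        # All base mutation has completed by the time this runs.
        obj['name'] = self.name
```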
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorID
(originator_id, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
-
originator_id
¶
-
-
class
eventsourcing.domain.model.events.
EventWithOriginatorVersion
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have an originator version number.
-
originator_version
¶
-
-
class
eventsourcing.domain.model.events.
EventWithTimestamp
(timestamp=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a timestamp value.
-
timestamp
¶
-
-
class
eventsourcing.domain.model.events.
EventWithTimeuuid
(event_id=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
For events that have a UUIDv1 event ID.
-
event_id
¶
-
-
class
eventsourcing.domain.model.events.
Logged
(**kwargs)[source]¶ Bases:
eventsourcing.domain.model.events.DomainEvent
Published when something is logged.
-
class
eventsourcing.domain.model.events.
QualnameABC
[source]¶ Bases:
object
Base class that introduces __qualname__ for objects in Python 2.7.
snapshot¶
Snapshotting is implemented in the domain layer as an event.
-
class
eventsourcing.domain.model.snapshot.
AbstractSnapshop
[source]¶ Bases:
object
-
originator_id
¶ ID of the snapshotted entity.
-
originator_version
¶ Version of the last event applied to the entity.
-
state
¶ State of the snapshotted entity.
-
topic
¶ Path to the class of the snapshotted entity.
-
-
class
eventsourcing.domain.model.snapshot.
Snapshot
(originator_id, originator_version, topic, state)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorVersion
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.snapshot.AbstractSnapshop
-
state
¶ State of the snapshotted entity.
-
topic
¶ Path to the class of the snapshotted entity.
-
timebucketedlog¶
Time-bucketed logs allow a timestamp-sequenced stream of items to be split across a number of different database partitions, which avoids any one partition becoming very large (and then unworkable).
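For illustration, a bucket key might be derived from a timestamp like this (a sketch; the library's make_timebucket_id may differ in detail):

```python
import datetime

# Illustrative sketch of deriving a time-bucket key from a timestamp,
# so log events land in per-bucket partitions rather than one big one.

def make_bucket_key(log_name, timestamp, bucket_size='day'):
    dt = datetime.datetime.utcfromtimestamp(timestamp)
    if bucket_size == 'day':
        suffix = dt.strftime('%Y-%m-%d')
    elif bucket_size == 'hour':
        suffix = dt.strftime('%Y-%m-%d_%H')
    else:
        raise ValueError(bucket_size)
    return '{}_{}'.format(log_name, suffix)
```

All events with timestamps in the same day (or hour) share a key, so each partition covers a bounded time span.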
-
class
eventsourcing.domain.model.timebucketedlog.
MessageLogged
(message, originator_id)[source]¶ Bases:
eventsourcing.domain.model.events.EventWithTimestamp
,eventsourcing.domain.model.events.EventWithOriginatorID
,eventsourcing.domain.model.events.Logged
-
message
¶
-
-
class
eventsourcing.domain.model.timebucketedlog.
Timebucketedlog
(name, bucket_size=None, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
-
class
BucketSizeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.timebucketedlog.Event
,eventsourcing.domain.model.entity.AttributeChanged
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of time-bucketed log.
-
class
Started
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Created
,eventsourcing.domain.model.timebucketedlog.Event
-
bucket_size
¶
-
name
¶
-
started_on
¶
-
class
-
class
eventsourcing.domain.model.timebucketedlog.
TimebucketedlogRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
-
get_or_create
(log_name, bucket_size)[source]¶ Gets or creates a log.
Return type: Timebucketedlog
-
-
eventsourcing.domain.model.timebucketedlog.
make_timebucket_id
(log_id, timestamp, bucket_size)[source]¶
infrastructure¶
The infrastructure layer adapts external devices in ways that are useful for the application, such as the way an event store encapsulates a database.
activerecord¶
Abstract base class for active record strategies.
-
class
eventsourcing.infrastructure.activerecord.
AbstractActiveRecordStrategy
(active_record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>)[source]¶ Bases:
object
-
all_items
()[source]¶ Returns all stored items from all sequences (possibly in chronological order, depending on database).
-
all_records
(*arg, **kwargs)[source]¶ Returns all records in the table (possibly in chronological order, depending on database).
-
cassandra¶
Classes for event sourcing with Apache Cassandra.
-
class
eventsourcing.infrastructure.cassandra.datastore.
ActiveRecord
(**values)[source]¶ Bases:
cassandra.cqlengine.models.Model
Supertype for active records in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
pk
¶
-
exception
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraDatastore
(tables, *args, **kwargs)[source]¶
-
class
eventsourcing.infrastructure.cassandra.datastore.
CassandraSettings
(hosts=None, port=None, protocol_version=None, default_keyspace=None, consistency=None, replication_factor=None, username=None, password=None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
CONSISTENCY_LEVEL
= 'LOCAL_QUORUM'¶
-
DEFAULT_KEYSPACE
= 'eventsourcing'¶
-
HOSTS
= ['127.0.0.1']¶
-
PORT
= 9042¶
-
PROTOCOL_VERSION
= 3¶
-
REPLICATION_FACTOR
= 1¶
-
-
class
eventsourcing.infrastructure.cassandra.activerecords.
CassandraActiveRecordStrategy
(active_record_class, sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>)[source]¶ Bases:
eventsourcing.infrastructure.activerecord.AbstractActiveRecordStrategy
-
from_active_record
(active_record)[source]¶ Returns a sequenced item instance, from given active record.
-
-
class
eventsourcing.infrastructure.cassandra.activerecords.
CqlTimeuuidSequencedItem
(**values)[source]¶ Bases:
eventsourcing.infrastructure.cassandra.datastore.ActiveRecord
Stores timeuuid-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
data
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
pk
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
position
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
sequence_id
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
topic
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
exception
-
class
eventsourcing.infrastructure.cassandra.activerecords.
IntegerSequencedItemRecord
(**values)[source]¶ Bases:
eventsourcing.infrastructure.cassandra.datastore.ActiveRecord
Stores integer-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
data
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
pk
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
position
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
sequence_id
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
topic
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
exception
-
class
eventsourcing.infrastructure.cassandra.activerecords.
SnapshotRecord
(**values)[source]¶ Bases:
eventsourcing.infrastructure.cassandra.datastore.ActiveRecord
Stores snapshots in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
data
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
pk
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
position
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
sequence_id
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
topic
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
exception
-
class
eventsourcing.infrastructure.cassandra.activerecords.
StoredEventRecord
(**values)[source]¶ Bases:
eventsourcing.infrastructure.cassandra.datastore.ActiveRecord
Stores stored events in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
event_type
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
originator_id
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
originator_version
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
pk
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
state
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
exception
-
class
eventsourcing.infrastructure.cassandra.activerecords.
TimestampSequencedItemRecord
(**values)[source]¶ Bases:
eventsourcing.infrastructure.cassandra.datastore.ActiveRecord
Stores timestamp-sequenced items in Cassandra.
-
exception
DoesNotExist
¶ Bases:
cassandra.cqlengine.models.DoesNotExist
-
exception
MultipleObjectsReturned
¶ Bases:
cassandra.cqlengine.models.MultipleObjectsReturned
-
data
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
pk
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
position
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
sequence_id
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
topic
= <cassandra.cqlengine.models.ColumnQueryEvaluator object>¶
-
exception
datastore¶
Base classes for concrete datastore classes.
-
exception
eventsourcing.infrastructure.datastore.
DatastoreConnectionError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
-
class
eventsourcing.infrastructure.datastore.
DatastoreSettings
[source]¶ Bases:
object
Base class for settings for database connection used by a stored event repository.
-
exception
eventsourcing.infrastructure.datastore.
DatastoreTableError
[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreError
eventplayer¶
Base classes for event players of different kinds.
-
class
eventsourcing.infrastructure.eventplayer.
EventPlayer
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEventPlayer
-
event_store
¶
-
eventsourcedrepository¶
Base classes for event sourced repositories (not abstract, can be used directly).
-
class
eventsourcing.infrastructure.eventsourcedrepository.
EventSourcedRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventplayer.EventPlayer
,eventsourcing.domain.model.entity.AbstractEntityRepository
-
__contains__
(entity_id)[source]¶ Returns a boolean value according to whether entity with given ID exists.
-
eventstore¶
The event store provides the application-level interface to the event sourcing persistence mechanism.
-
class
eventsourcing.infrastructure.eventstore.
AbstractEventStore
[source]¶ Bases:
object
Abstract base class for event stores. Defines the methods expected of an event store by other classes in the library.
-
class
eventsourcing.infrastructure.eventstore.
EventStore
(active_record_strategy, sequenced_item_mapper)[source]¶ Bases:
eventsourcing.infrastructure.eventstore.AbstractEventStore
Event store appends domain events to stored sequences. It uses an active record strategy to map named tuples to database records, and it uses a sequenced item mapper to map named tuples to application-level objects.
-
__init__
(active_record_strategy, sequenced_item_mapper)[source]¶ Initialises event store object.
Parameters: - active_record_strategy – active record strategy
- sequenced_item_mapper – sequenced item mapper
-
all_domain_events
()[source]¶ Gets all domain events in the event store.
Returns: map object, yielding a sequence of domain events
-
append
(domain_event_or_events)[source]¶ Appends given domain event, or list of domain events, to their sequence.
Parameters: domain_event_or_events – domain event, or list of domain events
-
get_domain_event
(originator_id, eq)[source]¶ Gets a domain event from the sequence identified by originator_id at position eq.
Parameters: - originator_id – ID of a sequence of events
- eq – get item at this position
Returns: domain event
-
get_domain_events
(originator_id, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True, page_size=None)[source]¶ Gets domain events from the sequence identified by originator_id.
Parameters: - originator_id – ID of a sequence of events
- gt – get items after this position
- gte – get items at or after this position
- lt – get items before this position
- lte – get items before or at this position
- limit – get limited number of items
- is_ascending – get items from lowest position
- page_size – restrict and repeat database query
Returns: list of domain events
-
get_most_recent_event
(originator_id, lt=None, lte=None)[source]¶ Gets a domain event from the sequence identified by originator_id at the highest position.
Parameters: - originator_id – ID of a sequence of events
- lt – get highest before this position
- lte – get highest at or before this position
Returns: domain event
-
iterator_class
¶ alias of
SequencedItemIterator
-
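The append and get operations above can be illustrated with a minimal in-memory sketch; the class and method names here are illustrative, not the library's:

```python
from collections import defaultdict, namedtuple

# Illustrative stand-in for a sequenced item; not the library's class.
Item = namedtuple("Item", ["sequence_id", "position", "topic", "data"])

class InMemoryEventStore:
    """Minimal sketch of an event store keyed by sequence ID."""

    def __init__(self):
        self._sequences = defaultdict(list)

    def append(self, item):
        # Reject a position that is already taken in the sequence.
        sequence = self._sequences[item.sequence_id]
        if any(i.position == item.position for i in sequence):
            raise ValueError("position already taken: %s" % item.position)
        sequence.append(item)

    def get_items(self, sequence_id, gt=None, lte=None):
        # Filter by position, mirroring the gt/gte/lt/lte parameters above.
        items = sorted(self._sequences[sequence_id], key=lambda i: i.position)
        if gt is not None:
            items = [i for i in items if i.position > gt]
        if lte is not None:
            items = [i for i in items if i.position <= lte]
        return items

store = InMemoryEventStore()
store.append(Item("entity1", 0, "Created", '{"foo": "bar"}'))
store.append(Item("entity1", 1, "AttributeChanged", '{"foo": "baz"}'))
```

Replaying `get_items("entity1")` in order is all it takes to rebuild the entity's state, which is the core of the event sourcing mechanism.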
integersequencegenerators¶
Different ways of generating sequences of integers.
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
AbstractIntegerSequenceGenerator
[source]¶ Bases:
object
-
class
eventsourcing.infrastructure.integersequencegenerators.base.
SimpleIntegerSequenceGenerator
(i=0)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
-
class
eventsourcing.infrastructure.integersequencegenerators.redisincr.
RedisIncr
(redis=None, key=None)[source]¶ Bases:
eventsourcing.infrastructure.integersequencegenerators.base.AbstractIntegerSequenceGenerator
Generates a sequence of integers, using Redis’ INCR command.
Maximum number is 2**63 - 1, or 9223372036854775807, the maximum value of a 64-bit signed integer.
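A single-process counterpart to RedisIncr can be sketched with itertools.count; RedisIncr provides the same next-integer contract across processes by delegating to Redis' atomic INCR command (the class name below is illustrative):

```python
import itertools

class CountingSequenceGenerator:
    """Sketch of an integer sequence generator: each call to next()
    yields the next integer in the sequence."""

    def __init__(self, i=0):
        self._counter = itertools.count(i)

    def __next__(self):
        return next(self._counter)

gen = CountingSequenceGenerator()
first, second = next(gen), next(gen)
```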
iterators¶
Different ways of getting sequenced items from a datastore.
-
class
eventsourcing.infrastructure.iterators.
AbstractSequencedItemIterator
(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
object
-
DEFAULT_PAGE_SIZE
= 1000¶
-
_inc_page_counter
()[source]¶ Increments the page counter.
- Each query result counts as a page, even if there are no items in the page, so this really counts queries.
- Divide the number of events by the page size if the “correct” page count is required.
- The counts differ when the number of events is exactly divisible by the page size, because there is no way to know in advance that a full page is also the last page.
-
-
class
eventsourcing.infrastructure.iterators.
GetEntityEventsThread
(active_record_strategy, sequence_id, gt=None, gte=None, lt=None, lte=None, page_size=None, is_ascending=True, *args, **kwargs)[source]¶ Bases:
threading.Thread
-
class
eventsourcing.infrastructure.iterators.
SequencedItemIterator
(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
-
class
eventsourcing.infrastructure.iterators.
ThreadedSequencedItemIterator
(active_record_strategy, sequence_id, page_size=None, gt=None, gte=None, lt=None, lte=None, limit=None, is_ascending=True)[source]¶ Bases:
eventsourcing.infrastructure.iterators.AbstractSequencedItemIterator
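The iterators above fetch a sequence in pages; the paging loop can be sketched as follows (the query function and names are illustrative):

```python
def iterate_in_pages(query, page_size):
    """Repeatedly query one page of a sequence until a short page
    signals that nothing is left."""
    position = 0
    while True:
        page = query(start=position, limit=page_size)
        for item in page:
            yield item
        if len(page) < page_size:  # short (or empty) page: end of sequence
            break
        position += page_size

data = list(range(7))

def query(start, limit):
    # Stand-in for a database query over a slice of the sequence.
    return data[start:start + limit]

items = list(iterate_in_pages(query, page_size=3))
```

When the number of items divides exactly by the page size, one extra empty query is issued, since a full page cannot be known in advance to be the last one.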
repositories¶
Repository base classes for entity classes defined in the library.
-
class
eventsourcing.infrastructure.repositories.array.
ArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
class
eventsourcing.infrastructure.repositories.array.
BigArrayRepository
(array_size=10000, *args, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.array.AbstractBigArrayRepository
,eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
-
subrepo
¶
-
subrepo_class
¶ alias of
ArrayRepository
-
-
class
eventsourcing.infrastructure.repositories.collection_repo.
CollectionRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.collection.AbstractCollectionRepository
Event sourced repository for the Collection domain model entity.
-
class
eventsourcing.infrastructure.repositories.timebucketedlog_repo.
TimebucketedlogRepo
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.domain.model.timebucketedlog.TimebucketedlogRepository
Event sourced repository for the Timebucketedlog domain model entity.
sequenceditem¶
The persistence model for storing events.
-
class
eventsourcing.infrastructure.sequenceditem.
SequencedItem
(sequence_id, position, topic, data)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, sequence_id, position, topic, data)¶ Create new instance of SequencedItem(sequence_id, position, topic, data)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable, new=tuple.__new__, len=len)¶ Make a new SequencedItem object from a sequence or iterable.
-
_replace
(_self, **kwds)¶ Return a new SequencedItem object replacing specified fields with new values
-
data
¶ Alias for field number 3
-
position
¶ Alias for field number 1
-
sequence_id
¶ Alias for field number 0
-
topic
¶ Alias for field number 2
-
-
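SequencedItem is a plain namedtuple, so the helpers listed above (_make, _asdict, _replace, and the field aliases) are the standard namedtuple methods; a quick demonstration with an equivalently-shaped tuple defined locally:

```python
from collections import namedtuple

# Same shape as eventsourcing.infrastructure.sequenceditem.SequencedItem.
SequencedItem = namedtuple(
    "SequencedItem", ["sequence_id", "position", "topic", "data"]
)

item = SequencedItem("entity1", 0, "Created", '{"foo": "bar"}')

# Field access by name, plus _replace and _asdict as documented above.
moved = item._replace(position=1)
fields = item._asdict()
```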
class
eventsourcing.infrastructure.sequenceditem.
SequencedItemFieldNames
(sequenced_item_class)[source]¶ Bases:
object
-
data
¶
-
other_names
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sequenceditem.
StoredEvent
(originator_id, originator_version, event_type, state)¶ Bases:
tuple
-
__getnewargs__
()¶ Return self as a plain tuple. Used by copy and pickle.
-
static
__new__
(_cls, originator_id, originator_version, event_type, state)¶ Create new instance of StoredEvent(originator_id, originator_version, event_type, state)
-
__repr__
()¶ Return a nicely formatted representation string
-
_asdict
()¶ Return a new OrderedDict which maps field names to their values.
-
classmethod
_make
(iterable, new=tuple.__new__, len=len)¶ Make a new StoredEvent object from a sequence or iterable.
-
_replace
(_self, **kwds)¶ Return a new StoredEvent object replacing specified fields with new values
-
event_type
¶ Alias for field number 2
-
originator_id
¶ Alias for field number 0
-
originator_version
¶ Alias for field number 1
-
state
¶ Alias for field number 3
-
sequenceditemmapper¶
The sequenced item mapper maps sequenced items to application-level objects.
-
class
eventsourcing.infrastructure.sequenceditemmapper.
AbstractSequencedItemMapper
[source]¶ Bases:
object
-
class
eventsourcing.infrastructure.sequenceditemmapper.
SequencedItemMapper
(sequenced_item_class=<class 'eventsourcing.infrastructure.sequenceditem.SequencedItem'>, sequence_id_attr_name=None, position_attr_name=None, json_encoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONEncoder'>, json_decoder_class=<class 'eventsourcing.utils.transcoding.ObjectJSONDecoder'>, cipher=None, other_attr_names=())[source]¶ Bases:
eventsourcing.infrastructure.sequenceditemmapper.AbstractSequencedItemMapper
Uses JSON to transcode domain events.
-
construct_item_args
(domain_event)[source]¶ Constructs attributes of a sequenced item from the given domain event.
-
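The mapping from a domain event to sequenced item fields, as construct_item_args performs it, can be sketched with the standard json module; the event class and helper below are illustrative, not the library's:

```python
import json

class ThingCreated:
    """Illustrative domain event with an originator ID and version."""
    def __init__(self, originator_id, originator_version, foo):
        self.originator_id = originator_id
        self.originator_version = originator_version
        self.foo = foo

def to_sequenced_item(event):
    """Map an event to (sequence_id, position, topic, data) fields."""
    # Topic: a string from which the event class can be resolved again.
    topic = "%s#%s" % (event.__class__.__module__, event.__class__.__qualname__)
    # Data: the event's state, transcoded with JSON.
    data = json.dumps(event.__dict__)
    return (event.originator_id, event.originator_version, topic, data)

event = ThingCreated("entity1", 0, foo="bar")
sequence_id, position, topic, data = to_sequenced_item(event)
```

The reverse mapping resolves the topic back to a class and reconstructs the event from the decoded JSON state.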
snapshotting¶
Snapshotting avoids having to replay an entire sequence of events to obtain the current state of a projection.
-
class
eventsourcing.infrastructure.snapshotting.
AbstractSnapshotStrategy
[source]¶ Bases:
object
-
get_snapshot
(entity_id, lt=None, lte=None)[source]¶ Gets the last snapshot for entity, optionally until a particular version number.
Return type: Snapshot
-
take_snapshot
(entity_id, entity, last_event_version)[source]¶ Takes a snapshot of entity, using given ID, state and version number.
Return type: AbstractSnapshot
-
-
class
eventsourcing.infrastructure.snapshotting.
EventSourcedSnapshotStrategy
(event_store)[source]¶ Bases:
eventsourcing.infrastructure.snapshotting.AbstractSnapshotStrategy
Snapshot strategy that uses an event sourced snapshot.
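What snapshotting buys can be seen in a few lines: instead of replaying a whole sequence, recover the state recorded at the snapshot's version and replay only the later events. A stdlib-only sketch with illustrative event tuples:

```python
# Illustrative events: ("set", key, value), in version order (0-based).
events = [("set", "a", 1), ("set", "b", 2), ("set", "a", 3), ("set", "c", 4)]

def apply_event(state, event):
    """Apply one event to a dict-shaped state, returning new state."""
    _, key, value = event
    state = dict(state)
    state[key] = value
    return state

# A snapshot taken after the first two events.
snapshot_version = 1
snapshot_state = {"a": 1, "b": 2}

# Replay only the events recorded after the snapshot version.
state = snapshot_state
for event in events[snapshot_version + 1:]:
    state = apply_event(state, event)
```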
sqlalchemy¶
Classes for event sourcing with SQLAlchemy.
-
class
eventsourcing.infrastructure.sqlalchemy.activerecords.
IntegerSequencedItemRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.activerecords.
SQLAlchemyActiveRecordStrategy
(session, *args, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.activerecord.AbstractActiveRecordStrategy
-
get_items
(sequence_id, gt=None, gte=None, lt=None, lte=None, limit=None, query_ascending=True, results_ascending=True)[source]¶
-
query
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.activerecords.
SnapshotRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.activerecords.
StoredEventRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
event_type
¶
-
id
¶
-
originator_id
¶
-
originator_version
¶
-
state
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.activerecords.
TimestampSequencedItemRecord
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemyDatastore
(base=<class 'sqlalchemy.ext.declarative.api.Base'>, tables=None, connection_strategy='plain', session=None, **kwargs)[source]¶ Bases:
eventsourcing.infrastructure.datastore.Datastore
-
session
¶
-
-
class
eventsourcing.infrastructure.sqlalchemy.datastore.
SQLAlchemySettings
(uri=None)[source]¶ Bases:
eventsourcing.infrastructure.datastore.DatastoreSettings
-
DB_URI
= 'sqlite:///:memory:'¶
-
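The StoredEventRecord above maps events to a table keyed by originator ID and version. A sqlite3 sketch of an equivalent schema (the table name is illustrative; the column names follow the record) shows how the uniqueness of (originator_id, originator_version) turns a concurrent double-write into an error, which is the basis for optimistic concurrency control:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stored_events (
        originator_id TEXT NOT NULL,
        originator_version INTEGER NOT NULL,
        event_type TEXT NOT NULL,
        state TEXT NOT NULL,
        PRIMARY KEY (originator_id, originator_version)
    )
    """
)
conn.execute(
    "INSERT INTO stored_events VALUES (?, ?, ?, ?)",
    ("entity1", 0, "Created", '{"foo": "bar"}'),
)

# A second write at the same version violates the primary key; this is
# how a concurrency conflict surfaces at the database level.
try:
    conn.execute(
        "INSERT INTO stored_events VALUES (?, ?, ?, ?)",
        ("entity1", 0, "Created", "{}"),
    )
    conflict = False
except sqlite3.IntegrityError:
    conflict = True
```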
timebucketedlog_reader¶
Reader for timebucketed logs.
-
class
eventsourcing.infrastructure.timebucketedlog_reader.
TimebucketedlogReader
(log, event_store, page_size=50)[source]¶ Bases:
object
-
eventsourcing.infrastructure.timebucketedlog_reader.
get_timebucketedlog_reader
(log, event_store)[source]¶ Return type: TimebucketedlogReader
interface¶
The interface layer uses an application to service client requests.
notificationlog¶
A notification log is a pull-based mechanism for updating other applications.
-
class
eventsourcing.interface.notificationlog.
AbstractNotificationLog
[source]¶ Bases:
object
Presents a sequence of sections from a sequence of notifications.
-
class
eventsourcing.interface.notificationlog.
NotificationLog
(big_array, section_size)[source]¶ Bases:
eventsourcing.interface.notificationlog.AbstractNotificationLog
-
class
eventsourcing.interface.notificationlog.
NotificationLogReader
(notification_log)[source]¶ Bases:
object
-
class
eventsourcing.interface.notificationlog.
RemoteNotificationLog
(base_url, notification_log_id)[source]¶ Bases:
eventsourcing.interface.notificationlog.AbstractNotificationLog
-
class
eventsourcing.interface.notificationlog.
Section
(section_id, items, previous_id=None, next_id=None)[source]¶ Bases:
object
Section of a notification log.
Contains items, and has an ID.
May also have the IDs of the previous and next sections of the notification log.
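The Section class partitions a sequence of notifications into fixed-size pages linked by previous and next IDs; a sketch with illustrative names:

```python
def make_section(notifications, section_id, section_size):
    """Cut one fixed-size section out of a sequence of notifications,
    with IDs linking to the previous and next sections (1-based)."""
    start = (section_id - 1) * section_size
    items = notifications[start:start + section_size]
    previous_id = section_id - 1 if section_id > 1 else None
    next_id = section_id + 1 if start + section_size < len(notifications) else None
    return {"id": section_id, "items": items,
            "previous_id": previous_id, "next_id": next_id}

notifications = list(range(25))
section = make_section(notifications, section_id=2, section_size=10)
```

A reader can then follow next_id section by section until it is None, which is how a remote application pulls updates it has not yet seen.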
utils¶
The utils package contains common functions that are used in more than one layer.
cipher¶
time¶
-
eventsourcing.utils.times.
datetime_from_timestamp
(t)[source]¶ Returns a datetime from a decimal UNIX timestamp.
Parameters: t – timestamp, either Decimal or float Returns: datetime.datetime object
-
eventsourcing.utils.times.
decimaltimestamp
(t=None)[source]¶ Returns a UNIX timestamp as a Decimal object (an exact number type).
Returns the current time when called without args, otherwise converts the given floating point number
t
to a Decimal with 6 decimal places. Parameters: t – Floating point UNIX timestamp (“seconds since epoch”). Returns: A Decimal with 6 decimal places, representing the given floating point number or the value returned by time.time().
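Using Decimal avoids the rounding surprises of binary floats when timestamps are stored, compared and used as sequence positions; a sketch of the conversion, assuming quantization to 6 decimal places (microsecond precision):

```python
import time
from decimal import Decimal

def to_decimal_timestamp(t=None):
    """Convert a float UNIX timestamp to an exact Decimal with
    6 decimal places; uses the current time when t is None."""
    t = time.time() if t is None else t
    return Decimal("{:.6f}".format(t))

ts = to_decimal_timestamp(1.25)
```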
topic¶
-
eventsourcing.utils.topic.
get_topic
(domain_class)[source]¶ Returns a string describing a class.
- Args:
- domain_class: A class.
- Returns:
- A string describing the class.
-
eventsourcing.utils.topic.
resolve_attr
(obj, path)[source]¶ A recursive version of getattr for navigating dotted paths.
- Args:
- obj: An object for which we want to retrieve a nested attribute. path: A dot separated string containing zero or more attribute names.
- Returns:
- The attribute referred to by obj.a1.a2.a3…
- Raises:
- AttributeError: If there is no such attribute.
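resolve_attr amounts to folding getattr over the dotted path, which is how a topic is resolved back to a Python class after its module has been imported; a sketch:

```python
from functools import reduce

def resolve_attr(obj, path):
    """Walk a dotted path of attribute names, e.g. 'a1.a2.a3'.
    An empty path returns the object itself; a missing attribute
    raises AttributeError, as documented above."""
    if not path:
        return obj
    return reduce(getattr, path.split("."), obj)

import os
sep = resolve_attr(os, "path.sep")
```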
exceptions¶
A few exception classes are defined by the library to indicate particular kinds of error.
-
exception
eventsourcing.exceptions.
ArrayIndexError
[source]¶ Bases:
IndexError
,eventsourcing.exceptions.EventSourcingError
Raised when appending item to an array that is full.
-
exception
eventsourcing.exceptions.
ConcurrencyError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when appending events at the wrong version to a versioned stream.
-
exception
eventsourcing.exceptions.
ConsistencyError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when applying an event stream to a versioned entity.
-
exception
eventsourcing.exceptions.
DataIntegrityError
[source]¶ Bases:
ValueError
,eventsourcing.exceptions.EventSourcingError
Raised when sequenced item data is damaged (the hash doesn’t match the data).
-
exception
eventsourcing.exceptions.
DatasourceSettingsError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when an error is detected in settings for a datasource.
-
exception
eventsourcing.exceptions.
EntityIsDiscarded
[source]¶ Bases:
AssertionError
Raised when access to a recently discarded entity object is attempted.
-
exception
eventsourcing.exceptions.
EntityVersionNotFound
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when accessing an entity version that does not exist.
-
exception
eventsourcing.exceptions.
EventHashError
[source]¶ Bases:
eventsourcing.exceptions.DataIntegrityError
Raised when an event’s seal hash doesn’t match the hash of the state of the event.
-
exception
eventsourcing.exceptions.
EventSourcingError
[source]¶ Bases:
Exception
Base eventsourcing exception.
-
exception
eventsourcing.exceptions.
HeadHashError
[source]¶ Bases:
eventsourcing.exceptions.DataIntegrityError
,eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event with hash different from aggregate head.
-
exception
eventsourcing.exceptions.
MismatchedOriginatorError
[source]¶ Bases:
eventsourcing.exceptions.ConsistencyError
Raised when applying an event to an inappropriate object.
-
exception
eventsourcing.exceptions.
MutatorRequiresTypeNotInstance
[source]¶ Bases:
eventsourcing.exceptions.ConsistencyError
Raised when a mutator function receives a class rather than an entity.
-
exception
eventsourcing.exceptions.
OriginatorIDError
[source]¶ Bases:
eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event to the wrong entity or aggregate.
-
exception
eventsourcing.exceptions.
OriginatorVersionError
[source]¶ Bases:
eventsourcing.exceptions.MismatchedOriginatorError
Raised when applying an event to the wrong version of an entity or aggregate.
-
exception
eventsourcing.exceptions.
ProgrammingError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when programming errors are encountered.
-
exception
eventsourcing.exceptions.
RepositoryKeyError
[source]¶ Bases:
KeyError
,eventsourcing.exceptions.EventSourcingError
Raised when using an entity repository’s dictionary-like interface to get an entity that does not exist.
-
exception
eventsourcing.exceptions.
SequencedItemConflict
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when an integer sequence error occurs, e.g. when trying to save a version that already exists.
-
exception
eventsourcing.exceptions.
TimeSequenceError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when a time sequence error occurs, e.g. when trying to save a timestamp that already exists.
-
exception
eventsourcing.exceptions.
TopicResolutionError
[source]¶ Bases:
eventsourcing.exceptions.EventSourcingError
Raised when unable to resolve a topic to a Python class.
example¶
A simple, unit-tested, event sourced application.
application¶
-
class
eventsourcing.example.application.
ExampleApplication
(**kwargs)[source]¶ Bases:
eventsourcing.application.base.ApplicationWithPersistencePolicies
Example event sourced application with entity factory and repository.
-
eventsourcing.example.application.
close_example_application
()[source]¶ Shuts down single global instance of application.
To be called when tearing down, perhaps between tests, in order to allow a subsequent call to init_example_application().
-
eventsourcing.example.application.
construct_example_application
(**kwargs)[source]¶ Application object factory.
domainmodel¶
-
class
eventsourcing.example.domainmodel.
AbstractExampleRepository
[source]¶ Bases:
eventsourcing.domain.model.entity.AbstractEntityRepository
-
class
eventsourcing.example.domainmodel.
Example
(foo='', a='', b='', **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.TimestampedVersionedEntity
An example event sourced domain model entity.
-
class
AttributeChanged
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.AttributeChanged
Published when an attribute of an Example is changed.
-
class
Created
(originator_version=0, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Created
Published when an Example is created.
-
class
Discarded
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Discarded
Published when an Example is discarded.
-
class
Event
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.domain.model.entity.Event
Supertype for events of example entities.
-
class
Heartbeat
(originator_version, **kwargs)[source]¶ Bases:
eventsourcing.example.domainmodel.Event
,eventsourcing.domain.model.entity.Event
Published when a heartbeat in the entity occurs (see below).
-
a
¶ An example attribute.
-
b
¶ Another example attribute.
-
foo
¶ An example attribute.
-
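The Example entity above illustrates the general style: each change is captured as an event, and current state is obtained by replaying the events through a mutator function. A stdlib-only sketch of that replay (the event shapes and names are illustrative, not the library's):

```python
def mutate(state, event):
    """Mutator: applies one event to the current state (None initially)."""
    event_type, payload = event
    if event_type == "Created":
        # The Created event initialises the entity's state at version 0.
        return {"id": payload["originator_id"], "foo": payload["foo"], "version": 0}
    if event_type == "AttributeChanged":
        # Attribute changes produce a new state at the next version.
        state = dict(state)
        state[payload["name"]] = payload["value"]
        state["version"] += 1
        return state
    raise TypeError("unknown event type: %s" % event_type)

events = [
    ("Created", {"originator_id": "entity1", "foo": "bar"}),
    ("AttributeChanged", {"name": "foo", "value": "baz"}),
]

# Replay the events to obtain the current state, as a repository would.
state = None
for event in events:
    state = mutate(state, event)
```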
infrastructure¶
-
class
eventsourcing.example.infrastructure.
ExampleRepository
(event_store, snapshot_strategy=None, use_cache=False, mutator_func=None)[source]¶ Bases:
eventsourcing.infrastructure.eventsourcedrepository.EventSourcedRepository
,eventsourcing.example.domainmodel.AbstractExampleRepository
Event sourced repository for the Example domain model entity.
interface¶
-
class
eventsourcing.example.interface.flaskapp.
IntegerSequencedItem
(**kwargs)[source]¶ Bases:
flask_sqlalchemy.Model
-
__init__
(**kwargs)¶ A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
-
data
¶
-
id
¶
-
position
¶
-
sequence_id
¶
-
topic
¶
-