Local event store

Usually, an event store centralize all the message published by many services.

An eventstore has a backend that subscribe all services eventstream and store them in a database in order to replay them. The local event store, I don’t know if the event source world has a better name for it, is all the message that the bus handle. event the non published flagged ones.

The message bus can store them in an event repository, usually a sql table in a sql based repository.

For the moment, we will replace the default repository ( messagebus.AsyncSinkholeEventstoreRepository in previous chapter) and write our own one that store them in memory.

An EventstoreRepository is a repository for all the local events, its override the messagebus.AsyncEventstoreAbstractRepository. Only the abstract method messagebus.AsyncEventstoreAbstractRepository._add() needs to be implemented.

Lets just add this in our conftest.py file in order to get an eventstore.

from collections.abc import MutableSequence
from typing import Any, ClassVar

from messagebus import AsyncEventstoreAbstractRepository, Message


class InMemoryEventstoreRepository(AsyncEventstoreAbstractRepository):
    messages: ClassVar[MutableSequence[Message[Any]]] = []

    async def _add(self, message: Message[Any]) -> None:
        self.messages.append(message)

Now we can update our Unit Of Work in order to use our eventstore implementation.

from collections.abc import Iterator, MutableSequence
from typing import Any, ClassVar
from uuid import UUID

import pytest
from reading_club.domain.model import Book
from reading_club.service.repositories import (
    AbstractBookRepository,
    AsyncEventstoreAbstractRepository,
    BookRepositoryError,
    BookRepositoryOperationResult,
    BookRepositoryResult,
)
from reading_club.service.uow import AbstractUnitOfWork
from result import Err, Ok

from messagebus import (
    AsyncAbstractEventstreamTransport,
    AsyncEventstreamPublisher,
    Message,
)


class InMemoryEventstoreRepository(AsyncEventstoreAbstractRepository):
    messages: ClassVar[MutableSequence[Message[Any]]] = []

    async def _add(self, message: Message[Any]) -> None:
        self.messages.append(message)


class InMemoryBookRepository(AbstractBookRepository):
    books: ClassVar[dict[UUID, Book]] = {}
    ix_books_isbn: ClassVar[dict[str, UUID]] = {}

    async def add(self, model: Book) -> BookRepositoryOperationResult:
        if model.id in self.books:
            return Err(BookRepositoryError.integrity_error)
        if model.isbn in self.ix_books_isbn:
            return Err(BookRepositoryError.integrity_error)
        self.books[model.id] = model
        self.ix_books_isbn[model.isbn] = model.id
        self.seen.append(model)
        return Ok(...)

    async def by_id(self, id: UUID) -> BookRepositoryResult:
        if id not in self.books:
            return Err(BookRepositoryError.not_found)
        return Ok(self.books[id])


class InMemoryUnitOfWork(AbstractUnitOfWork):
    def __init__(self, transport: AsyncAbstractEventstreamTransport):
        self.books = InMemoryBookRepository()
        self.eventstore = InMemoryEventstoreRepository(
            publisher=AsyncEventstreamPublisher(transport)
        )

    async def commit(self) -> None: ...

    async def rollback(self) -> None: ...


@pytest.fixture
def uow(transport: AsyncAbstractEventstreamTransport) -> Iterator[InMemoryUnitOfWork]:
    uow = InMemoryUnitOfWork(transport)
    yield uow
    uow.books.books.clear()  # type: ignore
    uow.books.ix_books_isbn.clear()  # type: ignore
    uow.eventstore.messages.clear()  # type: ignore

Finally, we can update the tests to ensure that the message is stored.

from lastuuid.dummies import uuidgen
from reading_club.domain.messages import BookRegistered, RegisterBook
from reading_club.domain.model import Book
from reading_club.service.uow import AbstractUnitOfWork

from messagebus import AsyncMessageBus
from tests.conftest import EventstreamTransport


async def test_bus_handler(
    bus: AsyncMessageBus,
    register_book_cmd: RegisterBook,
    uow: AbstractUnitOfWork,
    transport: EventstreamTransport,
):
    async with uow as transaction:
        await bus.handle(register_book_cmd, transaction)
        book = await transaction.books.by_id(register_book_cmd.id)
        assert book.is_ok()
        assert book.unwrap() == Book(
            id=uuidgen(1),
            title="Domain Driven Design",
            author="Eric Evans",
            isbn="0-321-12521-5",
        )
        assert book.unwrap().messages == []
        await transaction.commit()

    assert uow.eventstore.messages == [  # type: ignore
        RegisterBook(
            id=uuidgen(1),
            isbn="0-321-12521-5",
            title="Domain Driven Design",
            author="Eric Evans",
        ),
        BookRegistered(
            id=uuidgen(1),
            isbn="0-321-12521-5",
            title="Domain Driven Design",
            author="Eric Evans",
        ),
    ]
    assert transport.events == [
        {
            "id": transport.events[0]["id"],
            "created_at": transport.events[0]["created_at"],
            "payload": f'{{"id": "{uuidgen(1)}", "isbn": "0-321-12521-5", '
            '"title": "Domain Driven '
            'Design", "author": "Eric Evans"}',
            "type": "register_book_v1",
        },
    ]

Note that there is now way to retrieve message from a messagebus.AsyncEventstoreAbstractRepository. The repository is made to be a write only interface. This is why, while testing, we add a # type: ignore by reading from our implementation detail.

Running the tests show that the eventstore is filled out by the bus.

$ poetry run pytest -sxv
...
collected 2 items

tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED

Important

In the real world, we don’t tests that a InMemoryUnitOfWork keep messages, it has been done here has an example. The messagebus is responsible of that part, nothing more.

By the way, what has to be is the real EventstoreRepository._add method that received all kind of messages.

All the basics of the messagebus has been introduced, so, for now, we will create a sql implementation of our repository to get a real storage backend example.