Event stream

When a command has been processed by a service hook, an event can be sent to an event stream in order to get some pub/sub synchronization, and also to get an observable architecture.

This is particularly useful in a microservice architecture.

First, we have to define an event format.

In our example, when a book has been registered, we can raise the following event.

from uuid import UUID

from messagebus import Event, Field, Metadata


class BookRegistered(Event):
    """Event describing a book that has been registered."""

    # Data of the registered book carried by the event.
    id: UUID = Field(...)
    isbn: str = Field(...)
    title: str = Field(...)
    author: str = Field(...)
    # published=True flags the message for publication to the event stream.
    # NOTE(review): name and schema_version presumably combine into the
    # "register_book_v1" message type seen on the transport - confirm.
    metadata: Metadata = Metadata(
        name="register_book",
        schema_version=1,
        published=True,
    )

The event is a message, and we can append it to the messages of the book during the registration hook. The unit of work will process the message and send it to the event stream because the published flag is set to True.

Note

Commands and events can both be published. It is preferable to publish all events and to avoid publishing commands. The reason is that once a message is public, it has to be maintained and must avoid any breaking change. Moreover, when a new version of a message is added, the services that rely on it have to be updated too, preferably before the new version reaches production. This is why commands may not be published, and are only processed using the message bus.

We can update our unit test to check that our messages are sent to a transport backend, represented by the fixture transport.

from lastuuid.dummies import uuidgen
from reading_club.domain.messages import RegisterBook
from reading_club.domain.model import Book
from reading_club.service.uow import AbstractUnitOfWork

from messagebus import AsyncMessageBus
from tests.conftest import EventstreamTransport


async def test_bus_handler(
    bus: AsyncMessageBus,
    register_book_cmd: RegisterBook,
    uow: AbstractUnitOfWork,
    transport: EventstreamTransport,
):
    """Handle the command through the bus and check the published event.

    The book must be stored with no pending message left on it, and the
    transport must have recorded exactly one serialized event.
    """
    async with uow as tx:
        await bus.handle(register_book_cmd, tx)
        lookup = await tx.books.by_id(register_book_cmd.id)
        assert lookup.is_ok()
        stored = lookup.unwrap()
        assert stored == Book(
            id=uuidgen(1),
            title="Domain Driven Design",
            author="Eric Evans",
            isbn="0-321-12521-5",
        )
        assert stored.messages == []
        await tx.commit()

    # "id" and "created_at" are copied from the recorded event itself, so only
    # "payload" and "type" are effectively compared.
    recorded = transport.events[0]
    expected_event = {
        "id": recorded["id"],
        "created_at": recorded["created_at"],
        "payload": '{"id": "x", "isbn": "0-321-12521-5", "title": "Domain Driven '
        'Design", "author": "Eric Evans"}',
        "type": "register_book_v1",
    }
    assert transport.events == [expected_event]

At this point, we have to implement the transport in our conftest.py file.

from collections.abc import Iterator, Mapping, MutableSequence
from typing import Any, ClassVar
from uuid import UUID

import pytest
from lastuuid.dummies import uuidgen
from reading_club.domain.messages import RegisterBook
from reading_club.domain.model import Book
from reading_club.service.repositories import (
    AbstractBookRepository,
    BookRepositoryError,
    BookRepositoryOperationResult,
    BookRepositoryResult,
)
from reading_club.service.uow import AbstractUnitOfWork
from result import Err, Ok

from messagebus import (
    AsyncAbstractEventstreamTransport,
    AsyncEventstreamPublisher,
    AsyncMessageBus,
    AsyncSinkholeEventstoreRepository,
)


class EventstreamTransport(AsyncAbstractEventstreamTransport):
    """
    Test transport that keeps every message it receives in a list.

    Nothing is sent anywhere: messages are only recorded in ``events`` so
    that tests can inspect them.
    """

    # Serialized messages, in the order they were received.
    events: MutableSequence[Mapping[str, Any]]

    def __init__(self) -> None:
        # Start with no recorded message.
        self.events = []

    async def send_message_serialized(self, message: Mapping[str, Any]) -> None:
        """Record the serialized message in ``events`` instead of sending it."""
        self.events.append(message)


class InMemoryBookRepository(AbstractBookRepository):
    """Book repository backed by class-level dictionaries.

    The storage is shared by every instance of the class, so it has to be
    cleared explicitly between tests.
    """

    # Primary storage, keyed by book id.
    books: ClassVar[dict[UUID, Book]] = {}
    # Unique index mapping an ISBN to the id of the book that owns it.
    ix_books_isbn: ClassVar[dict[str, UUID]] = {}

    async def add(self, model: Book) -> BookRepositoryOperationResult:
        """Store ``model``, rejecting a duplicated id or ISBN.

        Returns ``Err(BookRepositoryError.integrity_error)`` when the id or
        the ISBN is already known, ``Ok(...)`` otherwise.
        """
        already_known = model.id in self.books or model.isbn in self.ix_books_isbn
        if already_known:
            return Err(BookRepositoryError.integrity_error)

        self.books[model.id] = model
        self.ix_books_isbn[model.isbn] = model.id
        # NOTE(review): `seen` comes from the base class; presumably it is how
        # the unit of work finds the models whose messages it has to process.
        self.seen.append(model)
        return Ok(...)

    async def by_id(self, id: UUID) -> BookRepositoryResult:
        """Return the book stored under ``id``, or a ``not_found`` error."""
        try:
            found = self.books[id]
        except KeyError:
            return Err(BookRepositoryError.not_found)
        return Ok(found)


class InMemoryUnitOfWork(AbstractUnitOfWork):
    """Unit of work wiring the in-memory repository to an eventstream transport."""

    def __init__(self, transport: AsyncAbstractEventstreamTransport):
        self.books = InMemoryBookRepository()
        # The sinkhole event store is given a publisher bound to the transport,
        # so the transport is the only place where messages can be observed.
        publisher = AsyncEventstreamPublisher(transport)
        self.eventstore = AsyncSinkholeEventstoreRepository(publisher=publisher)

    async def commit(self) -> None:
        """Do nothing: the in-memory repository writes to its dicts directly."""

    async def rollback(self) -> None:
        """Do nothing: this unit of work performs no rollback."""


@pytest.fixture
def register_book_cmd():
    """Build the RegisterBook command used by the tests."""
    command = RegisterBook(
        id=uuidgen(),
        title="Domain Driven Design",
        author="Eric Evans",
        isbn="0-321-12521-5",
    )
    return command


@pytest.fixture
def transport() -> AsyncAbstractEventstreamTransport:
    """Provide a fresh in-memory transport, starting with no recorded event."""
    recording_transport = EventstreamTransport()
    return recording_transport


@pytest.fixture
def uow(transport: AsyncAbstractEventstreamTransport) -> Iterator[InMemoryUnitOfWork]:
    """Yield a unit of work bound to ``transport``, then wipe the book storage."""
    unit_of_work = InMemoryUnitOfWork(transport)
    yield unit_of_work
    # The repository keeps its data in class-level dicts, so they have to be
    # emptied here or books would leak from one test to the next.
    repository = unit_of_work.books
    for storage in (repository.books, repository.ix_books_isbn):  # type: ignore
        storage.clear()


# For performance reasons, a single bus is created at import time and reused
# by every test: repeating the scan operation slows the test suite down.
_bus = AsyncMessageBus()
_bus.scan("reading_club.service.handlers")


@pytest.fixture
def bus() -> AsyncMessageBus:
    """Return the module-level message bus shared by all the tests."""
    return _bus

First, we create an eventstream transport that stores events in a list, and expose it as a fixture. The transport is also used to override the eventstore property of the unit of work. We reuse the AsyncSinkholeEventstoreRepository repository, which means that we don’t store the events locally, but we set up our transport, which has the effect of sending published events to the eventstream.

Now let's update the code of the service handler to raise the event:

from reading_club.domain.messages import BookRegistered, RegisterBook
from reading_club.domain.model import Book
from reading_club.service.repositories import BookRepositoryOperationResult
from reading_club.service.uow import AbstractUnitOfWork

from messagebus import async_listen


@async_listen
async def register_book(
    cmd: RegisterBook, uow: AbstractUnitOfWork
) -> BookRepositoryOperationResult:
    """Register a book and raise ``BookRegistered`` when it has been stored.

    The book is built from the command and added to the repository of the
    unit of work. The event is appended to the book's messages only when the
    repository accepted the book, so a rejected registration (for instance a
    duplicated id or ISBN) does not announce a book that was never stored.

    Returns the repository operation result, unchanged.
    """
    book = Book(id=cmd.id, title=cmd.title, author=cmd.author, isbn=cmd.isbn)
    op = await uow.books.add(book)
    if op.is_ok():
        # Only a successful registration is worth an event.
        book.messages.append(
            BookRegistered(
                id=cmd.id, title=cmd.title, author=cmd.author, isbn=cmd.isbn
            )
        )
    return op

Note that we add the message to be processed by the message bus in the service handler, not in a repository. In the real world, we have multiple implementations of our unit of work, and this code should not have to be added every time we create a repository implementation. The messaging part for the bus is handled by the service layer.

Lastly, note that our initial test test_register_book can also be updated to test that the unit of work will receive the message before processing it. This test directly calls the message bus handler and bypasses the bus.

from lastuuid.dummies import uuidgen
from reading_club.domain.messages import BookRegistered, RegisterBook
from reading_club.domain.model import Book
from reading_club.service.handlers.book import register_book
from reading_club.service.repositories import BookRepositoryError
from reading_club.service.uow import AbstractUnitOfWork


async def test_register_book(register_book_cmd: RegisterBook, uow: AbstractUnitOfWork):
    """Call the handler directly, without going through the bus.

    The first call must store the book and leave a BookRegistered message on
    it; the second call with the same command must fail with an integrity
    error.
    """
    async with uow as tx:
        result = await register_book(register_book_cmd, tx)
        assert result is not None
        assert result.is_ok()

        lookup = await tx.books.by_id(register_book_cmd.id)
        assert lookup.is_ok()
        stored = lookup.unwrap()

        details = {
            "title": "Domain Driven Design",
            "author": "Eric Evans",
            "isbn": "0-321-12521-5",
        }
        assert stored == Book(id=uuidgen(1), **details)
        # The bus is bypassed here, so the message is still pending on the book.
        assert stored.messages == [BookRegistered(id=uuidgen(1), **details)]
        await tx.commit()

    async with uow as tx:
        duplicate = await register_book(register_book_cmd, tx)
        assert duplicate is not None
        assert duplicate.is_err()
        assert duplicate.unwrap_err() == BookRepositoryError.integrity_error
        await tx.rollback()