Unit Of Work using SQLAlchemy#

Now that we have a finally modelized our application, we can start storing data in a storage backend. For the example, we will use the latestest version of SQLAlchemy, which is SQLAlchemy 2 at the moment.

We will use sqlite in memory for testing purpose.

poetry add "sqlalchemy[mypy]" aiosqlite

Note

we install the mypy extentions here, even if we don’t check mypy tests in our example.

Create the sql schema#

Actually, we are not going to use SQLAlchemy ORM, because we are going to map our models directly, and because we don’t needs layers of abstraction, only the SQLAlchemy Core is enough for our use case.

The repository pattern will implement the traditional ORM part that you usually use, but as a primary goals, we don’t wan’t any database implementation details in our core domaine model, this is why we don’t use SQLAlchemy ORM at all.

We will create all this code in an adapters (see Adapter), and create a submodul uow_sqla

mkdir -p src/reading_club/adapters/uow_sqla
touch src/reading_club/adapters/__init__.py
touch src/reading_club/adapters/uow_sqla/__init__.py
touch src/reading_club/adapters/uow_sqla/orm.py
touch src/reading_club/adapters/uow_sqla/uow.py

Lets create our sql schema now.

In the reading_club.adapters.uow_sqla.orm module, we will create the database schema using the SQLAlchemy Core:

from sqlalchemy import JSON, Column, DateTime, Index, MetaData, String, Table, Uuid

metadata = MetaData()


books = Table(
    "books",
    metadata,
    Column("id", Uuid(as_uuid=False), nullable=False, primary_key=True),
    Column("title", String, nullable=False),
    Column("author", String, nullable=False),
    Column("isbn", String(20), nullable=False),
    Index("idx_books_isbn", "isbn", unique=True),
)


messages = Table(
    "messages",
    metadata,
    Column("id", Uuid(as_uuid=False), nullable=False, primary_key=True),
    Column("created_at", DateTime(), nullable=False),
    Column("metadata", JSON(), nullable=False),
    Column("payload", JSON(), nullable=False),
    Index("idx_messages_created_at", "created_at"),
)

We will see how we map that later, before continue, lets write some tests for our unit of work.

Note

I avoid the stub of reading_club.adapters.uow_sqla.uow for the moment, we will get the proper implementation after the tests in order to get this page shorter.

Testing the unit of work#

First, We are going to separate our sql tests from others tests, in order to get a conftest that override the tests/conftest.py but we also ensure to not use sql in the rest of the tests. Then, we have to implement the jeepito.AsyncAbstractUnitOfWork.commit() and jeepito.AsyncAbstractUnitOfWork.rollback() and we have to write transaction tests to ensure it works.

mkdir tests/uow_sqla
touch tests/uow_sqla/__init__.py
touch tests/uow_sqla/conftest.py
touch tests/uow_sqla/test_transaction.py

Now, lets write the tests in uow_sqla/test_transaction.py

import uuid

from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession

from jeepito import AsyncAbstractEventstreamTransport


async def test_commit(
    sqla_engine: AsyncEngine,
    sqla_session: AsyncSession,
    transport: AsyncAbstractEventstreamTransport,
):
    book_id = str(uuid.uuid4())
    uow = SQLUnitOfWork(transport, sqla_engine)
    async with uow as transaction:
        await uow.session.execute(
            text(
                "insert into books(id, title, author, isbn)"
                "values (:id, :title, :author, :isbn)"
            ),
            {
                "id": book_id,
                "title": "Domain Driven Design",
                "author": "Eric Evans",
                "isbn": "0-321-12521-5",
            },
        )
        await transaction.commit()

    row = (
        await sqla_session.execute(
            text("select count(*) from books where id = :id"), {"id": book_id}
        )
    ).first()
    assert row is not None
    assert row[0] == 1


async def test_rollback(
    sqla_engine: AsyncEngine,
    sqla_session: AsyncSession,
    transport: AsyncAbstractEventstreamTransport,
):
    book_id = str(uuid.uuid4())
    uow = SQLUnitOfWork(transport, sqla_engine)
    async with uow as transaction:
        await transaction.session.execute(
            text(
                "insert into books(id, title, author, isbn)"
                "values (:id, :title, :author, :isbn)"
            ),
            {
                "id": book_id,
                "title": "Domain Driven Design",
                "author": "Eric Evans",
                "isbn": "0-321-12521-5",
            },
        )
        await transaction.rollback()

    row = (
        await sqla_session.execute(
            text("select count(*) from books where id = :id"), {"id": book_id}
        )
    ).first()
    assert row is not None
    assert row[0] == 0

Both tests are really similar, they insert a book, using the uow sql connection, in the books table, when we commit, we ensure the book is stored, when we rollback, we ensure the book is not present.

The tests needs some sql fixture we can already provide in uow_sqla/conftest.py

from typing import AsyncIterator

import pytest
from reading_club.adapters.uow_sqla import orm
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

DATABASE_URL = "sqlite+aiosqlite:///"


@pytest.fixture
def bared_sqla_engine() -> AsyncEngine:
    engine = create_async_engine(DATABASE_URL, future=True, echo=False)
    return engine


@pytest.fixture
async def sqla_engine(
    bared_sqla_engine: AsyncEngine,
) -> AsyncIterator[AsyncEngine]:
    async with bared_sqla_engine.begin() as conn:
        await conn.run_sync(orm.metadata.create_all)

    yield bared_sqla_engine

    async with bared_sqla_engine.begin() as conn:
        await conn.run_sync(orm.metadata.drop_all)


@pytest.fixture
def sqla_session(sqla_engine: AsyncEngine) -> AsyncSession:
    async_session = async_sessionmaker(sqla_engine, class_=AsyncSession)
    return async_session()

we have an engine to bind an in memory database containing our schema, and that can be passed to the Unit Of Work, and a session, to retrieve data from the database, used in the tests expectation.

We have all we need, it’s time to start our implementation.

Create the sql unit of work#

In the reading_club.adapters.uow_sqla.uow module, we will start writing our unit of work.

from types import TracebackType
from typing import Optional, Type

from reading_club.domain.model import Book
from reading_club.service.repositories import (
    AbstractBookRepository,
    BookRepositoryOperationResult,
    BookRepositoryResult,
)
from reading_club.service.uow import AbstractUnitOfWork
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker

from jeepito import (
    AsyncAbstractEventstreamTransport,
    AsyncEventstoreAbstractRepository,
    AsyncEventstreamPublisher,
    AsyncUnitOfWorkTransaction,
    Message,
)


class SQLEventstoreRepository(AsyncEventstoreAbstractRepository):
    def __init__(self, session: AsyncSession, publisher: AsyncEventstreamPublisher):
        super().__init__(publisher)
        self.session = session

    async def _add(self, message: Message) -> None:
        raise NotImplementedError


class SQLBookRepository(AbstractBookRepository):
    def __init__(self, session: AsyncSession):
        super().__init__()
        self.session = session

    async def add(self, model: Book) -> BookRepositoryOperationResult:
        raise NotImplementedError

    async def by_id(self, id: str) -> BookRepositoryResult:
        raise NotImplementedError


class SQLUnitOfWork(AbstractUnitOfWork):
    session: AsyncSession

    def __init__(
        self,
        transport: AsyncAbstractEventstreamTransport,
        sqla_engine: AsyncEngine,
    ):
        super().__init__()
        self.sqla_engine = sqla_engine
        self.session_factory = async_sessionmaker(self.sqla_engine, class_=AsyncSession)
        self.transport = transport

    async def __aenter__(self) -> AsyncUnitOfWorkTransaction:
        self.messages = []
        self.session = self.session_factory()
        self.eventstore = SQLEventstoreRepository(
            self.session,
            AsyncEventstreamPublisher(self.transport),
        )
        self.books = SQLBookRepository(self.session)
        ret = await super().__aenter__()
        return ret

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        try:
            await super().__aexit__(exc_type, exc, tb)
        finally:
            await self.session.close()

    async def commit(self):
        await self.session.commit()

    async def rollback(self):
        await self.session.rollback()

We have implement our unit of work and declare all our repositories without implementing them. We explicitly raise NotImplementedError to get our repositories instanciable.

If we run our tests now:

$ poetry run pytest -sxv
========================== test session starts ==========================
collected 4 items

tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 4 passed in 0.04s ===========================

Implement the book repository#

We can start with a couple of tests for the Ok and the Error cases in a test_repositories.py file.

from reading_club.adapters.uow_sqla import orm
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from reading_club.service.repositories import BookRepositoryError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def test_book_add_ok(uow: SQLUnitOfWork, book: Book, sqla_session: AsyncSession):
    async with uow as transaction:
        res = await uow.books.add(book)
        await transaction.commit()

    assert res.is_ok()

    row = (
        await sqla_session.execute(select(orm.books).where(orm.books.c.id == book.id))
    ).first()
    assert row is not None
    assert row._asdict() == book.model_dump(exclude={"messages"})

    # ensure the message bus can follow the book messages
    assert uow.books.seen == [book]


async def test_book_add_err(uow: SQLUnitOfWork, book: Book):
    # Add a book in the repository
    async with uow as transaction:
        res = await uow.books.add(book)
        assert res.is_ok()
        await transaction.commit()
    uow.books.seen.clear()

    # Now, tests that it wrap the error
    async with uow as transaction:
        res = await uow.books.add(book)
        assert res.is_err()
        assert res.unwrap_err() == BookRepositoryError.integrity_error
        await transaction.rollback()

    # Since it does not work, the bus can't see the book messages.
    assert uow.books.seen == []

We can see that those tests expect that: * the add method will return Ok(…) if its works * the stored book saved correspont to what the model contains * the seen attribute, is set to let the message bus consume the book messages. * integrity error does not raise but are stored in a Err(). * the seen attribute does not contains models that can’t be stored in the repository.

We also see that those new fixtures are required in our uow_sqla/conftest.py:

import uuid

import pytest
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from sqlalchemy.ext.asyncio import AsyncEngine

from jeepito import AsyncAbstractEventstreamTransport


@pytest.fixture
def uow(
    transport: AsyncAbstractEventstreamTransport, sqla_engine: AsyncEngine
) -> SQLUnitOfWork:
    return SQLUnitOfWork(transport, sqla_engine)


@pytest.fixture
def book():
    return Book(
        id=str(uuid.uuid4()),
        title="Domain Driven Design",
        author="Eric Evans",
        isbn="0-321-12521-5",
    )

Finally the add method implemented using SQLAlchemy

from reading_club.domain.model import Book
from reading_club.service.repositories import (
    AbstractBookRepository,
    BookRepositoryError,
    BookRepositoryOperationResult,
    BookRepositoryResult,
)
from result import Err, Ok
from sqlalchemy import insert
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from . import orm


class SQLBookRepository(AbstractBookRepository):
    def __init__(self, session: AsyncSession):
        super().__init__()
        self.session = session

    async def add(self, model: Book) -> BookRepositoryOperationResult:
        qry = insert(orm.books).values([model.model_dump(exclude={"messages"})])
        try:
            await self.session.execute(qry)
        except IntegrityError:
            return Err(BookRepositoryError.integrity_error)

        self.seen.append(model)
        return Ok(...)

    async def by_id(self, id: str) -> BookRepositoryResult:
        raise NotImplementedError

The tests suite should pass.

$ poetry run pytest -sxv
========================== test session starts ==========================
collected 6 items

tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 4 passed in 0.04s ===========================

Lets continue with the by_id implementation.

here is our test

from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from reading_club.service.repositories import BookRepositoryError


async def test_book_by_id_ok(uow: SQLUnitOfWork, book: Book):
    # Add a book in the repository
    async with uow as transaction:
        res = await uow.books.add(book)
        assert res.is_ok()
        await transaction.commit()
    uow.books.seen.clear()

    # Now, tests that the book is here
    async with uow as transaction:
        res = await uow.books.by_id(book.id)
        assert res.is_ok()
        book_from_uow = res.unwrap()
        await transaction.rollback()

    assert book_from_uow == book


async def test_book_by_id_err(uow: SQLUnitOfWork, book: Book):
    # Now, tests that the book is here
    async with uow as transaction:
        res = await uow.books.by_id(book.id)
        assert res.is_err()
        err = res.unwrap_err()
        await transaction.rollback()

    assert err == BookRepositoryError.not_found

..note:

you can see that our tests are a bit ugly, the initialization of the tests
is made inside the tests not in our fixtures.
Don't be afraid, we will improve that in the next chapter.

And our implmenetation

from reading_club.domain.model import Book
from reading_club.service.repositories import (
    AbstractBookRepository,
    BookRepositoryError,
    BookRepositoryOperationResult,
    BookRepositoryResult,
)
from result import Err, Ok
from sqlalchemy import insert, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from . import orm


class SQLBookRepository(AbstractBookRepository):
    def __init__(self, session: AsyncSession):
        super().__init__()
        self.session = session

    async def add(self, model: Book) -> BookRepositoryOperationResult:
        qry = insert(orm.books).values([model.model_dump(exclude={"messages"})])
        try:
            await self.session.execute(qry)
        except IntegrityError:
            return Err(BookRepositoryError.integrity_error)

        self.seen.append(model)
        return Ok(...)

    async def by_id(self, id: str) -> BookRepositoryResult:
        qry = select(orm.books).where(orm.books.c.id == id)
        row = (await self.session.execute(qry)).first()
        if not row:
            return Err(BookRepositoryError.not_found)
        book = Book(**row._asdict())
        return Ok(book)

Implement the event repository#

Before implementing the BookRepository.by_id we will take the time to implement our event repository in order to get our bus working, which will be usefull to create books using the message bus directly in our fixtures.

our new tests in``test_repositories.py``:

import uuid

from reading_club.adapters.uow_sqla import orm
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.messages import RegisterBook
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def test_eventstore_add(
    uow: SQLUnitOfWork, register_book_cmd: RegisterBook, sqla_session: AsyncSession
):
    register_book_cmd.id = str(uuid.uuid4())
    async with uow as transaction:
        await uow.eventstore.add(register_book_cmd)
        await transaction.commit()

    row = (
        await sqla_session.execute(
            select(orm.messages).where(
                orm.messages.c.id == register_book_cmd.message_id
            )
        )
    ).first()
    assert row is not None
    assert row.id == register_book_cmd.message_id
    assert row.created_at == register_book_cmd.created_at
    assert row.metadata == register_book_cmd.metadata.model_dump()
    assert row.payload == {
        "id": register_book_cmd.id,
        "author": "Eric Evans",
        "isbn": "0-321-12521-5",
        "title": "Domain Driven Design",
    }

And our implementation.

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

from jeepito import (
    AsyncEventstoreAbstractRepository,
    AsyncEventstreamPublisher,
    Message,
)

from . import orm


class SQLEventstoreRepository(AsyncEventstoreAbstractRepository):
    def __init__(self, session: AsyncSession, publisher: AsyncEventstreamPublisher):
        super().__init__(publisher)
        self.session = session

    async def _add(self, message: Message) -> None:
        qry = insert(orm.messages).values(
            [
                {
                    "id": message.message_id,
                    "created_at": message.created_at,
                    "metadata": message.metadata.model_dump(),
                    "payload": message.model_dump(
                        exclude={"message_id", "created_at", "metadata"}
                    ),
                }
            ]
        )
        await self.session.execute(qry)

There is no much to say here, it take the message and store in in the table. Because the jeepito does not rely on results, it does not return a Result object, our implementation raise exceptions if it does not works.

Before closing this chapter, lets run our tests and conclude

$ poetry run pytest -sxv
========================== test session starts ==========================
collected 9 items

tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id_err PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 9 passed in 0.43s ===========================

At the moment, our book review model contains the book registration, with commands and events used by the jeepito.

But, we have some tests that are not clean, the test_book_add_err that initialize its tests inside them, which will not scale, and more for the test_book_by_id_ok, we retrieve a book, but we only have one book here, so, we cannot be sure that it is the proper book that could be retrieve in real life.

This is the subject of the next chapter.