Unit Of Work using SQLAlchemy¶
Now that we have a finally modelized our application, we can start storing data in a storage backend. For the example, we will use the latestest version of SQLAlchemy, which is SQLAlchemy 2 at the moment.
We will use sqlite in memory for testing purpose.
poetry add "sqlalchemy[mypy]" aiosqlite
Note
we install the mypy extentions here, even if we don’t check mypy tests in our example.
Create the sql schema¶
Actually, we are not going to use SQLAlchemy ORM, because we are going to map our models directly, and because we don’t needs layers of abstraction, only the SQLAlchemy Core is enough for our use case.
The repository pattern will implement the traditional ORM part that you usually use, but as a primary goals, we don’t wan’t any database implementation details in our core domaine model, this is why we don’t use SQLAlchemy ORM at all.
We will create all this code in an adapters
(see Adapter),
and create a submodul uow_sqla
mkdir -p src/reading_club/adapters/uow_sqla
touch src/reading_club/adapters/__init__.py
touch src/reading_club/adapters/uow_sqla/__init__.py
touch src/reading_club/adapters/uow_sqla/orm.py
touch src/reading_club/adapters/uow_sqla/uow.py
Lets create our sql schema now.
In the reading_club.adapters.uow_sqla.orm
module, we will create the database
schema using the SQLAlchemy Core:
from sqlalchemy import JSON, Column, DateTime, Index, MetaData, String, Table, Uuid
metadata = MetaData()
books = Table(
"books",
metadata,
Column("id", Uuid(), nullable=False, primary_key=True),
Column("title", String, nullable=False),
Column("author", String, nullable=False),
Column("isbn", String(20), nullable=False),
Index("idx_books_isbn", "isbn", unique=True),
)
messages = Table(
"messages",
metadata,
Column("id", Uuid(), nullable=False, primary_key=True),
Column("created_at", DateTime(), nullable=False),
Column("metadata", JSON(), nullable=False),
Column("payload", JSON(), nullable=False),
Index("idx_messages_created_at", "created_at"),
)
We will see how we map that later, before continue, lets write some tests for our unit of work.
Note
I avoid the stub of reading_club.adapters.uow_sqla.uow
for the moment,
we will get the proper implementation after the tests in order to get this
page shorter.
Testing the unit of work¶
First, We are going to separate our sql tests from others tests,
in order to get a conftest that override the tests/conftest.py
but we also ensure to not use sql in the rest of the tests.
Then, we have to implement the messagebus.AsyncAbstractUnitOfWork.commit()
and messagebus.AsyncAbstractUnitOfWork.rollback()
and we have to write transaction
tests to ensure it works.
mkdir tests/uow_sqla
touch tests/uow_sqla/__init__.py
touch tests/uow_sqla/conftest.py
touch tests/uow_sqla/test_transaction.py
Now, lets write the tests in uow_sqla/test_transaction.py
from lastuuid.dummies import uuidgen
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
from messagebus import AsyncAbstractEventstreamTransport
async def test_commit(
sqla_engine: AsyncEngine,
sqla_session: AsyncSession,
transport: AsyncAbstractEventstreamTransport,
):
book_id = uuidgen()
uow = SQLUnitOfWork(transport, sqla_engine)
async with uow as transaction:
await uow.session.execute(
text(
"insert into books(id, title, author, isbn)"
"values (:id, :title, :author, :isbn)"
),
{
"id": book_id,
"title": "Domain Driven Design",
"author": "Eric Evans",
"isbn": "0-321-12521-5",
},
)
await transaction.commit()
row = (
await sqla_session.execute(
text("select count(*) from books where id = :id"), {"id": book_id}
)
).first()
assert row is not None
assert row[0] == 1
async def test_rollback(
sqla_engine: AsyncEngine,
sqla_session: AsyncSession,
transport: AsyncAbstractEventstreamTransport,
):
book_id = uuidgen()
uow = SQLUnitOfWork(transport, sqla_engine)
async with uow as transaction:
await transaction.session.execute(
text(
"insert into books(id, title, author, isbn)"
"values (:id, :title, :author, :isbn)"
),
{
"id": book_id,
"title": "Domain Driven Design",
"author": "Eric Evans",
"isbn": "0-321-12521-5",
},
)
await transaction.rollback()
row = (
await sqla_session.execute(
text("select count(*) from books where id = :id"), {"id": book_id}
)
).first()
assert row is not None
assert row[0] == 0
Both tests are really similar, they insert a book, using the uow sql connection, in the books table, when we commit, we ensure the book is stored, when we rollback, we ensure the book is not present.
The tests needs some sql fixture we can already provide in uow_sqla/conftest.py
from collections.abc import AsyncIterator
import pytest
from reading_club.adapters.uow_sqla import orm
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
DATABASE_URL = "sqlite+aiosqlite:///"
@pytest.fixture
def bared_sqla_engine() -> AsyncEngine:
engine = create_async_engine(DATABASE_URL, future=True, echo=False)
return engine
@pytest.fixture
async def sqla_engine(
bared_sqla_engine: AsyncEngine,
) -> AsyncIterator[AsyncEngine]:
async with bared_sqla_engine.begin() as conn:
await conn.run_sync(orm.metadata.create_all)
yield bared_sqla_engine
async with bared_sqla_engine.begin() as conn:
await conn.run_sync(orm.metadata.drop_all)
@pytest.fixture
def sqla_session(sqla_engine: AsyncEngine) -> AsyncSession:
async_session = async_sessionmaker(sqla_engine, class_=AsyncSession)
return async_session()
we have an engine to bind an in memory database containing our schema, and that can be passed to the Unit Of Work, and a session, to retrieve data from the database, used in the tests expectation.
We have all we need, it’s time to start our implementation.
Create the sql unit of work¶
In the reading_club.adapters.uow_sqla.uow
module, we will start writing
our unit of work.
from types import TracebackType
from reading_club.domain.model import Book
from reading_club.service.repositories import (
AbstractBookRepository,
BookRepositoryOperationResult,
BookRepositoryResult,
)
from reading_club.service.uow import AbstractUnitOfWork
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from messagebus import (
AsyncAbstractEventstreamTransport,
AsyncEventstoreAbstractRepository,
AsyncEventstreamPublisher,
AsyncUnitOfWorkTransaction,
Message,
)
class SQLEventstoreRepository(AsyncEventstoreAbstractRepository):
def __init__(self, session: AsyncSession, publisher: AsyncEventstreamPublisher):
super().__init__(publisher)
self.session = session
async def _add(self, message: Message) -> None:
raise NotImplementedError
class SQLBookRepository(AbstractBookRepository):
def __init__(self, session: AsyncSession):
super().__init__()
self.session = session
async def add(self, model: Book) -> BookRepositoryOperationResult:
raise NotImplementedError
async def by_id(self, id: str) -> BookRepositoryResult:
raise NotImplementedError
class SQLUnitOfWork(AbstractUnitOfWork):
session: AsyncSession
def __init__(
self,
transport: AsyncAbstractEventstreamTransport,
sqla_engine: AsyncEngine,
):
super().__init__()
self.sqla_engine = sqla_engine
self.session_factory = async_sessionmaker(self.sqla_engine, class_=AsyncSession)
self.transport = transport
async def __aenter__(self) -> AsyncUnitOfWorkTransaction:
self.messages = []
self.session = self.session_factory()
self.eventstore = SQLEventstoreRepository(
self.session,
AsyncEventstreamPublisher(self.transport),
)
self.books = SQLBookRepository(self.session)
ret = await super().__aenter__()
return ret
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
try:
await super().__aexit__(exc_type, exc, tb)
finally:
await self.session.close()
async def commit(self):
await self.session.commit()
async def rollback(self):
await self.session.rollback()
We have implement our unit of work and declare all our repositories without implementing them. We explicitly raise NotImplementedError to get our repositories instanciable.
If we run our tests now:
$ poetry run pytest -sxv
========================== test session starts ==========================
collected 4 items
tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 4 passed in 0.04s ===========================
Implement the book repository¶
We can start with a couple of tests for the Ok and the Error cases in a
test_repositories.py
file.
from reading_club.adapters.uow_sqla import orm
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from reading_club.service.repositories import BookRepositoryError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def test_book_add_ok(uow: SQLUnitOfWork, book: Book, sqla_session: AsyncSession):
async with uow as transaction:
res = await uow.books.add(book)
await transaction.commit()
assert res.is_ok()
row = (
await sqla_session.execute(select(orm.books).where(orm.books.c.id == book.id))
).first()
assert row is not None
assert row._asdict() == book.model_dump(exclude={"messages"})
# ensure the message bus can follow the book messages
assert uow.books.seen == [book]
async def test_book_add_err(uow: SQLUnitOfWork, book: Book):
# Add a book in the repository
async with uow as transaction:
res = await uow.books.add(book)
assert res.is_ok()
await transaction.commit()
uow.books.seen.clear()
# Now, tests that it wrap the error
async with uow as transaction:
res = await uow.books.add(book)
assert res.is_err()
assert res.unwrap_err() == BookRepositoryError.integrity_error
await transaction.rollback()
# Since it does not work, the bus can't see the book messages.
assert uow.books.seen == []
We can see that those tests expect that: * the add method will return Ok(…) if its works * the stored book saved correspont to what the model contains * the seen attribute, is set to let the message bus consume the book messages. * integrity error does not raise but are stored in a Err(). * the seen attribute does not contains models that can’t be stored in the repository.
We also see that those new fixtures are required in our uow_sqla/conftest.py
:
import pytest
from lastuuid.dummies import uuidgen
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from sqlalchemy.ext.asyncio import AsyncEngine
from messagebus import AsyncAbstractEventstreamTransport
@pytest.fixture
def uow(
transport: AsyncAbstractEventstreamTransport, sqla_engine: AsyncEngine
) -> SQLUnitOfWork:
return SQLUnitOfWork(transport, sqla_engine)
@pytest.fixture
def book():
return Book(
id=uuidgen(1),
title="Domain Driven Design",
author="Eric Evans",
isbn="0-321-12521-5",
)
Finally the add method implemented using SQLAlchemy
from uuid import UUID
from reading_club.domain.model import Book
from reading_club.service.repositories import (
AbstractBookRepository,
BookRepositoryError,
BookRepositoryOperationResult,
BookRepositoryResult,
)
from result import Err, Ok
from sqlalchemy import insert
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from . import orm
class SQLBookRepository(AbstractBookRepository):
def __init__(self, session: AsyncSession):
super().__init__()
self.session = session
async def add(self, model: Book) -> BookRepositoryOperationResult:
qry = insert(orm.books).values([model.model_dump(exclude={"messages"})])
try:
await self.session.execute(qry)
except IntegrityError:
return Err(BookRepositoryError.integrity_error)
self.seen.append(model)
return Ok(...)
async def by_id(self, id: UUID) -> BookRepositoryResult:
raise NotImplementedError
The tests suite should pass.
$ poetry run pytest -sxv
========================== test session starts ==========================
collected 6 items
tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 4 passed in 0.04s ===========================
Lets continue with the by_id
implementation.
here is our test
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.model import Book
from reading_club.service.repositories import BookRepositoryError
async def test_book_by_id_ok(uow: SQLUnitOfWork, book: Book):
# Add a book in the repository
async with uow as transaction:
res = await uow.books.add(book)
assert res.is_ok()
await transaction.commit()
uow.books.seen.clear()
# Now, tests that the book is here
async with uow as transaction:
res = await uow.books.by_id(book.id)
assert res.is_ok()
book_from_uow = res.unwrap()
await transaction.rollback()
assert book_from_uow == book
async def test_book_by_id_err(uow: SQLUnitOfWork, book: Book):
# Now, tests that the book is here
async with uow as transaction:
res = await uow.books.by_id(book.id)
assert res.is_err()
err = res.unwrap_err()
await transaction.rollback()
assert err == BookRepositoryError.not_found
..note:
you can see that our tests are a bit ugly, the initialization of the tests
is made inside the tests not in our fixtures.
Don't be afraid, we will improve that in the next chapter.
And our implmenetation
from uuid import UUID
from reading_club.domain.model import Book
from reading_club.service.repositories import (
AbstractBookRepository,
BookRepositoryError,
BookRepositoryOperationResult,
BookRepositoryResult,
)
from result import Err, Ok
from sqlalchemy import insert, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from . import orm
class SQLBookRepository(AbstractBookRepository):
def __init__(self, session: AsyncSession):
super().__init__()
self.session = session
async def add(self, model: Book) -> BookRepositoryOperationResult:
qry = insert(orm.books).values([model.model_dump(exclude={"messages"})])
try:
await self.session.execute(qry)
except IntegrityError:
return Err(BookRepositoryError.integrity_error)
self.seen.append(model)
return Ok(...)
async def by_id(self, id: UUID) -> BookRepositoryResult:
qry = select(orm.books).where(orm.books.c.id == id)
row = (await self.session.execute(qry)).first()
if not row:
return Err(BookRepositoryError.not_found)
book = Book(**row._asdict())
return Ok(book)
Implement the event repository¶
Before implementing the BookRepository.by_id
we will take the time to implement
our event repository in order to get our bus working, which will be usefull to
create books using the message bus directly in our fixtures.
our new tests in``test_repositories.py``:
from lastuuid.dummies import uuidgen
from reading_club.adapters.uow_sqla import orm
from reading_club.adapters.uow_sqla.uow import SQLUnitOfWork
from reading_club.domain.messages import RegisterBook
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def test_eventstore_add(
uow: SQLUnitOfWork, register_book_cmd: RegisterBook, sqla_session: AsyncSession
):
register_book_cmd.id = uuidgen()
async with uow as transaction:
await uow.eventstore.add(register_book_cmd)
await transaction.commit()
row = (
await sqla_session.execute(
select(orm.messages).where(
orm.messages.c.id == register_book_cmd.message_id
)
)
).first()
assert row is not None
assert row.id == register_book_cmd.message_id
assert row.created_at == register_book_cmd.created_at
assert row.metadata == register_book_cmd.metadata.model_dump()
assert row.payload == {
"id": register_book_cmd.id,
"author": "Eric Evans",
"isbn": "0-321-12521-5",
"title": "Domain Driven Design",
}
And our implementation.
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from messagebus import (
AsyncEventstoreAbstractRepository,
AsyncEventstreamPublisher,
Message,
)
from . import orm
class SQLEventstoreRepository(AsyncEventstoreAbstractRepository):
def __init__(self, session: AsyncSession, publisher: AsyncEventstreamPublisher):
super().__init__(publisher)
self.session = session
async def _add(self, message: Message) -> None:
qry = insert(orm.messages).values(
[
{
"id": message.message_id,
"created_at": message.created_at,
"metadata": message.metadata.model_dump(),
"payload": message.model_dump(
mode="json",
exclude={"message_id", "created_at", "metadata"},
),
}
]
)
await self.session.execute(qry)
There is no much to say here, it take the message and store in in the table. Because the messagebus does not rely on results, it does not return a Result object, our implementation raise exceptions if it does not works.
Before closing this chapter, lets run our tests and conclude
$ poetry run pytest -sxv
========================== test session starts ==========================
collected 9 items
tests/test_service_handler_add_book.py::test_register_book PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id_err PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
=========================== 9 passed in 0.43s ===========================
At the moment, our book review model contains the book registration, with commands and events used by the messagebus.
But, we have some tests that are not clean, the test_book_add_err
that initialize
its tests inside them, which will not scale, and more for the test_book_by_id_ok
,
we retrieve a book, but we only have one book here, so, we cannot be sure that it
is the proper book that could be retrieve in real life.
This is the subject of the next chapter.