Unit Of Work

Now that we have a repository, we can define the unit of work that exposes it.

from reading_club.service.repositories import AbstractBookRepository

from messagebus import AsyncAbstractUnitOfWork


class AbstractUnitOfWork(AsyncAbstractUnitOfWork[AbstractBookRepository]):
    books: AbstractBookRepository

That was fast!

The unit of work methods are defined in the parent class AsyncAbstractUnitOfWork.

The unit of work is here to define a business transaction. It stores the models and the messages to be processed by the message bus.
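To make the idea of a business transaction concrete, here is a minimal, self-contained sketch of the general pattern. It does not use messagebus: `SketchUnitOfWork`, `pending`, `committed`, and `register_books` are hypothetical names invented for this illustration, and the real `AsyncAbstractUnitOfWork` may manage its transaction differently.

```python
import asyncio
from typing import List


class SketchUnitOfWork:
    """Hypothetical stand-in for illustration, not the messagebus class."""

    def __init__(self) -> None:
        self.committed: List[str] = []  # state that survived a commit
        self.pending: List[str] = []    # changes staged in the current transaction

    async def __aenter__(self) -> "SketchUnitOfWork":
        self.pending = []
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Commit when the block succeeded, roll back when it raised.
        # Returning None lets the exception propagate to the caller.
        if exc_type is None:
            await self.commit()
        else:
            await self.rollback()

    async def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending = []

    async def rollback(self) -> None:
        self.pending = []


async def register_books(uow: SketchUnitOfWork, titles: List[str], fail: bool = False) -> None:
    async with uow:
        uow.pending.extend(titles)
        if fail:
            raise ValueError("business rule violated")


async def demo() -> List[str]:
    uow = SketchUnitOfWork()
    await register_books(uow, ["Domain-Driven Design"])
    try:
        await register_books(uow, ["Lost Book"], fail=True)
    except ValueError:
        pass  # the failed transaction was rolled back
    return uow.committed
```

In this sketch, everything staged inside one `async with` block is kept or discarded as a whole, which is the property the unit of work is meant to provide.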

Implementing a unit of work

As test-driven developers, we write our first implementation of the unit of work in ``tests/conftest.py``. We have to implement the commit and rollback methods and, for testing purposes, we don’t need to implement a real transaction.

from reading_club.service.repositories import AbstractBookRepository
from reading_club.service.uow import AbstractUnitOfWork


class InMemoryBookRepository(AbstractBookRepository):
    # see chapter repository for the content of the class
    ...


class InMemoryUnitOfWork(AbstractUnitOfWork):
    def __init__(self):
        self.books = InMemoryBookRepository()

    # No real transaction is needed in tests: commit and rollback are no-ops.
    async def commit(self) -> None: ...

    async def rollback(self) -> None: ...
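A common variation for tests is to have the fake unit of work record which method was called, so a test can assert that a handler committed. The sketch below is self-contained and does not import messagebus or reading_club: `FakeBookRepository`, `SpyUnitOfWork`, its `committed` flag, and `register_book` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Dict, Optional


class FakeBookRepository:
    """Hypothetical stand-in for InMemoryBookRepository."""

    def __init__(self) -> None:
        self.books: Dict[str, str] = {}

    async def add(self, book_id: str, title: str) -> None:
        self.books[book_id] = title


class SpyUnitOfWork:
    """Hypothetical fake that remembers how the transaction ended."""

    def __init__(self) -> None:
        self.books = FakeBookRepository()
        # None: still open, True: committed, False: rolled back.
        self.committed: Optional[bool] = None

    async def commit(self) -> None:
        self.committed = True

    async def rollback(self) -> None:
        self.committed = False


async def register_book(uow: SpyUnitOfWork, book_id: str, title: str) -> None:
    # Hypothetical handler: store the book, then end the transaction.
    await uow.books.add(book_id, title)
    await uow.commit()


async def check() -> SpyUnitOfWork:
    uow = SpyUnitOfWork()
    await register_book(uow, "1", "Architecture Patterns with Python")
    return uow
```

A test can then inspect both the stored data and the `committed` flag, without any database involved.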