Unit Of Work
Now that we have a repository, we can define the unit of work for our first repository.
from reading_club.service.repositories import AbstractBookRepository

from messagebus import AsyncAbstractUnitOfWork


class AbstractUnitOfWork(AsyncAbstractUnitOfWork[AbstractBookRepository]):
    books: AbstractBookRepository
That was fast!
The unit of work methods are defined in the parent class AsyncAbstractUnitOfWork.
The unit of work is here to define a business transaction. It stores the models and the messages to be processed by the message bus.
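To make the idea of a business transaction concrete, here is a minimal, self-contained sketch of the pattern. It does not use messagebus: SketchUnitOfWork, register_book and demo are hypothetical names chosen for illustration, and the real AsyncAbstractUnitOfWork may behave differently. The sketch only assumes that work happens inside an async context, that commit is called explicitly, and that an exception triggers a rollback.

```python
import asyncio
from typing import List


class SketchUnitOfWork:
    """Hypothetical stand-in for a unit of work (not the messagebus class)."""

    def __init__(self) -> None:
        # Records what happened, so the behaviour can be inspected.
        self.log: List[str] = []

    async def __aenter__(self) -> "SketchUnitOfWork":
        self.log.append("begin")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # If the block raised, undo the work; the exception still propagates.
        if exc_type is not None:
            await self.rollback()

    async def commit(self) -> None:
        self.log.append("commit")

    async def rollback(self) -> None:
        self.log.append("rollback")


async def register_book(uow: SketchUnitOfWork, fail: bool = False) -> None:
    """Hypothetical service handler: one business transaction."""
    async with uow:
        if fail:
            raise ValueError("business rule violated")
        await uow.commit()


async def demo() -> List[List[str]]:
    ok, failed = SketchUnitOfWork(), SketchUnitOfWork()
    await register_book(ok)
    try:
        await register_book(failed, fail=True)
    except ValueError:
        pass
    return [ok.log, failed.log]
```

Calling asyncio.run(demo()) returns the two logs: the successful transaction records "begin" then "commit", while the failing one records "begin" then "rollback".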
Implementing a unit of work
As test-driven developers, we write our first implementation of the unit of work in tests/conftest.py. We have to implement the commit and rollback methods and, for testing purposes, we don’t need to implement a real transaction.
from reading_club.service.repositories import AbstractBookRepository
from reading_club.service.uow import AbstractUnitOfWork


class InMemoryBookRepository(AbstractBookRepository):
    # see chapter repository for the content of the class
    ...


class InMemoryUnitOfWork(AbstractUnitOfWork):
    def __init__(self):
        self.books = InMemoryBookRepository()

    async def commit(self) -> None: ...

    async def rollback(self) -> None: ...
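Even without a real transaction, a test may want to check that a handler actually committed its work. One possible variation, sketched below under stated assumptions, records the calls in flags. FakeBookRepository and SpyUnitOfWork are hypothetical stand-ins written so the example runs on its own; they do not inherit from the messagebus or reading_club classes, and the add and by_id signatures are simplified for illustration.

```python
import asyncio
from typing import Dict, Optional


class FakeBookRepository:
    """Hypothetical stand-in for InMemoryBookRepository (simplified API)."""

    def __init__(self) -> None:
        self.books: Dict[str, str] = {}

    async def add(self, book_id: str, title: str) -> None:
        self.books[book_id] = title

    async def by_id(self, book_id: str) -> Optional[str]:
        return self.books.get(book_id)


class SpyUnitOfWork:
    """Hypothetical test double: no real transaction, only flags."""

    def __init__(self) -> None:
        self.books = FakeBookRepository()
        self.committed = False
        self.rolled_back = False

    async def commit(self) -> None:
        # Nothing to persist in memory; just remember the call.
        self.committed = True

    async def rollback(self) -> None:
        self.rolled_back = True


async def check_register_book() -> SpyUnitOfWork:
    """Plays the role of a handler under test."""
    uow = SpyUnitOfWork()
    await uow.books.add("x", "Domain Driven Design")
    await uow.commit()
    return uow
```

A test can then assert on uow.committed and uow.rolled_back after the handler has run, instead of inspecting a database.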