Eventstream With Celery¶
In the chapter 6, we discover how the messagebus library handle eventstream.
We’ve implement an messagebus.AsyncAbstractEventstreamTransport
in order
to send events to a stream.
We’ve done a fake implementation, sending to a python list, it’s time to give a try using the popular library Celery.
Start by installing the latest celery version, celery 5 at the moment, and we are going to celery pytest fixtures too, so we can install it now.
Important
Because the library is thin as possible, in term of code and dependencies, the messagebus library does not comes with an implementation, the present documentation can be reproduce and adapted.
$ poetry add celery[pytest]
we will create a new adapter in src/reading_club/adapters/eventstream.py
and we will do a sub testing direcectory in order to reuse all the main fixtures
from our main conftest, but override the transport
to tests our new one.
Lets write a test in tests/eventstream_celery/conftest.py
import asyncio
import json
from reading_club.domain.messages import RegisterBook
async def test_eventstream(uow, bus, celery_app, celery_worker):
messages = []
@celery_app.task(name="send_message")
def send_message(message):
return messages.append(message)
celery_worker.reload()
async with uow as transaction:
await bus.handle(
RegisterBook(
id="y",
title="Architecture Patterns With Python",
author="Harry Percival and Bob Gregory",
isbn="978-1492052203",
),
transaction,
)
await transaction.commit()
# let the worker process the task
for _ in range(10):
if len(messages) != 0:
break
await asyncio.sleep(0.1)
assert len(messages) > 0
for message in messages:
message["payload"] = json.loads(message["payload"])
assert messages == [
{
"created_at": messages[0]["created_at"],
"id": messages[0]["id"],
"payload": {
"id": "y",
"isbn": "978-1492052203",
"title": "Architecture Patterns With Python",
"author": "Harry Percival and Bob Gregory",
},
"type": "book_registered_v1",
},
]
The tests here follow the celery doc. It use the celery_app
and celery_worker
fixtures from the pytest-celery package.
and our implementation in src/reading_club/adapters/eventstream.py
import asyncio
from collections.abc import Mapping
from typing import Any
from celery import Celery
from messagebus import AsyncAbstractEventstreamTransport
class EventstreamTransport(AsyncAbstractEventstreamTransport):
"""
Transport a message to the event stream using Celery.
"""
def __init__(self, app: Celery):
self.celery_client = app
async def send_message_serialized(self, message: Mapping[str, Any]) -> None:
"""Publish a serialized message to the messagestream."""
loop = asyncio.get_event_loop()
def send_message():
self.celery_client.send_task("send_message", kwargs={"message": message})
await loop.run_in_executor(None, send_message)
Note that Celery does not support asyncio at the moment, so we run the tasks in an executor.
$ poetry run pytest -sxv
========================== test session starts ==========================
collected 11 items
tests/test_service_handler_add_book.py::test_register_book[ok] PASSED
tests/test_service_handler_add_book.py::test_register_book[integrity error] PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/eventstream_celery/test_adapter.py::test_eventstream PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err[params0] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return a known book] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return an error] PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
========================== 11 passed in 4.37s ===========================
The tests pass, but it is terribly slow.
So we can keep it like this, or maybe write more code in order to tests what we are responsible for.
Actually, we don’t need to tests that a worker can retrieve our message, we’ve tested it once, and it works, we probably kept that kind of tests in a functional tests suite, but now, we will just ensure we can send a task using a celery fake object.
Lets write our own fixtures.
from collections.abc import MutableSequence
from typing import Any
import pytest
from reading_club.adapters.eventstream import EventstreamTransport
from messagebus import AsyncAbstractEventstreamTransport
class FakeCelery:
def __init__(self, queue: MutableSequence[Any]):
self.queue = queue
def send_task(self, task, kwargs):
self.queue.append((task, kwargs))
@pytest.fixture
def celery_queue() -> MutableSequence[Any]:
return []
@pytest.fixture
def celery_app(celery_queue) -> FakeCelery:
return FakeCelery(celery_queue)
@pytest.fixture
def transport(celery_app) -> AsyncAbstractEventstreamTransport:
return EventstreamTransport(celery_app)
We’ve create a fake celery class, that just implementing the API things we consume,
and a fixture celery_queue
which will receive the tasks we want to track that
are sent.
import json
from lastuuid.dummies import uuidgen
from reading_club.domain.messages import RegisterBook
async def test_eventstream(uow, bus, celery_queue):
async with uow as transaction:
await bus.handle(
RegisterBook(
id=uuidgen(1),
title="Architecture Patterns With Python",
author="Harry Percival and Bob Gregory",
isbn="978-1492052203",
),
transaction,
)
await transaction.commit()
assert len(celery_queue) > 0
message = celery_queue[0][1]["message"]
message["payload"] = json.loads(message["payload"])
assert celery_queue == [
(
"send_message",
{
"message": {
"created_at": message["created_at"],
"id": str(message["id"]),
"payload": {
"id": str(uuidgen(1)),
"isbn": "978-1492052203",
"title": "Architecture Patterns With Python",
"author": "Harry Percival and Bob Gregory",
},
"type": "book_registered_v1",
}
},
),
]
We adapt our tests to ensure that the message in the celery_queue
fixture
has been tracked.
Lets run our test.
$ poetry run pytest -sxv
========================== test session starts ==========================
collected 11 items
tests/test_service_handler_add_book.py::test_register_book[ok] PASSED
tests/test_service_handler_add_book.py::test_register_book[integrity error] PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/eventstream_celery/test_adapter.py::test_eventstream PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err[params0] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return a known book] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return an error] PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
========================== 11 passed in 0.30s ===========================
The test suite is now fast again.
Note
The Celery.send_task
method is used to generate a signature without having
a Celery task created.
In real life, the Celery used here just require the correct broker, routing,
and serialization directive. Says differently, only celery configuration is
required, not code.