Eventstream With Celery#

In the chapter 6, we discover how the jeepito library handle eventstream. We’ve implement an jeepito.AsyncAbstractEventstreamTransport in order to send events to a stream.

We’ve done a fake implementation, sending to a python list, it’s time to give a try using the popular library Celery.

Start by installing the latest celery version, celery 5 at the moment, and we are going to celery pytest fixtures too, so we can install it now.

Important

Because the library is thin as possible, in term of code and dependencies, the jeepito library does not comes with an implementation, the present documentation can be reproduce and adapted.

$ poetry add celery[pytest]

we will create a new adapter in src/reading_club/adapters/eventstream.py and we will do a sub testing direcectory in order to reuse all the main fixtures from our main conftest, but override the transport to tests our new one.

Lets write a test in tests/eventstream_celery/conftest.py

import asyncio
import json

from reading_club.domain.messages import RegisterBook


async def test_eventstream(uow, bus, celery_app, celery_worker):
    messages = []

    @celery_app.task(name="send_message")
    def send_message(message):
        return messages.append(message)

    celery_worker.reload()

    async with uow as transaction:
        await bus.handle(
            RegisterBook(
                id="y",
                title="Architecture Patterns With Python",
                author="Harry Percival and Bob Gregory",
                isbn="978-1492052203",
            ),
            transaction,
        )
        await transaction.commit()

    # let the worker process the task
    for _ in range(10):
        if len(messages) != 0:
            break
        await asyncio.sleep(0.1)

    assert len(messages) > 0
    for message in messages:
        message["payload"] = json.loads(message["payload"])
    assert messages == [
        {
            "created_at": messages[0]["created_at"],
            "id": messages[0]["id"],
            "payload": {
                "id": "y",
                "isbn": "978-1492052203",
                "title": "Architecture Patterns With Python",
                "author": "Harry Percival and Bob Gregory",
            },
            "type": "book_registered_v1",
        },
    ]

The tests here follow the celery doc. It use the celery_app and celery_worker fixtures from the pytest-celery package.

and our implementation in src/reading_club/adapters/eventstream.py

import asyncio
from typing import Any, Mapping

from celery import Celery

from jeepito import AsyncAbstractEventstreamTransport


class EventstreamTransport(AsyncAbstractEventstreamTransport):
    """
    Transport a message to the event stream using Celery.
    """

    def __init__(self, app: Celery):
        self.celery_client = app

    async def send_message_serialized(self, message: Mapping[str, Any]) -> None:
        """Publish a serialized message to the messagestream."""
        loop = asyncio.get_event_loop()

        def send_message():
            self.celery_client.send_task("send_message", kwargs={"message": message})

        await loop.run_in_executor(None, send_message)

Note that Celery does not support asyncio at the moment, so we run the tasks in an executor.

$ poetry run pytest -sxv
========================== test session starts ==========================
collected 11 items

tests/test_service_handler_add_book.py::test_register_book[ok] PASSED
tests/test_service_handler_add_book.py::test_register_book[integrity error] PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/eventstream_celery/test_adapter.py::test_eventstream PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err[params0] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return a known book] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return an error] PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED
========================== 11 passed in 4.37s ===========================

The tests pass, but it is terribly slow.

So we can keep it like this, or maybe write more code in order to tests what we are responsible for.

Actually, we don’t need to tests that a worker can retrieve our message, we’ve tested it once, and it works, we probably kept that kind of tests in a functional tests suite, but now, we will just ensure we can send a task using a celery fake object.

Lets write our own fixtures.

from typing import Any, MutableSequence

import pytest
from reading_club.adapters.eventstream import EventstreamTransport

from jeepito import AsyncAbstractEventstreamTransport


class FakeCelery:
    def __init__(self, queue: MutableSequence[Any]):
        self.queue = queue

    def send_task(self, task, kwargs):
        self.queue.append((task, kwargs))


@pytest.fixture
def celery_queue() -> MutableSequence[Any]:
    return []


@pytest.fixture
def celery_app(celery_queue) -> FakeCelery:
    return FakeCelery(celery_queue)


@pytest.fixture
def transport(celery_app) -> AsyncAbstractEventstreamTransport:
    return EventstreamTransport(celery_app)

We’ve create a fake celery class, that just implementing the API things we consume, and a fixture celery_queue which will receive the tasks we want to track that are sent.

import json

from reading_club.domain.messages import RegisterBook


async def test_eventstream(uow, bus, celery_queue):
    async with uow as transaction:
        await bus.handle(
            RegisterBook(
                id="y",
                title="Architecture Patterns With Python",
                author="Harry Percival and Bob Gregory",
                isbn="978-1492052203",
            ),
            transaction,
        )
        await transaction.commit()

    assert len(celery_queue) > 0

    message = celery_queue[0][1]["message"]
    message["payload"] = json.loads(message["payload"])
    assert celery_queue == [
        (
            "send_message",
            {
                "message": {
                    "created_at": message["created_at"],
                    "id": message["id"],
                    "payload": {
                        "id": "y",
                        "isbn": "978-1492052203",
                        "title": "Architecture Patterns With Python",
                        "author": "Harry Percival and Bob Gregory",
                    },
                    "type": "book_registered_v1",
                }
            },
        ),
    ]

We adapt our tests to ensure that the message in the celery_queue fixture has been tracked.

Lets run our test.

$ poetry run pytest -sxv
========================== test session starts ==========================
collected 11 items

tests/test_service_handler_add_book.py::test_register_book[ok] PASSED
tests/test_service_handler_add_book.py::test_register_book[integrity error] PASSED
tests/test_service_handler_add_book.py::test_bus_handler PASSED
tests/eventstream_celery/test_adapter.py::test_eventstream PASSED
tests/uow_sqla/test_repositories.py::test_book_add_ok PASSED
tests/uow_sqla/test_repositories.py::test_book_add_err[params0] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return a known book] PASSED
tests/uow_sqla/test_repositories.py::test_book_by_id[return an error] PASSED
tests/uow_sqla/test_repositories.py::test_eventstore_add PASSED
tests/uow_sqla/test_transaction.py::test_commit PASSED
tests/uow_sqla/test_transaction.py::test_rollback PASSED

========================== 11 passed in 0.30s ===========================

The test suite is now fast again.

Note

The Celery.send_task method is used to generate a signature without having a Celery task created. In real life, the Celery used here just require the correct broker, routing, and serialization directive. Says differently, only celery configuration is required, not code.