messagebus API

messagebus API.

class messagebus.AbstractMessageSerializer

The message serializer take the message and return it with python native types, that are serializable for a transport.

abstractmethod serialize_message(message: Message[Any]) Mapping[str, Any]

Publish a message to the eventstream.

class messagebus.AsyncAbstractEventstreamTransport

Transport a message to the event stream.

abstractmethod async send_message_serialized(message: Mapping[str, Any]) None

Publish a serialized message to the eventstream.

class messagebus.AsyncAbstractRepository

Abstract Base Classe for Repository pattern.

seen: MutableSequence[TModel_contra]
class messagebus.AsyncAbstractUnitOfWork

Abstract unit of work.

To implement a unit of work, the AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() has to be defined, and some repositories has to be declared has attributes.

collect_new_events() Iterator[Message[Any]]
abstractmethod async commit() None

Commit the transation.

eventstore: AsyncEventstoreAbstractRepository = <messagebus.service._async.repository.AsyncSinkholeEventstoreRepository object>
abstractmethod async rollback() None

Rollback the transation.

class messagebus.AsyncEventstoreAbstractRepository(publisher: AsyncEventstreamPublisher | None = None)
async add(message: Message[Any]) None

Add the message to the storage backend and mark as seen

seen message will be sent to the eventstream only if the unit of work has properly commit the transaction. If the transaction is rollback, then, message will be dropped too from the eventstream.

async publish_eventstream() None

Publish seen message to the eventstream.

class messagebus.AsyncEventstreamPublisher(transport: ~messagebus.service._async.eventstream.AsyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)

Publish a message to the event stream.

Parameters:
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

async send_message(message: Message[Any]) None

Publish a message to the eventstream.

To publish a message in the eventstream, the flag parameter “published” of the metadata of the message must be set to true. By default, message are not pushed to the queue, given the control of private message, such as command, and public event, shared with eventstream consumers.

class messagebus.AsyncMessageBus(**dependencies: Any)

Store all the handlers for commands an events.

add_listener(msg_type: type[Message[Any]], callback: Callable[[Concatenate[Any, Any, P]], Coroutine[Any, Any, Any]]) None
async handle(command: GenericCommand[Any], uow: AsyncUnitOfWorkTransaction[TRepositories], **transient_dependencies: Any) Any

Notify listener of that event registered with messagebus.add_listener. Return the first event from the command.

Parameters:

message – The message to handle, should be a command.

remove_listener(msg_type: type, callback: Callable[[Concatenate[Any, Any, P]], Coroutine[Any, Any, Any]]) None
scan(*mods: str) None

Scan the module (or modules) containing service handlers.

when a message is handled by the bus, the bus propagate the message to hook functions, called Service Handler that receive the message, and a Unit Of Work to process it has a business transaction.

class messagebus.AsyncSinkholeEventstoreRepository(publisher: AsyncEventstreamPublisher | None = None)

An eventstore that drop all the message.

class messagebus.AsyncSinkholeEventstreamTransport

Drop all messages.

By default, the events are not streamed until it is configured to do so.

async send_message_serialized(message: Mapping[str, Any]) None

Do nothing.

class messagebus.AsyncUnitOfWorkTransaction(uow: AsyncAbstractUnitOfWork[TRepositories])
add_listener(listener: AsyncDependency) AsyncDependency
async commit() None
property eventstore: AsyncEventstoreAbstractRepository
async rollback() None
uow: AsyncAbstractUnitOfWork[TRepositories]
status: TransactionStatus
messagebus.Command

alias of GenericCommand[Metadata]

messagebus.Event

alias of GenericEvent[Metadata]

messagebus.Field(default: Any = PydanticUndefined, *, default_factory: Callable[[], Any] | Callable[[dict[str, Any]], Any] | None = PydanticUndefined, alias: str | None = PydanticUndefined, alias_priority: int | None = PydanticUndefined, validation_alias: str | AliasPath | AliasChoices | None = PydanticUndefined, serialization_alias: str | None = PydanticUndefined, title: str | None = PydanticUndefined, field_title_generator: Callable[[str, FieldInfo], str] | None = PydanticUndefined, description: str | None = PydanticUndefined, examples: list[Any] | None = PydanticUndefined, exclude: bool | None = PydanticUndefined, discriminator: str | types.Discriminator | None = PydanticUndefined, deprecated: Deprecated | str | bool | None = PydanticUndefined, json_schema_extra: JsonDict | Callable[[JsonDict], None] | None = PydanticUndefined, frozen: bool | None = PydanticUndefined, validate_default: bool | None = PydanticUndefined, repr: bool = PydanticUndefined, init: bool | None = PydanticUndefined, init_var: bool | None = PydanticUndefined, kw_only: bool | None = PydanticUndefined, pattern: str | Pattern[str] | None = PydanticUndefined, strict: bool | None = PydanticUndefined, coerce_numbers_to_str: bool | None = PydanticUndefined, gt: annotated_types.SupportsGt | None = PydanticUndefined, ge: annotated_types.SupportsGe | None = PydanticUndefined, lt: annotated_types.SupportsLt | None = PydanticUndefined, le: annotated_types.SupportsLe | None = PydanticUndefined, multiple_of: float | None = PydanticUndefined, allow_inf_nan: bool | None = PydanticUndefined, max_digits: int | None = PydanticUndefined, decimal_places: int | None = PydanticUndefined, min_length: int | None = PydanticUndefined, max_length: int | None = PydanticUndefined, union_mode: Literal['smart', 'left_to_right'] = PydanticUndefined, fail_fast: bool | None = PydanticUndefined, **extra: Unpack[_EmptyKwargs]) Any

Usage docs: https://docs.pydantic.dev/2.10/concepts/fields

Create a field for objects that can be configured.

Used to provide extra information about a field, either for the model schema or complex validation. Some arguments apply only to number fields (int, float, Decimal) and some apply only to str.

Note:
  • Any _Unset objects will be replaced by the corresponding value defined in the _DefaultValues dictionary. If a key for the _Unset object is not found in the _DefaultValues dictionary, it will default to None

Args:

default: Default value if the field is not set. default_factory: A callable to generate the default value. The callable can either take 0 arguments

(in which case it is called as is) or a single argument containing the already validated data.

alias: The name to use for the attribute when validating or serializing by alias.

This is often used for things like converting between snake and camel case.

alias_priority: Priority of the alias. This affects whether an alias generator is used. validation_alias: Like alias, but only affects validation, not serialization. serialization_alias: Like alias, but only affects serialization, not validation. title: Human-readable title. field_title_generator: A callable that takes a field name and returns title for it. description: Human-readable description. examples: Example values for this field. exclude: Whether to exclude the field from the model serialization. discriminator: Field name or Discriminator for discriminating the type in a tagged union. deprecated: A deprecation message, an instance of warnings.deprecated or the typing_extensions.deprecated backport,

or a boolean. If True, a default deprecation message will be emitted when accessing the field.

json_schema_extra: A dict or callable to provide extra JSON schema properties. frozen: Whether the field is frozen. If true, attempts to change the value on an instance will raise an error. validate_default: If True, apply validation to the default value every time you create an instance.

Otherwise, for performance reasons, the default value of the field is trusted and not validated.

repr: A boolean indicating whether to include the field in the __repr__ output. init: Whether the field should be included in the constructor of the dataclass.

(Only applies to dataclasses.)

init_var: Whether the field should _only_ be included in the constructor of the dataclass.

(Only applies to dataclasses.)

kw_only: Whether the field should be a keyword-only argument in the constructor of the dataclass.

(Only applies to dataclasses.)

coerce_numbers_to_str: Whether to enable coercion of any Number type to str (not applicable in strict mode). strict: If True, strict validation is applied to the field.

See [Strict Mode](../concepts/strict_mode.md) for details.

gt: Greater than. If set, value must be greater than this. Only applicable to numbers. ge: Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers. lt: Less than. If set, value must be less than this. Only applicable to numbers. le: Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers. multiple_of: Value must be a multiple of this. Only applicable to numbers. min_length: Minimum length for iterables. max_length: Maximum length for iterables. pattern: Pattern for strings (a regular expression). allow_inf_nan: Allow inf, -inf, nan. Only applicable to numbers. max_digits: Maximum number of allow digits for strings. decimal_places: Maximum number of decimal places allowed for numbers. union_mode: The strategy to apply when validating a union. Can be smart (the default), or left_to_right.

See [Union Mode](../concepts/unions.md#union-modes) for details.

fail_fast: If True, validation will stop on the first error. If False, all validation errors will be collected.

This option can be applied only to iterable types (list, tuple, set, and frozenset).

extra: (Deprecated) Extra fields that will be included in the JSON schema.

!!! warning Deprecated

The extra kwargs is deprecated. Use json_schema_extra instead.

Returns:
A new [FieldInfo][pydantic.fields.FieldInfo]. The return annotation is Any so Field can be used on

type-annotated fields without causing a type error.

class messagebus.GenericCommand(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.metadata.TMetadata)

Baseclass for message of type command used to customized (overrride) the Metadata.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class messagebus.GenericEvent(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.metadata.TMetadata)

Baseclass for message of type event used to customized (overrride) the Metadata.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class messagebus.GenericModel(*, messages: ~collections.abc.MutableSequence[~messagebus.domain.model.message.Message] = <factory>)

Base class for model.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

messages: MutableSequence[Message]

List of messages consumed by the unit of work to mutate the repository.

Those message are ephemeral, published by event handler and consumed by the unit of work during the process of an original command.

class messagebus.Message(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.metadata.TMetadata)

Base class for messaging.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

message_id: UUID

Unique identifier of the message.

created_at: datetime

Timestamp of the message.

All messages are kept in order for observability, debug and event replay.

metadata: TMetadata

Define extra fields used at serialization.

While serializing the message, a name and version must be defined to properly defined the message. Event if the class is renamed, those constants must be kept identically over the time in the codebase.

metadata are defined statically at the definition of the message.

class messagebus.Metadata(*, name: str, schema_version: int, published: bool = False)

Every message, commands and event have metadata used and sends while serialization.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str

Name of the schema.

Identity a message type when serialization..

schema_version: int

Version of the schema.

Every message schema is versionned in order to have multiple event/command living together with multiple handler.

published: bool

Publish the event to an eventstream.

If the unit of work is associated to an eventstream, then the message is send throw it only if the flag has been set to True.

It allows to have internal commands and events and public ones. Sending message to an event queue create a coupling, and updating signature can introduce breaking changes.

messagebus.Model

alias of GenericModel[Metadata]

class messagebus.SyncAbstractEventstreamTransport

Transport a message to the event stream.

abstractmethod send_message_serialized(message: Mapping[str, Any]) None

Publish a serialized message to the eventstream.

class messagebus.SyncAbstractRepository

Abstract Base Classe for Repository pattern.

seen: MutableSequence[TModel_contra]
class messagebus.SyncAbstractUnitOfWork

Abstract unit of work.

To implement a unit of work, the AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() has to be defined, and some repositories has to be declared has attributes.

collect_new_events() Iterator[Message[Any]]
abstractmethod commit() None

Commit the transation.

eventstore: SyncEventstoreAbstractRepository = <messagebus.service._sync.repository.SyncSinkholeEventstoreRepository object>
abstractmethod rollback() None

Rollback the transation.

class messagebus.SyncEventstoreAbstractRepository(publisher: SyncEventstreamPublisher | None = None)
add(message: Message[Any]) None

Add the message to the storage backend and mark as seen

seen message will be sent to the eventstream only if the unit of work has properly commit the transaction. If the transaction is rollback, then, message will be dropped too from the eventstream.

publish_eventstream() None

Publish seen message to the eventstream.

class messagebus.SyncEventstreamPublisher(transport: ~messagebus.service._sync.eventstream.SyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)

Publish a message to the event stream.

Parameters:
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

send_message(message: Message[Any]) None

Publish a message to the eventstream.

To publish a message in the eventstream, the flag parameter “published” of the metadata of the message must be set to true. By default, message are not pushed to the queue, given the control of private message, such as command, and public event, shared with eventstream consumers.

class messagebus.SyncMessageBus(**dependencies: Any)

Store all the handlers for commands an events.

add_listener(msg_type: type[Message[Any]], callback: Callable[[Concatenate[Any, Any, P]], Any]) None
handle(command: GenericCommand[Any], uow: SyncUnitOfWorkTransaction[TRepositories], **transient_dependencies: Any) Any

Notify listener of that event registered with messagebus.add_listener. Return the first event from the command.

Parameters:

message – The message to handle, should be a command.

remove_listener(msg_type: type, callback: Callable[[Concatenate[Any, Any, P]], Any]) None
scan(*mods: str) None

Scan the module (or modules) containing service handlers.

when a message is handled by the bus, the bus propagate the message to hook functions, called Service Handler that receive the message, and a Unit Of Work to process it has a business transaction.

class messagebus.SyncSinkholeEventstoreRepository(publisher: SyncEventstreamPublisher | None = None)

An eventstore that drop all the message.

class messagebus.SyncSinkholeEventstreamTransport

Drop all messages.

By default, the events are not streamed until it is configured to do so.

send_message_serialized(message: Mapping[str, Any]) None

Do nothing.

class messagebus.SyncUnitOfWorkTransaction(uow: SyncAbstractUnitOfWork[TRepositories])
add_listener(listener: SyncDependency) SyncDependency
commit() None
property eventstore: SyncEventstoreAbstractRepository
rollback() None
uow: SyncAbstractUnitOfWork[TRepositories]
status: TransactionStatus
messagebus.async_listen(wrapped: Callable[[Concatenate[TMessage, TAsyncUow, P]], Coroutine[Any, Any, Any]]) Callable[[Concatenate[TMessage, TAsyncUow, P]], Coroutine[Any, Any, Any]]

Decorator to listen for a command or an event.

Note that you can handle one listener for a command, and many for events. The command handler result is returned by the handle call of the message bus.

messagebus.sync_listen(wrapped: Callable[[Concatenate[TMessage, TSyncUow, P]], Any]) Callable[[Concatenate[TMessage, TSyncUow, P]], Any]

Decorator to listen for a command or an event.

Note that you can handle one listener for a command, and many for events. The command handler result is returned by the handle call of the message bus.

class messagebus.AsyncDependency

Describe an async dependency

abstractmethod async on_after_commit() None

Method called when the unit of work transaction is has been commited.

abstractmethod async on_after_rollback() None

Method called when the unit of work transaction is has been rolled back.

class messagebus.SyncDependency

Describe an async dependency

abstractmethod on_after_commit() None

Method called when the unit of work transaction is has been commited.

abstractmethod on_after_rollback() None

Method called when the unit of work transaction is has been rolled back.