messagebus API

class messagebus.AbstractMessageSerializer

The message serializer takes a message and returns it as native Python types that can be serialized for a transport.

abstract serialize_message(message: Message[Any]) Mapping[str, Any]

Serialize a message to a mapping of native Python types.
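A minimal sketch of what such a serializer might do, using only the standard library. FakeMessage, to_native and SketchSerializer are hypothetical names introduced for illustration; they are not part of messagebus, and a real implementation would subclass AbstractMessageSerializer and receive a Message.

```python
import dataclasses
import json
from collections.abc import Mapping
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4


@dataclasses.dataclass
class FakeMessage:
    """Hypothetical stand-in for a message; not the library's Message class."""

    payload: str
    message_id: UUID = dataclasses.field(default_factory=uuid4)
    created_at: datetime = dataclasses.field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


def to_native(value: Any) -> Any:
    """Convert values a transport cannot carry into native, JSON-friendly types."""
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    return value


class SketchSerializer:
    """Hypothetical serializer exposing the documented serialize_message method."""

    def serialize_message(self, message: FakeMessage) -> Mapping[str, Any]:
        raw = dataclasses.asdict(message)
        return {key: to_native(val) for key, val in raw.items()}


serialized = SketchSerializer().serialize_message(FakeMessage(payload="hello"))
# The mapping only contains native types, so it can be encoded as JSON.
encoded = json.dumps(serialized)
```

The point of the conversion step is that values such as UUID and datetime cannot be encoded by a plain JSON transport until they are turned into strings.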

class messagebus.AsyncAbstractEventstreamTransport

Transport a message to the event stream.

abstract async send_message_serialized(message: Mapping[str, Any]) None

Publish a serialized message to the eventstream.
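As an illustration, a transport only needs to provide the coroutine documented above. The sketch below uses a local stand-in base class, SketchTransport, so that it runs without the library installed; in real code the base class would presumably be AsyncAbstractEventstreamTransport. InMemoryTransport and its sent attribute are hypothetical names.

```python
import abc
import asyncio
from collections.abc import Mapping
from typing import Any


class SketchTransport(abc.ABC):
    """Hypothetical stand-in mirroring the documented abstract method."""

    @abc.abstractmethod
    async def send_message_serialized(self, message: Mapping[str, Any]) -> None: ...


class InMemoryTransport(SketchTransport):
    """Keep serialized messages in a list instead of sending them to a broker."""

    def __init__(self) -> None:
        self.sent: list[Mapping[str, Any]] = []

    async def send_message_serialized(self, message: Mapping[str, Any]) -> None:
        self.sent.append(message)


transport = InMemoryTransport()
asyncio.run(transport.send_message_serialized({"type": "user_registered"}))
```

An in-memory transport of this kind can be handy in tests, where asserting on the collected messages replaces a real broker.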

class messagebus.AsyncAbstractRepository

Abstract base class for the repository pattern.

seen: MutableSequence[TModel_contra]
class messagebus.AsyncAbstractUnitOfWork

Abstract unit of work.

To implement a unit of work, AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() have to be defined, and one or more repositories have to be declared as attributes.

eventstore: AsyncEventstoreAbstractRepository = <messagebus.service._async.repository.AsyncSinkholeEventstoreRepository object>
collect_new_events() Iterator[Message[Any]]
abstract async commit() None

Commit the transaction.

abstract async rollback() None

Roll back the transaction.
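The shape described above (commit and rollback defined, repositories declared as attributes) can be sketched as follows. This is a standalone illustration under stated assumptions: InMemoryUserRepository and InMemoryUnitOfWork are hypothetical classes that do not inherit from the library's abstract classes, so the example runs with the standard library alone.

```python
import asyncio


class InMemoryUserRepository:
    """Hypothetical repository; a real one would extend AsyncAbstractRepository."""

    def __init__(self) -> None:
        self.users: dict[str, str] = {}
        self.seen: list[str] = []

    async def add(self, user_id: str, name: str) -> None:
        self.users[user_id] = name
        self.seen.append(user_id)


class InMemoryUnitOfWork:
    """Hypothetical unit of work: repositories as attributes, plus commit/rollback."""

    def __init__(self) -> None:
        self.users = InMemoryUserRepository()
        self.committed = False

    async def commit(self) -> None:
        # A database-backed implementation would commit its session here.
        self.committed = True

    async def rollback(self) -> None:
        # This sketch keeps no snapshot, so it simply clears the in-memory state.
        self.users.users.clear()
        self.users.seen.clear()


async def main() -> InMemoryUnitOfWork:
    uow = InMemoryUnitOfWork()
    await uow.users.add("u1", "alice")
    await uow.commit()
    return uow


uow = asyncio.run(main())
```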

class messagebus.AsyncEventstoreAbstractRepository(publisher: AsyncEventstreamPublisher | None = None)
async add(message: Message[Any]) None

Add the message to the storage backend and mark it as seen.

Seen messages are sent to the eventstream only if the unit of work has committed the transaction. If the transaction is rolled back, the messages are dropped and never reach the eventstream.

async publish_eventstream() None

Publish seen messages to the eventstream.
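The buffering behaviour described for add() and publish_eventstream() can be pictured with the sketch below. It is written synchronously for brevity and does not reproduce the library's internals: SketchEventstore, its stream list, the discard helper and run_transaction are all hypothetical, and how the real unit of work triggers publication is an assumption.

```python
class SketchEventstore:
    """Hypothetical eventstore that buffers messages until the transaction ends."""

    def __init__(self) -> None:
        self.seen: list[str] = []
        self.stream: list[str] = []  # stands in for the eventstream

    def add(self, message: str) -> None:
        # Mark the message as seen; nothing is published yet.
        self.seen.append(message)

    def publish_eventstream(self) -> None:
        # Flush every seen message to the stream, then reset the buffer.
        self.stream.extend(self.seen)
        self.seen.clear()

    def discard(self) -> None:
        # Hypothetical helper: forget seen messages after a rollback.
        self.seen.clear()


def run_transaction(store: SketchEventstore, message: str, fail: bool) -> None:
    store.add(message)
    if fail:
        store.discard()  # rollback path: the message never reaches the stream
    else:
        store.publish_eventstream()  # commit path


store = SketchEventstore()
run_transaction(store, "order_placed", fail=False)
run_transaction(store, "order_cancelled", fail=True)
```

Only the message from the committed transaction ends up in the stream; the one from the failed transaction is dropped.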

class messagebus.AsyncEventstreamPublisher(transport: ~messagebus.service._async.eventstream.AsyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)

Publish a message to the event stream.

Parameters:
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

async send_message(message: Message[Any]) None

Publish a message to the eventstream.

To publish a message to the eventstream, the “published” flag of the message metadata must be set to true. By default, messages are not pushed to the queue, which gives control over which messages stay private, such as commands, and which are public events shared with eventstream consumers.
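A rough sketch of that filtering rule, assuming simplified stand-ins for the message and its metadata. SketchMetadata reuses the three documented Metadata fields (name, schema_version, published), but SketchMetadata, SketchMessage and SketchPublisher are hypothetical classes, not the library's.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchMetadata:
    """Hypothetical stand-in carrying the three documented metadata fields."""

    name: str
    schema_version: int
    published: bool = False


@dataclass(frozen=True)
class SketchMessage:
    metadata: SketchMetadata


class SketchPublisher:
    """Hypothetical publisher that forwards only messages flagged as published."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    def send_message(self, message: SketchMessage) -> None:
        if not message.metadata.published:
            return  # private message: stays inside the service
        self.sent.append(message.metadata.name)


command = SketchMessage(SketchMetadata(name="register_user", schema_version=1))
event = SketchMessage(
    SketchMetadata(name="user_registered", schema_version=1, published=True)
)

publisher = SketchPublisher()
publisher.send_message(command)
publisher.send_message(event)
```

Because published defaults to False, a message is shared with eventstream consumers only when its author opts in explicitly.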

class messagebus.AsyncMessageBus

Store all the handlers for commands and events.

add_listener(msg_type: type[Message[Any]], callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None
remove_listener(msg_type: type, callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None
async handle(message: Message[Any], uow: AsyncUnitOfWorkTransaction[TRepositories]) Any

Notify the listeners of that message that were registered with messagebus.add_listener. Return the first event from the command.

scan(*mods: str) None

Scan the module (or modules) containing service handlers.

When a message is handled by the bus, the bus propagates it to hook functions, called service handlers, which receive the message and a unit of work to process it as a business transaction.

class messagebus.AsyncSinkholeEventstoreRepository(publisher: AsyncEventstreamPublisher | None = None)

An eventstore that drops all messages.

class messagebus.AsyncSinkholeEventstreamTransport

Drop all messages.

By default, events are not streamed until streaming is explicitly configured.

async send_message_serialized(message: Mapping[str, Any]) None

Do nothing.

class messagebus.AsyncUnitOfWorkTransaction(uow: AsyncAbstractUnitOfWork[TRepositories])
status: TransactionStatus
uow: AsyncAbstractUnitOfWork[TRepositories]
property eventstore: AsyncEventstoreAbstractRepository
async commit() None
async rollback() None
messagebus.Command

alias of GenericCommand[Metadata]

messagebus.Event

alias of GenericEvent[Metadata]

messagebus.Field(default: Any = PydanticUndefined, *, default_factory: Callable[[], Any] | Callable[[dict[str, Any]], Any] | None = PydanticUndefined, alias: str | None = PydanticUndefined, alias_priority: int | None = PydanticUndefined, validation_alias: str | AliasPath | AliasChoices | None = PydanticUndefined, serialization_alias: str | None = PydanticUndefined, title: str | None = PydanticUndefined, field_title_generator: Callable[[str, FieldInfo], str] | None = PydanticUndefined, description: str | None = PydanticUndefined, examples: list[Any] | None = PydanticUndefined, exclude: bool | None = PydanticUndefined, discriminator: str | types.Discriminator | None = PydanticUndefined, deprecated: Deprecated | str | bool | None = PydanticUndefined, json_schema_extra: JsonDict | Callable[[JsonDict], None] | None = PydanticUndefined, frozen: bool | None = PydanticUndefined, validate_default: bool | None = PydanticUndefined, repr: bool = PydanticUndefined, init: bool | None = PydanticUndefined, init_var: bool | None = PydanticUndefined, kw_only: bool | None = PydanticUndefined, pattern: str | Pattern[str] | None = PydanticUndefined, strict: bool | None = PydanticUndefined, coerce_numbers_to_str: bool | None = PydanticUndefined, gt: annotated_types.SupportsGt | None = PydanticUndefined, ge: annotated_types.SupportsGe | None = PydanticUndefined, lt: annotated_types.SupportsLt | None = PydanticUndefined, le: annotated_types.SupportsLe | None = PydanticUndefined, multiple_of: float | None = PydanticUndefined, allow_inf_nan: bool | None = PydanticUndefined, max_digits: int | None = PydanticUndefined, decimal_places: int | None = PydanticUndefined, min_length: int | None = PydanticUndefined, max_length: int | None = PydanticUndefined, union_mode: Literal['smart', 'left_to_right'] = PydanticUndefined, fail_fast: bool | None = PydanticUndefined, **extra: Unpack[_EmptyKwargs]) Any

Usage docs: https://docs.pydantic.dev/2.10/concepts/fields

Create a field for objects that can be configured.

Used to provide extra information about a field, either for the model schema or complex validation. Some arguments apply only to number fields (int, float, Decimal) and some apply only to str.

Note:
  • Any _Unset objects will be replaced by the corresponding value defined in the _DefaultValues dictionary. If a key for the _Unset object is not found in the _DefaultValues dictionary, it will default to None

Args:
  • default: Default value if the field is not set.
  • default_factory: A callable to generate the default value. The callable can either take 0 arguments (in which case it is called as is) or a single argument containing the already validated data.
  • alias: The name to use for the attribute when validating or serializing by alias. This is often used for things like converting between snake and camel case.
  • alias_priority: Priority of the alias. This affects whether an alias generator is used.
  • validation_alias: Like alias, but only affects validation, not serialization.
  • serialization_alias: Like alias, but only affects serialization, not validation.
  • title: Human-readable title.
  • field_title_generator: A callable that takes a field name and returns a title for it.
  • description: Human-readable description.
  • examples: Example values for this field.
  • exclude: Whether to exclude the field from the model serialization.
  • discriminator: Field name or Discriminator for discriminating the type in a tagged union.
  • deprecated: A deprecation message, an instance of warnings.deprecated or the typing_extensions.deprecated backport, or a boolean. If True, a default deprecation message will be emitted when accessing the field.
  • json_schema_extra: A dict or callable to provide extra JSON schema properties.
  • frozen: Whether the field is frozen. If true, attempts to change the value on an instance will raise an error.
  • validate_default: If True, apply validation to the default value every time you create an instance. Otherwise, for performance reasons, the default value of the field is trusted and not validated.
  • repr: A boolean indicating whether to include the field in the __repr__ output.
  • init: Whether the field should be included in the constructor of the dataclass. (Only applies to dataclasses.)
  • init_var: Whether the field should _only_ be included in the constructor of the dataclass. (Only applies to dataclasses.)
  • kw_only: Whether the field should be a keyword-only argument in the constructor of the dataclass. (Only applies to dataclasses.)
  • coerce_numbers_to_str: Whether to enable coercion of any Number type to str (not applicable in strict mode).
  • strict: If True, strict validation is applied to the field. See Strict Mode in the pydantic documentation for details.
  • gt: Greater than. If set, value must be greater than this. Only applicable to numbers.
  • ge: Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers.
  • lt: Less than. If set, value must be less than this. Only applicable to numbers.
  • le: Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers.
  • multiple_of: Value must be a multiple of this. Only applicable to numbers.
  • min_length: Minimum length for iterables.
  • max_length: Maximum length for iterables.
  • pattern: Pattern for strings (a regular expression).
  • allow_inf_nan: Allow inf, -inf, nan. Only applicable to numbers.
  • max_digits: Maximum number of allowed digits for strings.
  • decimal_places: Maximum number of decimal places allowed for numbers.
  • union_mode: The strategy to apply when validating a union. Can be smart (the default), or left_to_right. See Union Mode in the pydantic documentation for details.
  • fail_fast: If True, validation will stop on the first error. If False, all validation errors will be collected. This option can be applied only to iterable types (list, tuple, set, and frozenset).
  • extra: (Deprecated) Extra fields that will be included in the JSON schema. Warning: the extra kwargs are deprecated. Use json_schema_extra instead.

Returns:
  A new pydantic.fields.FieldInfo. The return annotation is Any so Field can be used on type-annotated fields without causing a type error.

class messagebus.GenericCommand(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.TMetadata)

Base class for messages of type command.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class messagebus.GenericEvent(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.TMetadata)

Base class for messages of type event.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class messagebus.GenericModel(*, messages: ~collections.abc.MutableSequence[~messagebus.domain.model.Message] = <factory>)

Base class for model.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

messages: MutableSequence[Message]

List of messages consumed by the unit of work to mutate the repository.

These messages are ephemeral: they are published by event handlers and consumed by the unit of work while the original command is being processed.

class messagebus.Message(*, message_id: ~uuid.UUID = <factory>, created_at: ~datetime.datetime = <factory>, metadata: ~messagebus.domain.model.TMetadata)

Base class for messaging.

message_id: UUID

Unique identifier of the message.

created_at: datetime

Timestamp of the message.

All messages are kept in order for observability, debugging and event replay.

metadata: TMetadata

Define extra fields used at serialization.

When serializing the message, a name and a version must be defined to properly identify it. Even if the class is renamed, these constants must be kept identical over time in the codebase.

Metadata is defined statically in the definition of the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class messagebus.Metadata(*, name: str, schema_version: int, published: bool = False)
name: str

Name of the schema.

schema_version: int

Version of the schema.

published: bool

Publish the event to an eventstream.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

messagebus.Model

alias of GenericModel[Metadata]

class messagebus.SyncAbstractEventstreamTransport

Transport a message to the event stream.

abstract send_message_serialized(message: Mapping[str, Any]) None

Publish a serialized message to the eventstream.

class messagebus.SyncAbstractRepository

Abstract base class for the repository pattern.

seen: MutableSequence[TModel_contra]
class messagebus.SyncAbstractUnitOfWork

Abstract unit of work.

To implement a unit of work, SyncAbstractUnitOfWork.commit() and SyncAbstractUnitOfWork.rollback() have to be defined, and one or more repositories have to be declared as attributes.

eventstore: SyncEventstoreAbstractRepository = <messagebus.service._sync.repository.SyncSinkholeEventstoreRepository object>
collect_new_events() Iterator[Message[Any]]
abstract commit() None

Commit the transaction.

abstract rollback() None

Roll back the transaction.

class messagebus.SyncEventstoreAbstractRepository(publisher: SyncEventstreamPublisher | None = None)
add(message: Message[Any]) None

Add the message to the storage backend and mark it as seen.

Seen messages are sent to the eventstream only if the unit of work has committed the transaction. If the transaction is rolled back, the messages are dropped and never reach the eventstream.

publish_eventstream() None

Publish seen messages to the eventstream.

class messagebus.SyncEventstreamPublisher(transport: ~messagebus.service._sync.eventstream.SyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)

Publish a message to the event stream.

Parameters:
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

send_message(message: Message[Any]) None

Publish a message to the eventstream.

To publish a message to the eventstream, the “published” flag of the message metadata must be set to true. By default, messages are not pushed to the queue, which gives control over which messages stay private, such as commands, and which are public events shared with eventstream consumers.

class messagebus.SyncMessageBus

Store all the handlers for commands and events.

add_listener(msg_type: type[Message[Any]], callback: Callable[[Any, Any], Any]) None
remove_listener(msg_type: type, callback: Callable[[Any, Any], Any]) None
handle(message: Message[Any], uow: SyncUnitOfWorkTransaction[TRepositories]) Any

Notify the listeners of that message that were registered with messagebus.add_listener. Return the first event from the command.

scan(*mods: str) None

Scan the module (or modules) containing service handlers.

When a message is handled by the bus, the bus propagates it to hook functions, called service handlers, which receive the message and a unit of work to process it as a business transaction.

class messagebus.SyncSinkholeEventstoreRepository(publisher: SyncEventstreamPublisher | None = None)

An eventstore that drops all messages.

class messagebus.SyncSinkholeEventstreamTransport

Drop all messages.

By default, events are not streamed until streaming is explicitly configured.

send_message_serialized(message: Mapping[str, Any]) None

Do nothing.

class messagebus.SyncUnitOfWorkTransaction(uow: SyncAbstractUnitOfWork[TRepositories])
status: TransactionStatus
uow: SyncAbstractUnitOfWork[TRepositories]
property eventstore: SyncEventstoreAbstractRepository
commit() None
rollback() None
messagebus.async_listen(wrapped: Callable[[TMessage, TAsyncUow], Coroutine[Any, Any, Any]]) Callable[[TMessage, TAsyncUow], Coroutine[Any, Any, Any]]

Decorator to listen for a command or an event.

Note that a command can have only one listener, while an event can have many. The result of the command handler is returned by the handle call of the message bus.

messagebus.sync_listen(wrapped: Callable[[TMessage, TSyncUow], Any]) Callable[[TMessage, TSyncUow], Any]

Decorator to listen for a command or an event.

Note that a command can have only one listener, while an event can have many. The result of the command handler is returned by the handle call of the message bus.
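The one-listener-per-command, many-listeners-per-event rule can be illustrated with a small standalone registry. This is not how the library implements it: SketchBus, add_command_listener, add_event_listener, RegisterUser and UserRegistered are hypothetical names, and the real bus exposes a single add_listener method and the listen decorators instead.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any, Any], Any]


class RegisterUser:
    """Hypothetical command."""


class UserRegistered:
    """Hypothetical event."""


class SketchBus:
    """Hypothetical registry: one handler per command, any number per event."""

    def __init__(self) -> None:
        self.commands: dict[type, Handler] = {}
        self.events: dict[type, list[Handler]] = defaultdict(list)

    def add_command_listener(self, msg_type: type, callback: Handler) -> None:
        if msg_type in self.commands:
            raise ValueError(f"{msg_type.__name__} already has a listener")
        self.commands[msg_type] = callback

    def add_event_listener(self, msg_type: type, callback: Handler) -> None:
        self.events[msg_type].append(callback)

    def handle(self, message: Any, uow: Any) -> Any:
        if type(message) in self.commands:
            # The single command handler's result is returned to the caller.
            return self.commands[type(message)](message, uow)
        for callback in self.events[type(message)]:
            callback(message, uow)
        return None


log: list[str] = []
bus = SketchBus()
bus.add_command_listener(RegisterUser, lambda msg, uow: "user-1")
bus.add_event_listener(UserRegistered, lambda msg, uow: log.append("email"))
bus.add_event_listener(UserRegistered, lambda msg, uow: log.append("audit"))

result = bus.handle(RegisterUser(), uow=None)
bus.handle(UserRegistered(), uow=None)
```

A command expresses an intent that exactly one handler must fulfil, which is why its result can be returned to the caller, whereas an event is a notification that any number of handlers may react to.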