messagebus API¶
messagebus API.
- class messagebus.GenericCommand(*, message_id: UUID = None, created_at: datetime = None, metadata: TMetadata)¶
Base class for messages of type command.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=UUID, required=False, default_factory=builtin_function_or_method), 'metadata': FieldInfo(annotation=TypeVar, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class messagebus.GenericEvent(*, message_id: UUID = None, created_at: datetime = None, metadata: TMetadata)¶
Base class for messages of type event.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=UUID, required=False, default_factory=builtin_function_or_method), 'metadata': FieldInfo(annotation=TypeVar, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class messagebus.GenericModel(*, messages: MutableSequence[Message] = None)¶
Base class for models.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'messages': FieldInfo(annotation=MutableSequence[Message], required=False, default_factory=list)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- messagebus.Model¶
alias of GenericModel[Metadata]
- messagebus.Field(default: Any = PydanticUndefined, *, default_factory: Callable[[], Any] | None = PydanticUndefined, alias: str | None = PydanticUndefined, alias_priority: int | None = PydanticUndefined, validation_alias: str | AliasPath | AliasChoices | None = PydanticUndefined, serialization_alias: str | None = PydanticUndefined, title: str | None = PydanticUndefined, field_title_generator: typing_extensions.Callable[[str, FieldInfo], str] | None = PydanticUndefined, description: str | None = PydanticUndefined, examples: list[Any] | None = PydanticUndefined, exclude: bool | None = PydanticUndefined, discriminator: str | types.Discriminator | None = PydanticUndefined, deprecated: Deprecated | str | bool | None = PydanticUndefined, json_schema_extra: JsonDict | Callable[[JsonDict], None] | None = PydanticUndefined, frozen: bool | None = PydanticUndefined, validate_default: bool | None = PydanticUndefined, repr: bool = PydanticUndefined, init: bool | None = PydanticUndefined, init_var: bool | None = PydanticUndefined, kw_only: bool | None = PydanticUndefined, pattern: str | Pattern[str] | None = PydanticUndefined, strict: bool | None = PydanticUndefined, coerce_numbers_to_str: bool | None = PydanticUndefined, gt: annotated_types.SupportsGt | None = PydanticUndefined, ge: annotated_types.SupportsGe | None = PydanticUndefined, lt: annotated_types.SupportsLt | None = PydanticUndefined, le: annotated_types.SupportsLe | None = PydanticUndefined, multiple_of: float | None = PydanticUndefined, allow_inf_nan: bool | None = PydanticUndefined, max_digits: int | None = PydanticUndefined, decimal_places: int | None = PydanticUndefined, min_length: int | None = PydanticUndefined, max_length: int | None = PydanticUndefined, union_mode: Literal['smart', 'left_to_right'] = PydanticUndefined, fail_fast: bool | None = PydanticUndefined, **extra: Unpack[_EmptyKwargs]) Any ¶
Usage docs: https://docs.pydantic.dev/2.9/concepts/fields
Create a field for objects that can be configured.
Used to provide extra information about a field, either for the model schema or complex validation. Some arguments apply only to number fields (int, float, Decimal) and some apply only to str.
- Note:
Any _Unset objects will be replaced by the corresponding value defined in the _DefaultValues dictionary. If a key for the _Unset object is not found in the _DefaultValues dictionary, it will default to None
- Args:
default: Default value if the field is not set.
default_factory: A callable to generate the default value, such as utcnow().
alias: The name to use for the attribute when validating or serializing by alias. This is often used for things like converting between snake and camel case.
alias_priority: Priority of the alias. This affects whether an alias generator is used.
validation_alias: Like alias, but only affects validation, not serialization.
serialization_alias: Like alias, but only affects serialization, not validation.
title: Human-readable title.
field_title_generator: A callable that takes a field name and returns a title for it.
description: Human-readable description.
examples: Example values for this field.
exclude: Whether to exclude the field from the model serialization.
discriminator: Field name or Discriminator for discriminating the type in a tagged union.
deprecated: A deprecation message, an instance of warnings.deprecated or the typing_extensions.deprecated backport, or a boolean. If True, a default deprecation message will be emitted when accessing the field.
json_schema_extra: A dict or callable to provide extra JSON schema properties.
frozen: Whether the field is frozen. If true, attempts to change the value on an instance will raise an error.
validate_default: If True, apply validation to the default value every time you create an instance. Otherwise, for performance reasons, the default value of the field is trusted and not validated.
repr: A boolean indicating whether to include the field in the __repr__ output.
init: Whether the field should be included in the constructor of the dataclass. (Only applies to dataclasses.)
init_var: Whether the field should _only_ be included in the constructor of the dataclass. (Only applies to dataclasses.)
kw_only: Whether the field should be a keyword-only argument in the constructor of the dataclass. (Only applies to dataclasses.)
coerce_numbers_to_str: Whether to enable coercion of any Number type to str (not applicable in strict mode).
strict: If True, strict validation is applied to the field. See [Strict Mode](../concepts/strict_mode.md) for details.
gt: Greater than. If set, value must be greater than this. Only applicable to numbers.
ge: Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers.
lt: Less than. If set, value must be less than this. Only applicable to numbers.
le: Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers.
multiple_of: Value must be a multiple of this. Only applicable to numbers.
min_length: Minimum length for iterables.
max_length: Maximum length for iterables.
pattern: Pattern for strings (a regular expression).
allow_inf_nan: Allow inf, -inf, nan. Only applicable to numbers.
max_digits: Maximum number of allowed digits for strings.
decimal_places: Maximum number of decimal places allowed for numbers.
union_mode: The strategy to apply when validating a union. Can be smart (the default), or left_to_right. See [Union Mode](../concepts/unions.md#union-modes) for details.
fail_fast: If True, validation will stop on the first error. If False, all validation errors will be collected. This option can be applied only to iterable types (list, tuple, set, and frozenset).
extra: (Deprecated) Extra fields that will be included in the JSON schema.
- Warning:
The extra kwargs are deprecated. Use json_schema_extra instead.
- Returns:
- A new [FieldInfo][pydantic.fields.FieldInfo]. The return annotation is Any so Field can be used on
type-annotated fields without causing a type error.
- class messagebus.Message(*, message_id: UUID = None, created_at: datetime = None, metadata: TMetadata)¶
Base class for messaging.
- message_id: UUID¶
Unique identifier of the message.
- created_at: datetime¶
Timestamp of the message.
All messages are kept in order for observability, debugging and event replay.
- metadata: TMetadata¶
Define extra fields used at serialization.
While serializing the message, a name and version must be defined to properly identify the message. Even if the class is renamed, those constants must be kept identical over time in the codebase.
Metadata are defined statically in the definition of the message.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=UUID, required=False, default_factory=builtin_function_or_method), 'metadata': FieldInfo(annotation=TypeVar, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- class messagebus.Metadata(*, name: str, schema_version: int, published: bool = False)¶
- name: str¶
Name of the schema.
- schema_version: int¶
Version of the schema.
- published: bool¶
Publish the event to an eventstream.
- model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}¶
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[Dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'published': FieldInfo(annotation=bool, required=False, default=False), 'schema_version': FieldInfo(annotation=int, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
- messagebus.Command¶
alias of GenericCommand[Metadata]
- messagebus.Event¶
alias of GenericEvent[Metadata]
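The message fields described above can be illustrated with a small standalone sketch. It deliberately does not import messagebus or pydantic: `SketchMetadata`, `SketchMessage` and `UserRegistered` are hypothetical stand-ins built on standard-library dataclasses, and the UTC timestamp factory is an assumption. It only shows the idea of a generated `message_id` and `created_at` combined with metadata fixed at class definition time.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4


@dataclass(frozen=True)
class SketchMetadata:
    """Hypothetical stand-in for the Metadata fields listed above."""

    name: str
    schema_version: int
    published: bool = False


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a message: id and timestamp are generated."""

    metadata: SketchMetadata
    message_id: UUID = field(default_factory=uuid4)
    # Assumption: timestamps are timezone-aware UTC values.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass
class UserRegistered(SketchMessage):
    # The name and version are constants of the message definition; they
    # stay the same even if this class is renamed later.
    metadata: SketchMetadata = SketchMetadata(
        name="user_registered", schema_version=1, published=True
    )
    user_id: str = ""


event = UserRegistered(user_id="u-1")
```

Each instance gets its own identifier and timestamp, while the metadata is shared by every instance of the class.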
- class messagebus.AsyncAbstractRepository¶
Abstract base class for the repository pattern.
- seen: MutableSequence[TModel_contra]¶
- class messagebus.SyncAbstractRepository¶
Abstract base class for the repository pattern.
- seen: MutableSequence[TModel_contra]¶
- class messagebus.AsyncAbstractUnitOfWork¶
Abstract unit of work.
To implement a unit of work, AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() have to be defined, and the repositories have to be declared as attributes.
- eventstore: AsyncEventstoreAbstractRepository = <messagebus.service._async.repository.AsyncSinkholeEventstoreRepository object>¶
- abstract async commit() None ¶
Commit the transaction.
- abstract async rollback() None ¶
Roll back the transaction.
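As a rough illustration of the contract described above (define `commit()` and `rollback()`, declare repositories as attributes), here is a standalone sketch. It does not subclass the real `AsyncAbstractUnitOfWork`: `SketchAsyncUnitOfWork`, `InMemoryUserRepository` and `InMemoryUnitOfWork` are hypothetical names, and the pending/committed dictionaries are just one assumed way to model a transaction in memory.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict


class SketchAsyncUnitOfWork(ABC):
    """Hypothetical stand-in for an abstract unit of work."""

    @abstractmethod
    async def commit(self) -> None: ...

    @abstractmethod
    async def rollback(self) -> None: ...


class InMemoryUserRepository:
    """Toy repository that buffers writes until the transaction commits."""

    def __init__(self) -> None:
        self.committed: Dict[str, str] = {}
        self.pending: Dict[str, str] = {}

    async def add(self, user_id: str, name: str) -> None:
        self.pending[user_id] = name


class InMemoryUnitOfWork(SketchAsyncUnitOfWork):
    def __init__(self) -> None:
        # Repositories are declared as attributes of the unit of work.
        self.users = InMemoryUserRepository()

    async def commit(self) -> None:
        self.users.committed.update(self.users.pending)
        self.users.pending.clear()

    async def rollback(self) -> None:
        self.users.pending.clear()


async def demo() -> InMemoryUnitOfWork:
    uow = InMemoryUnitOfWork()
    await uow.users.add("u-1", "Alice")
    await uow.rollback()  # the pending write is discarded
    await uow.users.add("u-2", "Bob")
    await uow.commit()  # the pending write becomes visible
    return uow


uow = asyncio.run(demo())
```

A real implementation would typically delegate `commit()` and `rollback()` to a database session or connection rather than to in-memory dictionaries.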
- class messagebus.AsyncUnitOfWorkTransaction(uow: AsyncAbstractUnitOfWork[TRepositories])¶
- status: TransactionStatus¶
- uow: AsyncAbstractUnitOfWork[TRepositories]¶
- property eventstore: AsyncEventstoreAbstractRepository¶
- async commit() None ¶
- async rollback() None ¶
- class messagebus.SyncAbstractUnitOfWork¶
Abstract unit of work.
To implement a unit of work, SyncAbstractUnitOfWork.commit() and SyncAbstractUnitOfWork.rollback() have to be defined, and the repositories have to be declared as attributes.
- eventstore: SyncEventstoreAbstractRepository = <messagebus.service._sync.repository.SyncSinkholeEventstoreRepository object>¶
- abstract commit() None ¶
Commit the transaction.
- abstract rollback() None ¶
Roll back the transaction.
- class messagebus.SyncUnitOfWorkTransaction(uow: SyncAbstractUnitOfWork[TRepositories])¶
- status: TransactionStatus¶
- uow: SyncAbstractUnitOfWork[TRepositories]¶
- property eventstore: SyncEventstoreAbstractRepository¶
- commit() None ¶
- rollback() None ¶
- class messagebus.AsyncEventstoreAbstractRepository(publisher: AsyncEventstreamPublisher | None = None)¶
- async add(message: Message[Any]) None ¶
Add the message to the storage backend and mark it as seen.
Seen messages are sent to the eventstream only if the unit of work has committed the transaction. If the transaction is rolled back, the messages are dropped and never reach the eventstream.
- async publish_eventstream() None ¶
Publish seen messages to the eventstream.
- class messagebus.SyncEventstoreAbstractRepository(publisher: SyncEventstreamPublisher | None = None)¶
- add(message: Message[Any]) None ¶
Add the message to the storage backend and mark it as seen.
Seen messages are sent to the eventstream only if the unit of work has committed the transaction. If the transaction is rolled back, the messages are dropped and never reach the eventstream.
- publish_eventstream() None ¶
Publish seen messages to the eventstream.
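The commit and rollback behaviour of seen messages can be sketched with plain Python. This is not the library's implementation: `SketchEventstore` and `SketchTransaction` are hypothetical stand-ins, a plain callable plays the role of the publisher, and clearing the buffer on rollback is an assumption about how dropped messages might be modelled.

```python
from typing import Any, Callable, List


class SketchEventstore:
    """Hypothetical stand-in for an eventstore repository."""

    def __init__(self, send: Callable[[Any], None]) -> None:
        self._send = send  # plays the role of the publisher
        self.seen: List[Any] = []  # messages waiting for the commit

    def add(self, message: Any) -> None:
        # A real repository would also write to its storage backend here.
        self.seen.append(message)

    def publish_eventstream(self) -> None:
        for message in self.seen:
            self._send(message)
        self.seen.clear()


class SketchTransaction:
    """Hypothetical stand-in for a unit of work transaction."""

    def __init__(self, eventstore: SketchEventstore) -> None:
        self.eventstore = eventstore

    def commit(self) -> None:
        # Seen messages reach the stream only once the commit succeeds.
        self.eventstore.publish_eventstream()

    def rollback(self) -> None:
        # Assumption: rolling back simply forgets the seen messages.
        self.eventstore.seen.clear()


stream: List[str] = []
tx = SketchTransaction(SketchEventstore(stream.append))

tx.eventstore.add("order_placed")
tx.rollback()  # dropped, never reaches the stream
tx.eventstore.add("order_paid")
tx.commit()  # published after the commit
```

Buffering until commit keeps consumers of the eventstream from observing events whose underlying state change was never persisted.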
- class messagebus.AsyncSinkholeEventstoreRepository(publisher: AsyncEventstreamPublisher | None = None)¶
An eventstore that drops all messages.
- class messagebus.SyncSinkholeEventstoreRepository(publisher: SyncEventstreamPublisher | None = None)¶
An eventstore that drops all messages.
- messagebus.async_listen(wrapped: Callable[[TMessage, TAsyncUow], Coroutine[Any, Any, Any]]) Callable[[TMessage, TAsyncUow], Coroutine[Any, Any, Any]] ¶
Decorator to listen for a command or an event.
Note that a command can have only one listener, while an event can have many. The result of the command handler is returned by the handle call of the message bus.
- messagebus.sync_listen(wrapped: Callable[[TMessage, TSyncUow], Any]) Callable[[TMessage, TSyncUow], Any] ¶
Decorator to listen for a command or an event.
Note that a command can have only one listener, while an event can have many. The result of the command handler is returned by the handle call of the message bus.
- class messagebus.AsyncMessageBus¶
Store all the handlers for commands and events.
- add_listener(msg_type: type[Message[Any]], callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None ¶
- remove_listener(msg_type: type, callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None ¶
- async handle(message: Message[Any], uow: AsyncUnitOfWorkTransaction[TRepositories]) Any ¶
Notify the listeners registered for that message with messagebus.add_listener. Return the first event from the command.
- scan(*mods: str) None ¶
Scan the module (or modules) containing service handlers.
When a message is handled by the bus, the bus propagates the message to hook functions, called service handlers, which receive the message and a unit of work to process it as a business transaction.
- class messagebus.SyncMessageBus¶
Store all the handlers for commands and events.
- remove_listener(msg_type: type, callback: Callable[[Any, Any], Any]) None ¶
- handle(message: Message[Any], uow: SyncUnitOfWorkTransaction[TRepositories]) Any ¶
Notify the listeners registered for that message with messagebus.add_listener. Return the first event from the command.
- scan(*mods: str) None ¶
Scan the module (or modules) containing service handlers.
When a message is handled by the bus, the bus propagates the message to hook functions, called service handlers, which receive the message and a unit of work to process it as a business transaction.
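The dispatch rule for listeners (a single handler per command, any number per event, with the command handler result returned by `handle`) might be pictured as follows. This is a simplified standalone sketch, not the real bus: `SketchBus`, `SketchCommand`, `SketchEvent` and the `ValueError` raised on a duplicate command handler are all assumptions, and the module scanning done by `scan()` is left out.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Any, Any], Any]


class SketchCommand:
    """Hypothetical marker base class for commands."""


class SketchEvent:
    """Hypothetical marker base class for events."""


class SketchBus:
    """Hypothetical stand-in for a synchronous message bus."""

    def __init__(self) -> None:
        self.command_handlers: Dict[type, Handler] = {}
        self.event_handlers: DefaultDict[type, List[Handler]] = defaultdict(list)

    def add_listener(self, msg_type: type, callback: Handler) -> None:
        if issubclass(msg_type, SketchCommand):
            # Assumption: a second handler for the same command is an error.
            if msg_type in self.command_handlers:
                raise ValueError(f"{msg_type.__name__} already has a handler")
            self.command_handlers[msg_type] = callback
        else:
            self.event_handlers[msg_type].append(callback)

    def handle(self, message: Any, uow: Any) -> Any:
        if isinstance(message, SketchCommand):
            # The result of the single command handler is returned.
            return self.command_handlers[type(message)](message, uow)
        for callback in self.event_handlers[type(message)]:
            callback(message, uow)
        return None


class CreateUser(SketchCommand):
    pass


class UserCreated(SketchEvent):
    pass


calls: List[str] = []
bus = SketchBus()
bus.add_listener(CreateUser, lambda msg, uow: "user-1")
bus.add_listener(UserCreated, lambda msg, uow: calls.append("email"))
bus.add_listener(UserCreated, lambda msg, uow: calls.append("audit"))

result = bus.handle(CreateUser(), uow=None)
bus.handle(UserCreated(), uow=None)
```

Here `uow=None` stands in for the unit of work transaction that the real handlers would receive.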
- class messagebus.AbstractMessageSerializer¶
The message serializer takes the message and returns it as native Python types that are serializable for a transport.
- class messagebus.AsyncAbstractEventstreamTransport¶
Transport a message to the event stream.
- abstract async send_message_serialized(message: Mapping[str, Any]) None ¶
Publish a serialized message to the eventstream.
- class messagebus.AsyncSinkholeEventstreamTransport¶
Drop all messages.
By default, events are not streamed until streaming is configured.
- async send_message_serialized(message: Mapping[str, Any]) None ¶
Do nothing.
- class messagebus.AsyncEventstreamPublisher(transport: ~messagebus.service._async.eventstream.AsyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)¶
Publish a message to the event stream.
- Parameters:
serializer – Used to serialize the Message
transport – Used to send the serialized message to the eventstream.
- async send_message(message: Message[Any]) None ¶
Publish a message to the eventstream.
To publish a message to the eventstream, the “published” flag of the message metadata must be set to true. By default, messages are not pushed to the queue. This gives control over which messages stay private, such as commands, and which are public events shared with eventstream consumers.
- class messagebus.SyncAbstractEventstreamTransport¶
Transport a message to the event stream.
- abstract send_message_serialized(message: Mapping[str, Any]) None ¶
Publish a serialized message to the eventstream.
- class messagebus.SyncSinkholeEventstreamTransport¶
Drop all messages.
By default, events are not streamed until streaming is configured.
- send_message_serialized(message: Mapping[str, Any]) None ¶
Do nothing.
- class messagebus.SyncEventstreamPublisher(transport: ~messagebus.service._sync.eventstream.SyncAbstractEventstreamTransport, serializer: ~messagebus.service.eventstream.AbstractMessageSerializer = <messagebus.service.eventstream.MessageSerializer object>)¶
Publish a message to the event stream.
- Parameters:
serializer – Used to serialize the Message
transport – Used to send the serialized message to the eventstream.
- send_message(message: Message[Any]) None ¶
Publish a message to the eventstream.
To publish a message to the eventstream, the “published” flag of the message metadata must be set to true. By default, messages are not pushed to the queue. This gives control over which messages stay private, such as commands, and which are public events shared with eventstream consumers.
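The interplay of serializer, transport and the “published” flag can be sketched in a few lines. None of the names below come from messagebus: `SketchMessage`, `ListTransport`, `serialize` and `SketchPublisher` are hypothetical stand-ins, and the shape of the serialized dictionary is an assumption made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Mapping


@dataclass
class SketchMessage:
    """Hypothetical message carrying its metadata inline."""

    name: str
    schema_version: int
    published: bool = False
    payload: Dict[str, Any] = field(default_factory=dict)


class ListTransport:
    """Toy transport that records what it was asked to send."""

    def __init__(self) -> None:
        self.sent: List[Mapping[str, Any]] = []

    def send_message_serialized(self, message: Mapping[str, Any]) -> None:
        self.sent.append(message)


def serialize(message: SketchMessage) -> Dict[str, Any]:
    # Assumed wire format: only native Python types, ready for a transport.
    return {
        "type": message.name,
        "version": message.schema_version,
        "payload": dict(message.payload),
    }


class SketchPublisher:
    """Hypothetical stand-in for an eventstream publisher."""

    def __init__(
        self,
        transport: ListTransport,
        serializer: Callable[[SketchMessage], Dict[str, Any]] = serialize,
    ) -> None:
        self.transport = transport
        self.serializer = serializer

    def send_message(self, message: SketchMessage) -> None:
        if not message.published:
            return  # private message, stays out of the eventstream
        self.transport.send_message_serialized(self.serializer(message))


transport = ListTransport()
publisher = SketchPublisher(transport)
publisher.send_message(SketchMessage("register_user", 1))
publisher.send_message(
    SketchMessage("user_registered", 1, published=True, payload={"user_id": "u-1"})
)
```

Only the second message reaches the transport, because the first one keeps the default `published=False`.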