jeepito API#

Jeepito API.

class jeepito.Model(*, messages: MutableSequence[jeepito.domain.model.Message] = None)#

Base class for model.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'messages': FieldInfo(annotation=MutableSequence[Message], required=False, default_factory=list)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

messages: MutableSequence[jeepito.domain.model.Message]#

List of messages consumed by the unit of work to mutate the repository.

Those message are ephemeral, published by event handler and consumed by the unit of work during the process of an original command.

jeepito.Field(default: Any = PydanticUndefined, *, default_factory: typing.Callable[[], Any] | None = PydanticUndefined, alias: str | None = PydanticUndefined, alias_priority: int | None = PydanticUndefined, validation_alias: str | AliasPath | AliasChoices | None = PydanticUndefined, serialization_alias: str | None = PydanticUndefined, title: str | None = PydanticUndefined, description: str | None = PydanticUndefined, examples: list[Any] | None = PydanticUndefined, exclude: bool | None = PydanticUndefined, discriminator: str | types.Discriminator | None = PydanticUndefined, json_schema_extra: JsonDict | typing.Callable[[JsonDict], None] | None = PydanticUndefined, frozen: bool | None = PydanticUndefined, validate_default: bool | None = PydanticUndefined, repr: bool = PydanticUndefined, init_var: bool | None = PydanticUndefined, kw_only: bool | None = PydanticUndefined, pattern: str | None = PydanticUndefined, strict: bool | None = PydanticUndefined, gt: float | None = PydanticUndefined, ge: float | None = PydanticUndefined, lt: float | None = PydanticUndefined, le: float | None = PydanticUndefined, multiple_of: float | None = PydanticUndefined, allow_inf_nan: bool | None = PydanticUndefined, max_digits: int | None = PydanticUndefined, decimal_places: int | None = PydanticUndefined, min_length: int | None = PydanticUndefined, max_length: int | None = PydanticUndefined, union_mode: Literal['smart', 'left_to_right'] = PydanticUndefined, **extra: Unpack[_EmptyKwargs]) Any#

Usage docs: https://docs.pydantic.dev/2.5/concepts/fields

Create a field for objects that can be configured.

Used to provide extra information about a field, either for the model schema or complex validation. Some arguments apply only to number fields (int, float, Decimal) and some apply only to str.

Note:
  • Any _Unset objects will be replaced by the corresponding value defined in the _DefaultValues dictionary. If a key for the _Unset object is not found in the _DefaultValues dictionary, it will default to None

Args:

default: Default value if the field is not set. default_factory: A callable to generate the default value, such as utcnow(). alias: An alternative name for the attribute. alias_priority: Priority of the alias. This affects whether an alias generator is used. validation_alias: ‘Whitelist’ validation step. The field will be the single one allowed by the alias or set of

aliases defined.

serialization_alias: ‘Blacklist’ validation step. The vanilla field will be the single one of the alias’ or set

of aliases’ fields and all the other fields will be ignored at serialization time.

title: Human-readable title. description: Human-readable description. examples: Example values for this field. exclude: Whether to exclude the field from the model serialization. discriminator: Field name or Discriminator for discriminating the type in a tagged union. json_schema_extra: Any additional JSON schema data for the schema property. frozen: Whether the field is frozen. validate_default: Run validation that isn’t only checking existence of defaults. This can be set to True or False. If not set, it defaults to None. repr: A boolean indicating whether to include the field in the __repr__ output. init_var: Whether the field should be included in the constructor of the dataclass. kw_only: Whether the field should be a keyword-only argument in the constructor of the dataclass. strict: If True, strict validation is applied to the field.

See [Strict Mode](../concepts/strict_mode.md) for details.

gt: Greater than. If set, value must be greater than this. Only applicable to numbers. ge: Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers. lt: Less than. If set, value must be less than this. Only applicable to numbers. le: Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers. multiple_of: Value must be a multiple of this. Only applicable to numbers. min_length: Minimum length for strings. max_length: Maximum length for strings. pattern: Pattern for strings. allow_inf_nan: Allow inf, -inf, nan. Only applicable to numbers. max_digits: Maximum number of allow digits for strings. decimal_places: Maximum number of decimal places allowed for numbers. union_mode: The strategy to apply when validating a union. Can be smart (the default), or left_to_right.

See [Union Mode](standard_library_types.md#union-mode) for details.

extra: Include extra fields used by the JSON schema.

!!! warning Deprecated

The extra kwargs is deprecated. Use json_schema_extra instead.

Returns:
A new [FieldInfo][pydantic.fields.FieldInfo], the return annotation is Any so Field can be used on

type annotated fields without causing a typing error.

class jeepito.Message(*, message_id: str = None, created_at: datetime.datetime = None, metadata: jeepito.domain.model.Metadata)#

Base class for messaging.

message_id: str#

Unique identifier of the message.

created_at: datetime.datetime#

Timestamp of the message.

All messages are kept in order for observability, debug and event replay.

metadata: jeepito.domain.model.Metadata#

Define extra fields used at serialization.

While serializing the message, a name and version must be defined to properly defined the message. Event if the class is renamed, those constants must be kept identically over the time in the codebase.

metadata are defined statically at the definition of the message.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=str, required=False, default_factory=generate_id), 'metadata': FieldInfo(annotation=Metadata, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class jeepito.Metadata(*, name: str, schema_version: int, published: bool = False)#
name: str#

Name of the schema.

schema_version: int#

Version of the schema.

published: bool#

Publish the event to an eventstream.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'name': FieldInfo(annotation=str, required=True), 'published': FieldInfo(annotation=bool, required=False, default=False), 'schema_version': FieldInfo(annotation=int, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class jeepito.Command(*, message_id: str = None, created_at: datetime.datetime = None, metadata: jeepito.domain.model.Metadata)#

Baseclass for message of type command.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=str, required=False, default_factory=generate_id), 'metadata': FieldInfo(annotation=Metadata, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class jeepito.Event(*, message_id: str = None, created_at: datetime.datetime = None, metadata: jeepito.domain.model.Metadata)#

Baseclass for message of type event.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=datetime, required=False, default_factory=builtin_function_or_method), 'message_id': FieldInfo(annotation=str, required=False, default_factory=generate_id), 'metadata': FieldInfo(annotation=Metadata, required=True)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class jeepito.AsyncAbstractRepository#

Abstract Base Classe for Repository pattern.

seen: MutableSequence[jeepito.service._async.repository.TModel_contra]#
class jeepito.SyncAbstractRepository#

Abstract Base Classe for Repository pattern.

seen: MutableSequence[jeepito.service._sync.repository.TModel_contra]#
class jeepito.AsyncAbstractUnitOfWork#

Abstract unit of work.

To implement a unit of work, the AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() has to be defined, and some repositories has to be declared has attributes.

eventstore: jeepito.service._async.repository.AsyncEventstoreAbstractRepository = <jeepito.service._async.repository.AsyncSinkholeEventstoreRepository object>#
collect_new_events() Iterator[jeepito.domain.model.Message]#
abstract async commit() None#

Commit the transation.

abstract async rollback() None#

Rollback the transation.

class jeepito.AsyncUnitOfWorkTransaction(uow: jeepito.service._async.unit_of_work.AsyncAbstractUnitOfWork[jeepito.service._async.unit_of_work.TRepositories])#
status: jeepito.service._async.unit_of_work.TransactionStatus#
uow: jeepito.service._async.unit_of_work.AsyncAbstractUnitOfWork[jeepito.service._async.unit_of_work.TRepositories]#
property eventstore: jeepito.service._async.repository.AsyncEventstoreAbstractRepository#
async commit() None#
async rollback() None#
class jeepito.SyncAbstractUnitOfWork#

Abstract unit of work.

To implement a unit of work, the AsyncAbstractUnitOfWork.commit() and AsyncAbstractUnitOfWork.rollback() has to be defined, and some repositories has to be declared has attributes.

eventstore: jeepito.service._sync.repository.SyncEventstoreAbstractRepository = <jeepito.service._sync.repository.SyncSinkholeEventstoreRepository object>#
collect_new_events() Iterator[jeepito.domain.model.Message]#
abstract commit() None#

Commit the transation.

abstract rollback() None#

Rollback the transation.

class jeepito.SyncUnitOfWorkTransaction(uow: jeepito.service._sync.unit_of_work.SyncAbstractUnitOfWork[jeepito.service._sync.unit_of_work.TRepositories])#
status: jeepito.service._sync.unit_of_work.TransactionStatus#
uow: jeepito.service._sync.unit_of_work.SyncAbstractUnitOfWork[jeepito.service._sync.unit_of_work.TRepositories]#
property eventstore: jeepito.service._sync.repository.SyncEventstoreAbstractRepository#
commit() None#
rollback() None#
class jeepito.AsyncEventstoreAbstractRepository(publisher: Optional[jeepito.service._async.eventstream.AsyncEventstreamPublisher] = None)#
async add(message: jeepito.domain.model.Message) None#

Add the message to the storage backend and mark as seen

seen message will be sent to the eventstream only if the unit of work has properly commit the transaction. If the transaction is rollback, then, message will be dropped too from the eventstream.

async publish_eventstream() None#

Publish seen message to the eventstream.

class jeepito.SyncEventstoreAbstractRepository(publisher: Optional[jeepito.service._sync.eventstream.SyncEventstreamPublisher] = None)#
add(message: jeepito.domain.model.Message) None#

Add the message to the storage backend and mark as seen

seen message will be sent to the eventstream only if the unit of work has properly commit the transaction. If the transaction is rollback, then, message will be dropped too from the eventstream.

publish_eventstream() None#

Publish seen message to the eventstream.

class jeepito.AsyncSinkholeEventstoreRepository(publisher: Optional[jeepito.service._async.eventstream.AsyncEventstreamPublisher] = None)#

An eventstore that drop all the message.

class jeepito.SyncSinkholeEventstoreRepository(publisher: Optional[jeepito.service._sync.eventstream.SyncEventstreamPublisher] = None)#

An eventstore that drop all the message.

jeepito.async_listen(wrapped: Callable[[jeepito.typing.TMessage, jeepito.typing.TAsyncUow], Coroutine[Any, Any, Any]]) Callable[[jeepito.typing.TMessage, jeepito.typing.TAsyncUow], Coroutine[Any, Any, Any]]#

Decorator to listen for a command or an event.

Note that you can handle one listener for a command, and many for events. The command handler result is returned by the handle call of the message bus.

jeepito.sync_listen(wrapped: Callable[[jeepito.typing.TMessage, jeepito.typing.TSyncUow], Any]) Callable[[jeepito.typing.TMessage, jeepito.typing.TSyncUow], Any]#

Decorator to listen for a command or an event.

Note that you can handle one listener for a command, and many for events. The command handler result is returned by the handle call of the message bus.

class jeepito.AsyncMessageBus#

Store all the handlers for commands an events.

add_listener(msg_type: Type[jeepito.domain.model.Message], callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None#
remove_listener(msg_type: type, callback: Callable[[Any, Any], Coroutine[Any, Any, Any]]) None#
async handle(message: jeepito.domain.model.Message, uow: jeepito.service._async.unit_of_work.AsyncUnitOfWorkTransaction[jeepito.service._async.unit_of_work.TRepositories]) Any#

Notify listener of that event registered with jeepito.add_listener. Return the first event from the command.

scan(*mods: str) None#

Scan the module (or modules) containing service handlers.

when a message is handled by the bus, the bus propagate the message to hook functions, called Service Handler that receive the message, and a Unit Of Work to process it has a business transaction.

class jeepito.SyncMessageBus#

Store all the handlers for commands an events.

add_listener(msg_type: Type[jeepito.domain.model.Message], callback: Callable[[Any, Any], Any]) None#
remove_listener(msg_type: type, callback: Callable[[Any, Any], Any]) None#
handle(message: jeepito.domain.model.Message, uow: jeepito.service._sync.unit_of_work.SyncUnitOfWorkTransaction[jeepito.service._sync.unit_of_work.TRepositories]) Any#

Notify listener of that event registered with jeepito.add_listener. Return the first event from the command.

scan(*mods: str) None#

Scan the module (or modules) containing service handlers.

when a message is handled by the bus, the bus propagate the message to hook functions, called Service Handler that receive the message, and a Unit Of Work to process it has a business transaction.

class jeepito.AbstractMessageSerializer#

The message serializer take the message and return it with python native types, that are serializable for a transport.

abstract serialize_message(message: jeepito.domain.model.Message) Mapping[str, Any]#

Publish a message to the eventstream.

class jeepito.AsyncAbstractEventstreamTransport#

Transport a message to the event stream.

abstract async send_message_serialized(message: Mapping[str, Any]) None#

Publish a serialized message to the eventstream.

class jeepito.AsyncSinkholeEventstreamTransport#

Drop all messages.

By default, the events are not streamed until it is configured to do so.

async send_message_serialized(message: Mapping[str, Any]) None#

Do nothing.

class jeepito.AsyncEventstreamPublisher(transport: jeepito.service._async.eventstream.AsyncAbstractEventstreamTransport, serializer: jeepito.service.eventstream.AbstractMessageSerializer = <jeepito.service.eventstream.MessageSerializer object>)#

Publish a message to the event stream.

Parameters
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

async send_message(message: jeepito.domain.model.Message) None#

Publish a message to the eventstream.

To publish a message in the eventstream, the flag parameter “published” of the metadata of the message must be set to true. By default, message are not pushed to the queue, given the control of private message, such as command, and public event, shared with eventstream consumers.

class jeepito.SyncAbstractEventstreamTransport#

Transport a message to the event stream.

abstract send_message_serialized(message: Mapping[str, Any]) None#

Publish a serialized message to the eventstream.

class jeepito.SyncSinkholeEventstreamTransport#

Drop all messages.

By default, the events are not streamed until it is configured to do so.

send_message_serialized(message: Mapping[str, Any]) None#

Do nothing.

class jeepito.SyncEventstreamPublisher(transport: jeepito.service._sync.eventstream.SyncAbstractEventstreamTransport, serializer: jeepito.service.eventstream.AbstractMessageSerializer = <jeepito.service.eventstream.MessageSerializer object>)#

Publish a message to the event stream.

Parameters
  • serializer – Used to serialize the Message

  • transport – Used to send the serialized message to the eventstream.

send_message(message: jeepito.domain.model.Message) None#

Publish a message to the eventstream.

To publish a message in the eventstream, the flag parameter “published” of the metadata of the message must be set to true. By default, message are not pushed to the queue, given the control of private message, such as command, and public event, shared with eventstream consumers.