demessaging.config.messaging module

Messaging configuration classes for DASF.

Classes:

BaseMessagingConfig([_case_sensitive, ...])

Base class for messaging configs.

PulsarConfig([_case_sensitive, _env_prefix, ...])

A configuration class to connect to the pulsar messaging framework.

WebsocketURLConfig([_case_sensitive, ...])

A configuration for a websocket.

class demessaging.config.messaging.BaseMessagingConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30)[source]

Bases: BaseSettings

Base class for messaging configs.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests. Default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this must be at least as large as max_workers; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar’s ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
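
The defaults and limits described for max_workers and max_payload_size can be illustrated with a minimal sketch. It uses only the standard library and is not the DASF implementation: the helper names effective_max_workers and fits_in_one_frame are hypothetical, and measuring the payload as UTF-8 bytes of the JSON-encoded message is an assumption.

```python
import json
import os
from typing import Any, Optional

DEFAULT_MAX_PAYLOAD_SIZE = 512000  # default value documented for max_payload_size


def effective_max_workers(max_workers: Optional[int] = None) -> int:
    """Return max_workers, or the processor count times 5 when it is unset."""
    if max_workers is not None:
        return max_workers
    # os.cpu_count() can return None; fall back to a single processor then.
    return (os.cpu_count() or 1) * 5


def fits_in_one_frame(
    payload: Any, max_payload_size: int = DEFAULT_MAX_PAYLOAD_SIZE
) -> bool:
    """Check whether the JSON-encoded payload stays within the size limit.

    Assumption: the size is counted in UTF-8 bytes of the encoded message.
    """
    encoded = json.dumps(payload).encode("utf-8")
    return len(encoded) <= max_payload_size


print(effective_max_workers())          # depends on the machine
print(fits_in_one_frame({"value": 1}))  # small payloads fit
```

A check of this kind only makes sense if max_payload_size itself stays below the broker's ‘webSocketMaxTextFrameSize’, as the parameter description requires.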

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(__context)

This function is meant to behave like a BaseModel method to initialise private attributes.

validate_queue_size()

Check that the queue_size is not smaller than max_workers.

Attributes:

header

max_payload_size

max_workers

model_computed_fields

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

producer

The connected producer for the messaging config

producer_connection_timeout

producer_keep_alive

queue_size

topic

get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
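
With env_prefix set to 'de_backend_' and case_sensitive set to False, settings can be supplied through environment variables such as DE_BACKEND_TOPIC. The following sketch mimics that name matching with the standard library only. It is a simplified stand-in, not the pydantic-settings implementation: it performs no validation or type conversion, and the function name settings_from_env and the example values are hypothetical.

```python
import json
from typing import Any, Dict, Mapping

ENV_PREFIX = "de_backend_"  # value of env_prefix in model_config


def settings_from_env(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Collect field values from variables that carry the prefix.

    Names are compared in lower case because case_sensitive is False.
    """
    values: Dict[str, Any] = {}
    for name, raw in environ.items():
        lowered = name.lower()
        if not lowered.startswith(ENV_PREFIX):
            continue  # unrelated variable
        field = lowered[len(ENV_PREFIX):]
        # header accepts a JSON string, so decode it into a dictionary.
        values[field] = json.loads(raw) if field == "header" else raw
    return values


example = settings_from_env({
    "DE_BACKEND_TOPIC": "my-topic",
    "DE_BACKEND_HEADER": '{"authorization": "Bearer hypothetical-token"}',
    "PATH": "/usr/bin",
})
print(example)
```

Because 'extra' is set to 'forbid', the real settings classes reject unknown field names, which this sketch does not model.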

model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context: Any) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

property producer: MessageProducer

The connected producer for the messaging config
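
How long a producer connection stays open is governed by producer_keep_alive: the timer restarts on each outgoing message, and a value of 0 closes the connection right after a message has been sent and acknowledged. A minimal sketch of such a timer follows. The class KeepAliveTimer and its methods are hypothetical and are not part of demessaging; the injectable clock exists only to make the behaviour easy to test.

```python
import time
from typing import Callable


class KeepAliveTimer:
    """Track when an idle producer connection may be closed."""

    def __init__(
        self,
        keep_alive: float = 120,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.keep_alive = keep_alive
        self._clock = clock
        self._deadline = clock() + keep_alive

    def message_sent(self) -> None:
        """Restart the keep-alive period after an outgoing message."""
        self._deadline = self._clock() + self.keep_alive

    def should_close(self) -> bool:
        """Return True once the keep-alive period has elapsed."""
        return self._clock() >= self._deadline


timer = KeepAliveTimer(keep_alive=0)
timer.message_sent()
print(timer.should_close())  # a keep-alive of 0 allows closing immediately
```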

producer_connection_timeout: int
producer_keep_alive: int
queue_size: PositiveInt | None
topic: str
validate_queue_size()[source]

Check that the queue_size is not smaller than max_workers.
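
The constraint stated in the queue_size parameter description (queue_size must be at least as big as max_workers when both are set) can be sketched as a plain function. This is an illustration rather than the library's validator: the name check_queue_size is hypothetical, and it raises ValueError because AttributeException is not a Python built-in; which exception demessaging actually raises is not confirmed here.

```python
from typing import Optional


def check_queue_size(queue_size: Optional[int], max_workers: Optional[int]) -> None:
    """Raise ValueError if queue_size is set below max_workers."""
    if queue_size is None or max_workers is None:
        return  # nothing to compare when either value is unset
    if queue_size < max_workers:
        raise ValueError(
            f"queue_size ({queue_size}) must be at least max_workers ({max_workers})"
        )


check_queue_size(queue_size=8, max_workers=4)     # passes
check_queue_size(queue_size=None, max_workers=4)  # passes, queue_size unset
```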

class demessaging.config.messaging.PulsarConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, host: str = 'localhost', port: str = '8080', persistent: str = 'non-persistent', tenant: str = 'public', namespace: str = 'default')[source]

Bases: BaseMessagingConfig

A configuration class to connect to the pulsar messaging framework.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests. Default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this must be at least as large as max_workers; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar’s ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.

  • host (str) – The remote host of the pulsar.

  • port (str) – The port of the pulsar at the given host.

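  • persistent (str) – The persistence mode of the topic. Default: ‘non-persistent’.

  • tenant (str) – The Pulsar tenant. Default: ‘public’.

  • namespace (str) – The Pulsar namespace. Default: ‘default’.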

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(_ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Attributes:

host

model_computed_fields

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

namespace

persistent

port

tenant

get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.
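
As an illustration of how host, port, persistent, tenant and namespace can combine into a websocket URL, the sketch below follows the endpoint layout of the Pulsar WebSocket API (ws/v2/producer/... and ws/v2/consumer/.../subscription). Whether get_topic_url returns exactly this string is an assumption; the function name build_pulsar_ws_url and the topic and subscription names are hypothetical.

```python
from typing import Optional


def build_pulsar_ws_url(
    topic: str,
    subscription: Optional[str] = None,
    host: str = "localhost",
    port: str = "8080",
    persistent: str = "non-persistent",
    tenant: str = "public",
    namespace: str = "default",
) -> str:
    """Build a Pulsar WebSocket API URL for a producer or a consumer."""
    # Assumption: passing a subscription name selects the consumer endpoint.
    role = "consumer" if subscription else "producer"
    url = f"ws://{host}:{port}/ws/v2/{role}/{persistent}/{tenant}/{namespace}/{topic}"
    if subscription:
        url += f"/{subscription}"
    return url


print(build_pulsar_ws_url("my-topic"))
print(build_pulsar_ws_url("my-topic", subscription="my-subscription"))
```

The keyword defaults mirror the defaults listed in the PulsarConfig signature.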

header: Json[Dict[str, Any]] | Dict[str, Any]
host: str
max_payload_size: int
max_workers: PositiveInt | None
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'host': FieldInfo(annotation=str, required=False, default='localhost', description='The remote host of the pulsar.'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'namespace': FieldInfo(annotation=str, required=False, default='default'), 'persistent': FieldInfo(annotation=str, required=False, default='non-persistent'), 'port': FieldInfo(annotation=str, required=False, default='8080', description='The port of the pulsar at the given :attr:`host`.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'tenant': FieldInfo(annotation=str, required=False, default='public'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

model_post_init(_ModelMetaclass__context: Any) → None

We need to both initialize private attributes and call the user-defined model_post_init method.

namespace: str
persistent: str
port: str
producer_connection_timeout: int
producer_keep_alive: int
queue_size: PositiveInt | None
tenant: str
topic: str
class demessaging.config.messaging.WebsocketURLConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, websocket_url: str = '', producer_url: str | None = None, consumer_url: str | None = None)[source]

Bases: BaseMessagingConfig

A configuration for a websocket.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests. Default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this must be at least as large as max_workers; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar’s ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.

  • websocket_url (str) – The fully qualified URL to the websocket.

  • producer_url (Optional[str]) – An alternative URL to use for producers. If None, the websocket_url will be used.

  • consumer_url (Optional[str]) – An alternative URL to use for consumers. If None, the websocket_url will be used.
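
The fallback described for producer_url and consumer_url (use websocket_url when the role-specific URL is None) can be sketched as follows. The helper select_base_url, the role strings and the example URLs are hypothetical; how the topic is then appended to the chosen URL is not covered here.

```python
from typing import Optional


def select_base_url(
    role: str,
    websocket_url: str = "",
    producer_url: Optional[str] = None,
    consumer_url: Optional[str] = None,
) -> str:
    """Pick the role-specific URL, falling back to websocket_url."""
    if role == "producer":
        override = producer_url
    elif role == "consumer":
        override = consumer_url
    else:
        raise ValueError(f"unknown role: {role!r}")
    # Only None triggers the fallback, matching the parameter descriptions.
    return websocket_url if override is None else override


print(select_base_url("producer", websocket_url="ws://example.invalid/ws"))
print(
    select_base_url(
        "consumer",
        websocket_url="ws://example.invalid/ws",
        consumer_url="ws://example.invalid/consume",
    )
)
```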

Attributes:

consumer_url

model_computed_fields

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

producer_url

websocket_url

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(_ModelMetaclass__context)

We need to both initialize private attributes and call the user-defined model_post_init method.

consumer_url: str | None
get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'consumer_url': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, description='An alternative URL to use for consumers. If None, the `websocket_url` will be used.'), 'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'producer_url': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, description='An alternative URL to use for producers. If None, the `websocket_url` will be used.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.'), 'websocket_url': FieldInfo(annotation=str, required=False, default='', description='The fully qualified URL to the websocket.')}

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

model_post_init(_ModelMetaclass__context: Any) → None

We need to both initialize private attributes and call the user-defined model_post_init method.

producer_connection_timeout: int
producer_keep_alive: int
producer_url: str | None
queue_size: PositiveInt | None
topic: str
websocket_url: str