demessaging.config.messaging module
Messaging configuration classes for DASF.
Classes:
BaseMessagingConfig – Base class for messaging configs.
PulsarConfig – A configuration class to connect to the pulsar messaging framework.
WebsocketURLConfig – A configuration for a websocket.
- class demessaging.config.messaging.BaseMessagingConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30)
Bases: BaseSettings
Base class for messaging configs.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this needs to be at least as big as max_workers, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
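The rules for max_workers and queue_size described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the DASF implementation: the helper name resolve_worker_settings is hypothetical, and ValueError stands in for the AttributeException that the parameter description mentions.

```python
import os
from typing import Optional, Tuple


def resolve_worker_settings(
    max_workers: Optional[int] = None,
    queue_size: Optional[int] = None,
) -> Tuple[int, Optional[int]]:
    """Hypothetical helper mirroring the documented rules (not DASF code)."""
    for name, value in (("max_workers", max_workers), ("queue_size", queue_size)):
        # Both fields are annotated with Gt(gt=0): only positive integers pass.
        if value is not None and value <= 0:
            raise ValueError(f"{name} must be greater than 0")
    if max_workers is not None and queue_size is not None:
        # The check only applies when max_workers was set explicitly.
        if queue_size < max_workers:
            # Stand-in for the 'AttributeException' named in the docs.
            raise ValueError("queue_size must be at least as big as max_workers")
    if max_workers is None:
        # Documented default: number of processors multiplied by 5.
        max_workers = (os.cpu_count() or 1) * 5
    return max_workers, queue_size
```

Calling resolve_worker_settings(4, 8) passes both values through unchanged, while resolve_worker_settings(8, 4) raises because the queue would be smaller than the worker pool.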
Methods:
get_topic_url(topic[, subscription]) – Build the URL to connect to a websocket.
model_post_init(__context) – This function is meant to behave like a BaseModel method to initialise private attributes.
Check that the queue_size is at least as big as the max_workers.
Attributes:
model_computed_fields – A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
model_config – Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
model_fields – Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
producer – The connected producer for the messaging config.
- get_topic_url(topic: str, subscription: str | None = None) → str
Build the URL to connect to a websocket.
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
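The 'env_prefix' and 'case_sensitive' entries in model_config suggest that fields can be supplied through environment variables such as DE_BACKEND_TOPIC. The following stdlib-only sketch imitates that lookup for illustration. The function collect_settings and the sample values are hypothetical, and the real lookup and validation are performed by pydantic-settings, not by this code.

```python
import json
from typing import Any, Dict, Mapping

ENV_PREFIX = "de_backend_"  # value of 'env_prefix' in model_config


def collect_settings(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Hypothetical stand-in for the environment lookup of pydantic-settings."""
    settings: Dict[str, Any] = {}
    for key, value in environ.items():
        name = key.lower()  # 'case_sensitive': False
        if name.startswith(ENV_PREFIX):
            settings[name[len(ENV_PREFIX):]] = value
    if "header" in settings:
        # 'header' is annotated as Json, so a string value is parsed as JSON.
        settings["header"] = json.loads(settings["header"])
    return settings


# Sample environment (made-up values); unrelated variables are ignored.
env = {
    "DE_BACKEND_TOPIC": "my-topic",
    "DE_BACKEND_HEADER": '{"authorization": "Bearer abc"}',
    "PATH": "/usr/bin",
}
settings = collect_settings(env)
```

Because 'extra' is set to 'forbid', the real model would presumably reject values for names that are not declared as fields; the sketch above does not model that.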
- model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}
Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
This replaces Model.__fields__ from Pydantic V1.
- model_post_init(__context: Any) → None
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
__context – The context.
- property producer: MessageProducer
The connected producer for the messaging config
- class demessaging.config.messaging.PulsarConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, host: str = 'localhost', port: str = '8080', persistent: str = 'non-persistent', tenant: str = 'public', namespace: str = 'default')
Bases: BaseMessagingConfig
A configuration class to connect to the pulsar messaging framework.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this needs to be at least as big as max_workers, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
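host (str) – The remote host of the pulsar. Default: 'localhost'.
port (str) – The port of the pulsar at the given host. Default: '8080'.
persistent (str) – No description provided. Default: 'non-persistent'.
tenant (str) – No description provided. Default: 'public'.
namespace (str) – No description provided. Default: 'default'.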
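The producer_keep_alive behaviour described in the parameter list (the timer is reset on each outgoing message, and 0 closes the connection right after a message) can be modelled as follows. This is a sketch under assumptions: the class KeepAliveTimer and its injectable clock are hypothetical, and the model ignores message acknowledgement.

```python
import time
from typing import Callable, Optional


class KeepAliveTimer:
    """Hypothetical model of the producer_keep_alive rule (not DASF code)."""

    def __init__(
        self,
        keep_alive: int = 120,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.keep_alive = keep_alive
        self._clock = clock
        self._last_sent: Optional[float] = None

    def message_sent(self) -> None:
        # Each outgoing message resets the timer.
        self._last_sent = self._clock()

    def should_close(self) -> bool:
        if self._last_sent is None:
            return False  # nothing sent yet, so nothing to time out
        # With keep_alive == 0 this is true as soon as a message was sent.
        return self._clock() - self._last_sent >= self.keep_alive


# Demonstration with a fake clock so that no real waiting is needed.
now = [0.0]
timer = KeepAliveTimer(keep_alive=120, clock=lambda: now[0])
timer.message_sent()
now[0] = 100.0
still_open = not timer.should_close()  # 100 s since the last message
timer.message_sent()                   # resets the timer at t = 100 s
now[0] = 220.0
expired = timer.should_close()         # 120 s since the reset
```

Injecting the clock keeps the sketch deterministic; a real producer would more likely rely on its event loop or a scheduler.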
Methods:
get_topic_url(topic[, subscription]) – Build the URL to connect to a websocket.
model_post_init(_ModelMetaclass__context) – We need to both initialize private attributes and call the user-defined model_post_init method.
Attributes:
model_computed_fields – A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
model_config – Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
model_fields – Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
- get_topic_url(topic: str, subscription: str | None = None) → str
Build the URL to connect to a websocket.
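How host, port, persistent, tenant and namespace might combine into a URL is sketched below. The path layout follows the WebSocket API of Apache Pulsar (/ws/v2/producer/... and /ws/v2/consumer/.../subscription). The function pulsar_topic_url is hypothetical, and the assumption that a subscription selects the consumer endpoint is not confirmed by this page.

```python
from typing import Optional


def pulsar_topic_url(
    topic: str,
    subscription: Optional[str] = None,
    host: str = "localhost",
    port: str = "8080",
    persistent: str = "non-persistent",
    tenant: str = "public",
    namespace: str = "default",
) -> str:
    """Hypothetical sketch; defaults are taken from the PulsarConfig signature."""
    # Assumption: a subscription name selects the consumer endpoint,
    # its absence the producer endpoint.
    role = "consumer" if subscription else "producer"
    url = f"ws://{host}:{port}/ws/v2/{role}/{persistent}/{tenant}/{namespace}/{topic}"
    if subscription:
        url += f"/{subscription}"
    return url
```

With the defaults, pulsar_topic_url("my-topic") yields ws://localhost:8080/ws/v2/producer/non-persistent/public/default/my-topic.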
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'host': FieldInfo(annotation=str, required=False, default='localhost', description='The remote host of the pulsar.'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'namespace': FieldInfo(annotation=str, required=False, default='default'), 'persistent': FieldInfo(annotation=str, required=False, default='non-persistent'), 'port': FieldInfo(annotation=str, required=False, default='8080', description='The port of the pulsar at the given :attr:`host`.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. 
Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'tenant': FieldInfo(annotation=str, required=False, default='public'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}
Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
This replaces Model.__fields__ from Pydantic V1.
- class demessaging.config.messaging.WebsocketURLConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_parse_none_str: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, websocket_url: str = '', producer_url: str | None = None, consumer_url: str | None = None)
Bases: BaseMessagingConfig
A configuration for a websocket.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If max_workers is set, this needs to be at least as big as max_workers, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
websocket_url (str) – The fully qualified URL to the websocket.
producer_url (Optional[str]) – An alternative URL to use for producers. If None, the websocket_url will be used.
consumer_url (Optional[str]) – An alternative URL to use for consumers. If None, the websocket_url will be used.
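The fallback described for producer_url and consumer_url (use websocket_url when the specific URL is None) amounts to a simple selection. The helper select_url and its for_consumer flag are hypothetical names used only for this sketch, and the URLs in the usage lines are placeholders.

```python
from typing import Optional


def select_url(
    websocket_url: str,
    producer_url: Optional[str] = None,
    consumer_url: Optional[str] = None,
    for_consumer: bool = False,
) -> str:
    """Hypothetical helper showing only the documented fallback."""
    specific = consumer_url if for_consumer else producer_url
    # Per the parameter descriptions, None means "use websocket_url".
    return websocket_url if specific is None else specific


# Placeholder URLs, for illustration only.
base = "ws://example.org/ws"
producer_target = select_url(base, producer_url="ws://example.org/send")
consumer_target = select_url(base, producer_url="ws://example.org/send", for_consumer=True)
```

Here producer_target uses the dedicated producer URL, while consumer_target falls back to base because no consumer_url was given.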
Attributes:
model_computed_fields – A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
model_config – Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
model_fields – Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
Methods:
get_topic_url(topic[, subscription]) – Build the URL to connect to a websocket.
model_post_init(_ModelMetaclass__context) – We need to both initialize private attributes and call the user-defined model_post_init method.
- get_topic_url(topic: str, subscription: str | None = None) → str
Build the URL to connect to a websocket.
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- model_fields: ClassVar[dict[str, FieldInfo]] = {'consumer_url': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, description='An alternative URL to use for consumers. If None, the `websocket_url` will be used.'), 'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'producer_connection_timeout': FieldInfo(annotation=int, required=False, default=30, description='The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.'), 'producer_keep_alive': FieldInfo(annotation=int, required=False, default=120, description='The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.'), 'producer_url': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, description='An alternative URL to use for producers. 
If None, the `websocket_url` will be used.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt], NoneType], required=False, default=None, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.'), 'websocket_url': FieldInfo(annotation=str, required=False, default='', description='The fully qualified URL to the websocket.')}
Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.
This replaces Model.__fields__ from Pydantic V1.