Configuration classes for the de-messaging backend module.

Classes:

ApiRegistry(*[, imports, objects])

A registry for imports and encoders

BaseConfig(*[, doc, registry, template])

Configuration base class for functions, modules and classes.

BaseMessagingConfig([_case_sensitive, ...])

Base class for messaging configs.

ClassConfig(*[, doc, registry, template, ...])

Configuration class for a backend module class.

FunctionConfig(*[, doc, registry, template, ...])

Configuration class for a backend module function.

ModuleConfig(*[, doc, registry, template, ...])

Configuration class for a backend module.

PulsarConfig([_case_sensitive, _env_prefix, ...])

A configuration class to connect to the pulsar messaging framework.

WebsocketURLConfig([_case_sensitive, ...])

A configuration for a websocket.

Functions:

append_parameter_docs(model)

Append the parameters section to the docstring of a model.

build_parameter_docs(model)

Build the docstring for the parameters of a model.

configure([js])

Configuration decorator for functions or modules.

object_to_string(obj)

type_to_string(type_)

Data:

registry

Registry for the objects that should be available in the generated client stub.

class demessaging.config.ApiRegistry(*, imports: Dict[str, str] = None, objects: List[str] = None)

Bases: BaseModel

A registry for imports and encoders

Parameters:
  • imports (Dict[str, str]) – Modules to import at the top of every file. Each key is the module, each value is the alias.

  • objects (List[str]) – Source code for objects that should be inlined in the generated Python API.

Methods:

can_import_import(imports)

hard_code(python_code)

Register some code to be implemented in the generated module.

register_import(module[, alias])

Register a module that needs to be imported in generated API files.

register_type(obj)

Register a class or function to be available in the generated API.

Attributes:

imports

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

objects

classmethod can_import_import(imports: Dict[str, str]) → Dict[str, str]
hard_code(python_code: str) → None

Register some code to be implemented in the generated module.

Parameters:

python_code (str) – The code that is supposed to be executed on a module level.

imports: Dict[str, str]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'imports': FieldInfo(annotation=Dict[str, str], required=False, default_factory=dict, description='Modules to import at the top of every file. The first item is the module, the second is the alias'), 'objects': FieldInfo(annotation=List[str], required=False, default_factory=list, description='Source code for objects that should be inlined in the generated Python API.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

objects: List[str]
register_import(module: str, alias: str | None = None) → None

Register a module that needs to be imported in generated API files.

Parameters:

module (str) – The name of the module, e.g. matplotlib.pyplot

register_type(obj: T) → T

Register a class or function to be available in the generated API.

Use this function if you want to have certain functions or classes available in the generated API module without transforming them into calls to the backend module.
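The registry methods above can be pictured with a small stand-in. This is a minimal sketch, assuming only the two documented fields (imports and objects). The class SketchRegistry and its header() method are hypothetical and are not part of demessaging, and treating a missing alias as an empty string is an assumption.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SketchRegistry:
    """Hypothetical stand-in that mirrors the documented fields only."""

    imports: Dict[str, str] = field(default_factory=dict)  # module -> alias
    objects: List[str] = field(default_factory=list)  # inlined source code

    def register_import(self, module: str, alias: Optional[str] = None) -> None:
        # Assumption: an empty alias means "import the module under its own name".
        self.imports[module] = alias or ""

    def hard_code(self, python_code: str) -> None:
        # Keep module-level code verbatim, in registration order.
        self.objects.append(python_code)

    def header(self) -> str:
        """Render import lines followed by the inlined code (illustrative)."""
        lines = [
            f"import {module} as {alias}" if alias else f"import {module}"
            for module, alias in self.imports.items()
        ]
        return "\n".join(lines + self.objects)


sketch = SketchRegistry()
sketch.register_import("matplotlib.pyplot", "plt")
sketch.register_import("json")
sketch.hard_code("DEFAULT_DPI = 300")
header = sketch.header()
print(header)
```

The sketch only shows how registered imports and hard-coded snippets could end up at the top of a generated file; the real rendering is done through the templates.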

class demessaging.config.BaseConfig(*, doc: str = '', registry: ApiRegistry = None, template: Template = Template(name='empty', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}))

Bases: BaseModel

Configuration base class for functions, modules and classes.

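Parameters:
  • doc (str) – The documentation of the object. If empty, this will be taken from the corresponding __doc__ attribute.

  • registry (demessaging.config.ApiRegistry) – Utilities for imports and encoders.

  • template (demessaging.template.Template) – The demessaging.template.Template that is used to render this object for the generated API.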

Attributes:

doc

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

registry

template

Methods:

render(**context)

Generate the code to call this function in the frontend.

doc: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'doc': FieldInfo(annotation=str, required=False, default='', description='The documentation of the object. If empty, this will be taken from the corresponding ``__doc__`` attribute.'), 'registry': FieldInfo(annotation=ApiRegistry, required=False, default_factory=_get_registry, description='Utilities for imports and encoders.'), 'template': FieldInfo(annotation=Template, required=False, default=Template(name='empty', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), description='The :class:`demessaging.template.Template` that is used to render this object for the generated API.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

registry: ApiRegistry
render(**context) → str

Generate the code to call this function in the frontend.

template: Template
class demessaging.config.BaseMessagingConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000)

Bases: BaseSettings

Base class for messaging configs.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 kB).

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

validate_queue_size()

Check that the queue_size is not smaller than max_workers.

Attributes:

header

max_payload_size

max_workers

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

queue_size

topic

get_topic_url(topic: str, subscription: str | None = None) → str

Build the URL to connect to a websocket.

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
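The model_config above sets env_prefix to 'de_backend_' and case_sensitive to False. For a settings class this means that a field such as topic can be supplied through an environment variable named DE_BACKEND_TOPIC. The following is a stdlib-only emulation of that lookup, not the pydantic-settings implementation; the helper read_prefixed and the sample environment are hypothetical.

```python
from typing import Dict, Iterable, Mapping

ENV_PREFIX = "de_backend_"  # value taken from the model_config shown above


def read_prefixed(environ: Mapping[str, str], fields: Iterable[str]) -> Dict[str, str]:
    """Collect values for ``fields`` from variables named ENV_PREFIX + field.

    Names are compared case-insensitively, mirroring case_sensitive=False.
    """
    lowered = {key.lower(): value for key, value in environ.items()}
    found = {}
    for name in fields:
        key = ENV_PREFIX + name.lower()
        if key in lowered:
            found[name] = lowered[key]
    return found


# A fake environment is used so the example does not depend on the machine.
fake_environ = {
    "DE_BACKEND_TOPIC": "my-topic",
    "DE_BACKEND_MAX_WORKERS": "4",
    "HOME": "/root",
}
settings = read_prefixed(fake_environ, ["topic", "max_workers", "queue_size"])
print(settings)
```

In this sketch the values stay strings; converting "4" to an integer and validating it is the job of the settings class itself.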

model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

queue_size: PositiveInt | None
topic: str
validate_queue_size()

Check that the queue_size is not smaller than max_workers.
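The queue_size parameter description states that the queue must be at least as big as max_workers when both are set. A minimal sketch of such a check follows. The functions check_queue_size and default_max_workers are hypothetical, and raising ValueError is an assumption made for illustration (the parameter description names an AttributeException).

```python
import os
from typing import Optional


def default_max_workers() -> int:
    # Documented default: number of processors on the machine, multiplied by 5.
    return (os.cpu_count() or 1) * 5


def check_queue_size(max_workers: Optional[int], queue_size: Optional[int]) -> None:
    """Raise if a configured queue is smaller than the configured worker count."""
    if max_workers is None or queue_size is None:
        return  # nothing to compare when either value is left unset
    if queue_size < max_workers:
        # Assumption: ValueError is used here for illustration only.
        raise ValueError(
            f"queue_size ({queue_size}) must be at least max_workers ({max_workers})"
        )


check_queue_size(max_workers=4, queue_size=8)  # passes silently
try:
    check_queue_size(max_workers=8, queue_size=4)
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```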

class demessaging.config.ClassConfig(*, doc: str = '', registry: ApiRegistry = None, template: Template = Template(name='class_.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), name: str = '', init_doc: str = '', signature: Signature | None = None, methods: List[str] = None, validators: Dict[str, Any] = None, serializers: Dict[str, Any] = None, field_params: Dict[str, Dict[str, Any]] = None, annotations: Dict[str, Any] = None, reporter_args: Dict[str, BaseReport] = None)

Bases: BaseConfig

Configuration class for a backend module class.

Parameters:
  • doc (str) – The documentation of the object. If empty, this will be taken from the corresponding __doc__ attribute.

  • registry (demessaging.config.ApiRegistry) – Utilities for imports and encoders.

  • template (demessaging.template.Template) – The demessaging.template.Template that is used to render the class for the generated API.

  • name (str) – The name of the class. If empty, this will be taken from the class's __name__ attribute.

  • init_doc (str) – The documentation of the constructor. If empty, this will be taken from the class's __init__ method.

  • signature (Optional[inspect.Signature]) – The calling signature for the function. If empty, this will be taken from the function itself.

  • methods (List[str]) – methods to use within the backend modules

  • validators (Dict[str, Any]) – custom validators for the constructor parameters

  • serializers (Dict[str, Any]) – A mapping from function argument to serializer that is of instance pydantic.functional_serializers.PlainSerializer or pydantic.functional_serializers.WrapSerializer.

  • field_params (Dict[str, Dict[str, Any]]) – custom Field overrides for the constructor parameters. See pydantic.fields.Field()

  • annotations (Dict[str, Any]) – custom annotations for constructor parameters

  • reporter_args (Dict[str, deprogressapi.base.BaseReport]) – Arguments that use the dasf-progress-api

Attributes:

annotations

field_params

init_doc

methods

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

name

reporter_args

serializers

signature

template

validators

annotations: Dict[str, Any]
doc: str
field_params: Dict[str, Dict[str, Any]]
init_doc: str
methods: List[str]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'annotations': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='custom annotations for constructor parameters'), 'doc': FieldInfo(annotation=str, required=False, default='', description='The documentation of the object. If empty, this will be taken from the corresponding ``__doc__`` attribute.'), 'field_params': FieldInfo(annotation=Dict[str, Dict[str, Any]], required=False, default_factory=dict, description='custom Field overrides for the constructor parameters. See :func:`pydantic.Fields.Field`'), 'init_doc': FieldInfo(annotation=str, required=False, default='', description='The documentation of the function. If empty, this will be taken from the classes ``__init__`` method.'), 'methods': FieldInfo(annotation=List[str], required=False, default_factory=list, description='methods to use within the backend modules'), 'name': FieldInfo(annotation=str, required=False, default='', description='The name of the function. If empty, this will be taken from the classes ``__name__`` attribute.'), 'registry': FieldInfo(annotation=ApiRegistry, required=False, default_factory=_get_registry, description='Utilities for imports and encoders.'), 'reporter_args': FieldInfo(annotation=Dict[str, deprogressapi.base.BaseReport], required=False, default_factory=dict, description='Arguments that use the dasf-progress-api'), 'serializers': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='A mapping from function argument to serializer that is of instance :class:`pydantic.functional_serializers.PlainSerializer` or :class:`pydantic.functional_serializers.WrapSerializer`.'), 'signature': FieldInfo(annotation=Union[Signature, NoneType], required=False, description='The calling signature for the function. 
If empty, this will be taken from the function itself.'), 'template': FieldInfo(annotation=Template, required=False, default=Template(name='class_.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), description='The :class:`demessaging.template.Template` that is used to render the class for the generated API.'), 'validators': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='custom validators for the constructor parameters')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

name: str
registry: ApiRegistry
reporter_args: Dict[str, BaseReport]
serializers: Dict[str, Any]
signature: inspect.Signature | None
template: Template
validators: Dict[str, Any]
class demessaging.config.FunctionConfig(*, doc: str = '', registry: ApiRegistry = None, template: Template = Template(name='function.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), name: str = '', signature: Signature | None = None, validators: Dict[str, List[ImportString | Callable[[], Callable]]] = None, serializers: Dict[str, ImportString | Callable[[], Callable]] = None, return_validators: List[ImportString | Callable[[], Callable]] | None = None, return_serializer: ImportString | Callable[[], Callable] | None = None, field_params: Dict[str, Dict[str, Any]] = None, returns: Dict[str, Any] = None, return_annotation: Any | None = None, annotations: Dict[str, Any] = None, reporter_args: Dict[str, BaseReport] = None)

Bases: BaseConfig

Configuration class for a backend module function.

Parameters:
  • doc (str) – The documentation of the object. If empty, this will be taken from the corresponding __doc__ attribute.

  • registry (demessaging.config.ApiRegistry) – Utilities for imports and encoders.

  • template (demessaging.template.Template) – The demessaging.template.Template that is used to render the function for the generated API.

  • name (str) – The name of the function. If empty, this will be taken from the function's __name__ attribute.

  • signature (Optional[inspect.Signature]) – The calling signature for the function. If empty, this will be taken from the function itself.

  • validators (Dict[str, List[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]]) – Custom validators for function arguments. This parameter is a mapping from function argument name to a list of callables that can be used as validator.

  • serializers (Dict[str, Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]) – A mapping from function argument to serializing function that is then used for the pydantic.functional_serializers.PlainSerializer.

  • return_validators (Optional[List[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]]) – Validators for the return value. This parameter is a list of callables that can be used as validator for the return value.

  • return_serializer (Optional[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]) – A function that is used to serialize the return value.

  • field_params (Dict[str, Dict[str, Any]]) – custom Field overrides for the function parameters. See pydantic.fields.Field()

  • returns (Dict[str, Any]) – custom returns overrides.

  • return_annotation (Optional[Any]) – The annotation for the return value.

  • annotations (Dict[str, Any]) – custom annotations for function parameters

  • reporter_args (Dict[str, deprogressapi.base.BaseReport]) – Arguments that use the dasf-progress-api
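The shape of the validators parameter (a mapping from argument name to a list of callables) can be illustrated without any library. This is a sketch of the data structure only, assuming each callable takes a value and returns the possibly modified value. The helper apply_validators and the two example validators are hypothetical and do not show how demessaging or pydantic invoke validators internally.

```python
from typing import Any, Callable, Dict, List


def apply_validators(
    arguments: Dict[str, Any],
    validators: Dict[str, List[Callable[[Any], Any]]],
) -> Dict[str, Any]:
    """Run each argument through its validators, in list order."""
    checked = dict(arguments)
    for name, funcs in validators.items():
        if name not in checked:
            continue  # no value was passed for this argument
        for func in funcs:
            checked[name] = func(checked[name])
    return checked


def must_be_positive(value: int) -> int:
    if value <= 0:
        raise ValueError("value must be positive")
    return value


def cap_at_100(value: int) -> int:
    return min(value, 100)


validators = {"n": [must_be_positive, cap_at_100]}
result = apply_validators({"n": 250, "label": "demo"}, validators)
print(result)
```

Arguments without an entry in the mapping, such as label here, pass through unchanged.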

Attributes:

annotations

field_params

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

name

reporter_args

return_annotation

return_serializer

return_validators

returns

serializers

signature

template

validators

annotations: Dict[str, Any]
doc: str
field_params: Dict[str, Dict[str, Any]]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'annotations': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='custom annotations for function parameters'), 'doc': FieldInfo(annotation=str, required=False, default='', description='The documentation of the object. If empty, this will be taken from the corresponding ``__doc__`` attribute.'), 'field_params': FieldInfo(annotation=Dict[str, Dict[str, Any]], required=False, default_factory=dict, description='custom Field overrides for the constructor parameters. See :func:`pydantic.Fields.Field`'), 'name': FieldInfo(annotation=str, required=False, default='', description='The name of the function. If empty, this will be taken from the functions ``__name__`` attribute.'), 'registry': FieldInfo(annotation=ApiRegistry, required=False, default_factory=_get_registry, description='Utilities for imports and encoders.'), 'reporter_args': FieldInfo(annotation=Dict[str, deprogressapi.base.BaseReport], required=False, default_factory=dict, description='Arguments that use the dasf-progress-api'), 'return_annotation': FieldInfo(annotation=Union[Any, NoneType], required=False, description='The annotation for the return value.'), 'return_serializer': FieldInfo(annotation=Union[ImportString, Annotated[Callable, PlainSerializer(func=<function object_to_string>, return_type=PydanticUndefined, when_used='always')], NoneType], required=False, description='A function that is used to serialize the return value.'), 'return_validators': FieldInfo(annotation=Union[List[Union[pydantic.types.ImportString, Annotated[Callable, PlainSerializer(func=<function object_to_string>, return_type=PydanticUndefined, when_used='always')]]], NoneType], required=False, description='Validators for the return value. 
This parameter is a list of callables that can be used as validator for the return value.'), 'returns': FieldInfo(annotation=Dict[str, Any], required=False, default_factory=dict, description='custom returns overrides.'), 'serializers': FieldInfo(annotation=Dict[str, Union[pydantic.types.ImportString, Annotated[Callable, PlainSerializer(func=<function object_to_string>, return_type=PydanticUndefined, when_used='always')]]], required=False, default_factory=dict, description='A mapping from function argument to serializing function that is then used for the :class:`pydantic.functional_serializers.PlainSerializer`.'), 'signature': FieldInfo(annotation=Union[Signature, NoneType], required=False, description='The calling signature for the function. If empty, this will be taken from the function itself.'), 'template': FieldInfo(annotation=Template, required=False, default=Template(name='function.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), description='The :class:`demessaging.template.Template` that is used to render the function for the generated API.'), 'validators': FieldInfo(annotation=Dict[str, List[Union[pydantic.types.ImportString, Annotated[Callable, PlainSerializer(func=<function object_to_string>, return_type=PydanticUndefined, when_used='always')]]]], required=False, default_factory=dict, description='Custom validators for function arguments. This parameter is a mapping from function argument name to a list of callables that can be used as validator.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

name: str
registry: ApiRegistry
reporter_args: Dict[str, BaseReport]
return_annotation: Any | None
return_serializer: Optional[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]
return_validators: Optional[List[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]]
returns: Dict[str, Any]
serializers: Dict[str, Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]
signature: inspect.Signature | None
template: Template
validators: Dict[str, List[Union[ImportString, Annotated[Callable, PlainSerializer(object_to_string)]]]]
class demessaging.config.ModuleConfig(*, doc: str = '', registry: ApiRegistry = None, template: Template = Template(name='module.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), messaging_config: PulsarConfig | WebsocketURLConfig, debug: bool = False, members: List[str | Callable | Type[object] | Any] = None, imports: str = '')

Bases: BaseConfig

Configuration class for a backend module.

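Parameters:
  • doc (str) – The documentation of the object. If empty, this will be taken from the corresponding __doc__ attribute.

  • registry (demessaging.config.ApiRegistry) – Utilities for imports and encoders.

  • template (demessaging.template.Template) – The demessaging.template.Template that is used to render the module for the generated API.

  • messaging_config (Union[PulsarConfig, WebsocketURLConfig]) – Configuration on how to connect to the message broker.

  • debug (bool) – Run the backend module in debug mode (creates more verbose error messages).

  • members (List[Union[str, Callable, Type[object], Any]]) – List of members for this module

  • imports (str) – Imports that should be added to the generated API module.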

Attributes:

debug

imports

members

messaging_config

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

pulsar_config

DEPRECATED! Get the messaging configuration.

template

debug: bool
doc: str
imports: str
members: List[str | Callable | Type[object] | Any]
messaging_config: PulsarConfig | WebsocketURLConfig
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'debug': FieldInfo(annotation=bool, required=False, default=False, description='Run the backend module in debug mode (creates more verbose error messages).'), 'doc': FieldInfo(annotation=str, required=False, default='', description='The documentation of the object. If empty, this will be taken from the corresponding ``__doc__`` attribute.'), 'imports': FieldInfo(annotation=str, required=False, default='', description='Imports that should be added to the generate API module.'), 'members': FieldInfo(annotation=List[Union[str, Callable, Type[object], Any]], required=False, default_factory=list, description='List of members for this module'), 'messaging_config': FieldInfo(annotation=Union[PulsarConfig, WebsocketURLConfig], required=True, description='Configuration on how to connect to the message broker.'), 'registry': FieldInfo(annotation=ApiRegistry, required=False, default_factory=_get_registry, description='Utilities for imports and encoders.'), 'template': FieldInfo(annotation=Template, required=False, default=Template(name='module.py', folder=PosixPath('/builds/digital-earth/dasf/dasf-messaging-python/demessaging/templates'), suffix='.jinja2', context={}), description='The :class:`demessaging.template.Template` that is used to render the module for the generated API.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

property pulsar_config: PulsarConfig | WebsocketURLConfig

DEPRECATED! Get the messaging configuration.

Please use the messaging_config attribute of this class.
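A deprecated alias of this kind is commonly written as a read-only property that forwards to the new attribute. The following is a minimal sketch of that pattern, assuming a DeprecationWarning is emitted on access (the documentation only states that the property is deprecated). The class SketchModuleConfig is hypothetical; only the two attribute names come from the documentation above.

```python
import warnings


class SketchModuleConfig:
    """Hypothetical stand-in; only the two attribute names come from the docs."""

    def __init__(self, messaging_config: dict) -> None:
        self.messaging_config = messaging_config

    @property
    def pulsar_config(self) -> dict:
        # Assumption: a DeprecationWarning is emitted when the alias is read.
        warnings.warn(
            "pulsar_config is deprecated, use messaging_config instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.messaging_config


config = SketchModuleConfig({"topic": "my-topic"})
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    same_object = config.pulsar_config is config.messaging_config
print(same_object, len(caught))
```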

registry: ApiRegistry
template: Template
class demessaging.config.PulsarConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, host: str = 'localhost', port: str = '8080', persistent: str = 'non-persistent', tenant: str = 'public', namespace: str = 'default')

Bases: BaseMessagingConfig

A configuration class to connect to the pulsar messaging framework.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 kB).

  • host (str) – The remote host of the pulsar.

  • port (str) – The port of the pulsar at the given host.

  • persistent (str) – None

  • tenant (str) – None

  • namespace (str) – None

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

Attributes:

host

model_config

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

namespace

persistent

port

tenant

get_topic_url(topic: str, subscription: str | None = None) → str

Build the URL to connect to a websocket.
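How the PulsarConfig fields could combine into a URL can be sketched from the endpoint layout of Pulsar's WebSocket API (/ws/v2/producer/... and /ws/v2/consumer/.../subscription). This is an illustration under assumptions, not the method's actual source: the function sketch_topic_url is hypothetical, and choosing the consumer endpoint whenever a subscription is given is an assumption. The defaults are the documented field defaults.

```python
from typing import Optional


def sketch_topic_url(
    topic: str,
    subscription: Optional[str] = None,
    host: str = "localhost",
    port: str = "8080",
    persistent: str = "non-persistent",
    tenant: str = "public",
    namespace: str = "default",
) -> str:
    """Build a Pulsar WebSocket URL from the documented PulsarConfig fields.

    Assumption: a subscription selects the consumer endpoint, otherwise the
    producer endpoint is used. The real method may differ.
    """
    role = "consumer" if subscription else "producer"
    url = f"ws://{host}:{port}/ws/v2/{role}/{persistent}/{tenant}/{namespace}/{topic}"
    if subscription:
        url += f"/{subscription}"
    return url


producer_url = sketch_topic_url("my-topic")
consumer_url = sketch_topic_url("my-topic", subscription="backend")
print(producer_url)
print(consumer_url)
```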

header: Json[Dict[str, Any]] | Dict[str, Any]
host: str
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'host': FieldInfo(annotation=str, required=False, default='localhost', description='The remote host of the pulsar.'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'namespace': FieldInfo(annotation=str, required=False, default='default'), 'persistent': FieldInfo(annotation=str, required=False, default='non-persistent'), 'port': FieldInfo(annotation=str, required=False, default='8080', description='The port of the pulsar at the given :attr:`host`.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'tenant': FieldInfo(annotation=str, required=False, default='public'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.')}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

namespace: str
persistent: str
port: str
queue_size: PositiveInt | None
tenant: str
topic: str
class demessaging.config.WebsocketURLConfig(_case_sensitive: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | Path | None = None, *, topic: str, header: Dict[str, Any] | Dict[str, Any] = None, max_workers: int | None = None, queue_size: int | None = None, max_payload_size: int = 512000, websocket_url: str = '', producer_url: str | None = None, consumer_url: str | None = None)

Bases: BaseMessagingConfig

A configuration for a websocket.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue. If MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS; otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size. Must be smaller than Pulsar's ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 kB).

  • websocket_url (str) – The fully qualified URL to the websocket.

  • producer_url (Optional[str]) – An alternative URL to use for producers. If None, the websocket_url will be used.

  • consumer_url (Optional[str]) – An alternative URL to use for consumers. If None, the websocket_url will be used.
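
The fallback rule for producer_url and consumer_url can be sketched without the library. The class WebsocketURLs and its two methods are hypothetical stand-ins for illustration only. They are not the WebsocketURLConfig implementation, and the URLs are placeholders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WebsocketURLs:
    """Hypothetical stand-in for the three URL fields documented above."""

    websocket_url: str = ""
    producer_url: Optional[str] = None
    consumer_url: Optional[str] = None

    def effective_producer_url(self) -> str:
        # If producer_url is None, fall back to websocket_url.
        if self.producer_url is None:
            return self.websocket_url
        return self.producer_url

    def effective_consumer_url(self) -> str:
        # If consumer_url is None, fall back to websocket_url.
        if self.consumer_url is None:
            return self.websocket_url
        return self.consumer_url


urls = WebsocketURLs(
    websocket_url="ws://localhost:8080/ws",
    consumer_url="ws://consumer.example/ws",
)
```

In this sketch the producer side uses the shared websocket_url, while the consumer side uses its own override.
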

Attributes:

consumer_url

model_config

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

producer_url

websocket_url

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

consumer_url: str | None
get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'env_file': None, 'env_file_encoding': None, 'env_nested_delimiter': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'protected_namespaces': ('model_', 'settings_'), 'secrets_dir': None, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
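
The env_prefix and case_sensitive entries in model_config suggest that fields can be supplied through environment variables such as DE_BACKEND_TOPIC. The following standalone sketch approximates that mapping with the standard library only. It is not how pydantic-settings is implemented, the function name settings_from_env is hypothetical, and treating only header as a JSON value is an assumption based on its Json annotation.

```python
import json
from typing import Any, Dict, Mapping

ENV_PREFIX = "de_backend_"  # taken from model_config["env_prefix"]
JSON_FIELDS = {"header"}  # assumption: only header is decoded as JSON here


def settings_from_env(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Collect prefixed variables into a dict keyed by field name."""
    settings: Dict[str, Any] = {}
    for name, raw in environ.items():
        lowered = name.lower()  # case_sensitive is False
        if not lowered.startswith(ENV_PREFIX):
            continue  # unrelated variables are ignored
        field = lowered[len(ENV_PREFIX):]
        settings[field] = json.loads(raw) if field in JSON_FIELDS else raw
    return settings


example = settings_from_env(
    {
        "DE_BACKEND_TOPIC": "my-topic",
        "DE_BACKEND_HEADER": '{"authorization": "Bearer token"}',
        "PATH": "/usr/bin",
    }
)
```
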

model_fields: ClassVar[dict[str, FieldInfo]] = {'consumer_url': FieldInfo(annotation=Union[str, NoneType], required=False, description='An alternative URL to use for consumers. If None, the `websocket_url` will be used.'), 'header': FieldInfo(annotation=Union[Annotated[Dict[str, Any], Json], Dict[str, Any]], required=False, default_factory=dict, description='Header parameters for the request'), 'max_payload_size': FieldInfo(annotation=int, required=False, default=512000, description="(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', which is configured e.g.via 'pulsar/conf/standalone.conf'.default: 512000 (500kb)."), 'max_workers': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.'), 'producer_url': FieldInfo(annotation=Union[str, NoneType], required=False, description='An alternative URL to use for producers. If None, the `websocket_url` will be used.'), 'queue_size': FieldInfo(annotation=Union[Annotated[int, Gt(gt=0)], NoneType], required=False, description='(optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.'), 'topic': FieldInfo(annotation=str, required=True, description='The topic identifier under which to register at the pulsar.'), 'websocket_url': FieldInfo(annotation=str, required=False, default='', description='The fully qualified URL to the websocket.')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

producer_url: str | None
queue_size: PositiveInt | None
topic: str
websocket_url: str
demessaging.config.append_parameter_docs(model: Type[BaseModel]) → Type[BaseModel][source]

Append the parameters section to the docstring of a model.

demessaging.config.build_parameter_docs(model: Type[BaseModel]) → str[source]

Build the docstring for the parameters of a model.

demessaging.config.configure(js: str | None = None, **kwargs) → Callable[[T], T][source]

Configuration decorator for functions or modules.

Use this function as a decorator for classes or functions in the backend module like so:

>>> @configure(field_params={"a": {"gt": 0}}, returns={"gt": 0})
... def sqrt(a: float) -> float:
...     import math
...
...     return math.sqrt(a)

The available parameters for this function vary depending on what you are decorating. If you are decorating a class, your parameters must be valid for ClassConfig. If you are decorating a function, your parameters must be valid for FunctionConfig.

Parameters:
  • js (Optional[str]) – A JSON-formatted string that can be used to set up the config.

  • **kwargs – Any keyword argument that can be used to set up the config.

Notes

If you are specifying any kwargs, your first argument (js) should be None.
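
The two ways of passing options (a JSON string in js, or keyword arguments) can be illustrated with a standalone stand-in. The decorator configure_sketch and the attribute __sketch_config__ are hypothetical and do not reflect how demessaging stores its configuration. Raising an error when both js and kwargs are given is an assumption drawn from the note above.

```python
import json
import math
from typing import Any, Callable, Dict, Optional, TypeVar

T = TypeVar("T")


def configure_sketch(js: Optional[str] = None, **kwargs: Any) -> Callable[[T], T]:
    """Hypothetical stand-in for a decorator taking JSON or kwargs."""
    if js is not None and kwargs:
        # Assumption: mixing both input styles is treated as an error.
        raise ValueError("pass either js or keyword arguments, not both")
    options: Dict[str, Any] = json.loads(js) if js is not None else dict(kwargs)

    def decorator(obj: T) -> T:
        # Made-up attribute name, used only to make the options inspectable.
        setattr(obj, "__sketch_config__", options)
        return obj

    return decorator


@configure_sketch(field_params={"a": {"gt": 0}}, returns={"gt": 0})
def sqrt(a: float) -> float:
    return math.sqrt(a)


@configure_sketch('{"field_params": {"a": {"gt": 0}}, "returns": {"gt": 0}}')
def sqrt_from_json(a: float) -> float:
    return math.sqrt(a)
```

Both decorated functions end up with the same options in this sketch, which is the equivalence the js parameter is meant to provide.
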

demessaging.config.object_to_string(obj: Any)[source]
demessaging.config.registry = ApiRegistry(imports={}, objects=[])

Registry for the objects that should be available in the generated client stub.

demessaging.config.type_to_string(type_: Any)[source]