Source code for demessaging.config.messaging

"""Messaging configuration classes for DASF."""

# SPDX-FileCopyrightText: 2019-2024 Helmholtz Centre Potsdam GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2020-2021 Helmholtz-Zentrum Geesthacht GmbH
# SPDX-FileCopyrightText: 2021-2024 Helmholtz-Zentrum hereon GmbH
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import atexit
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from pydantic import Field, Json, PositiveInt, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from demessaging.utils import append_parameter_docs

if TYPE_CHECKING:
    from demessaging.messaging.producer import MessageProducer


[docs] @append_parameter_docs class BaseMessagingConfig(BaseSettings): # type: ignore """Base class for messaging configs.""" model_config = SettingsConfigDict(env_prefix="de_backend_") topic: str = Field( description=( "The topic identifier under which to register at the pulsar." ) ) header: Union[Json[Dict[str, Any]], Dict[str, Any]] = Field( # type: ignore default_factory=dict, description="Header parameters for the request" ) max_workers: Optional[PositiveInt] = Field( default=None, description=( "(optional) number of concurrent workers for handling requests, " "default: number of processors on the machine, multiplied by 5." ), ) queue_size: Optional[PositiveInt] = Field( default=None, description=( "(optional) size of the request queue, if MAX_WORKERS is set, " "this needs to be at least as big as MAX_WORKERS, " "otherwise an AttributeException is raised." ), ) max_payload_size: int = Field( default=500 * 1024, description=( "(optional) maximum payload size, must be smaller than pulsars 'webSocketMaxTextFrameSize', " "which is configured e.g.via 'pulsar/conf/standalone.conf'." "default: 512000 (500kb)." ), ) producer_keep_alive: int = Field( default=120, description=( "The amount of time that the websocket connection to a producer " "should be kept open. By default, 2 minutes (120 seconds). On " "each outgoing message, the timer will be reset. Set this to 0 to " "immediately close the connection when a message has been sent " "and acknowledged." ), ) producer_connection_timeout: int = Field( default=30, description=( "The amount of time that we grant producers to establish a " "connection to the message broker in order to send a response. If " "a connection cannot be established in this time, the response " "will not be sent and the connection will be closed." ), ) _producer: Optional[MessageProducer] = None @property def producer(self) -> MessageProducer: """The connected producer for the messaging config""" from demessaging.messaging.producer import MessageProducer if self._producer: return self._producer else: self._producer = producer = MessageProducer(self) producer.connect() atexit.register(producer.disconnect) return producer
[docs] @model_validator(mode="after") def validate_queue_size(self): """Check that the queue_size is smaller than the max_workers.""" queue_size = self.queue_size max_workers = self.max_workers if queue_size is not None and max_workers is not None: if queue_size < max_workers: raise ValueError( f"queue_size ({queue_size}) needs to be larger than or " f"equal to max_workers ({max_workers})" ) return self
[docs] def get_topic_url( self, topic: str, subscription: Optional[str] = None ) -> str: """Build the URL to connect to a websocket.""" raise NotImplementedError( "this method is supposed to be implemented by subclasses" )
[docs] @append_parameter_docs class PulsarConfig(BaseMessagingConfig): """A configuration class to connect to the pulsar messaging framework.""" host: str = Field( "localhost", description="The remote host of the pulsar." ) port: str = Field( "8080", description="The port of the pulsar at the given :attr:`host`." ) persistent: str = Field("non-persistent") tenant: str = Field("public") namespace: str = Field("default")
[docs] def get_topic_url( self, topic: str, subscription: Optional[str] = None ) -> str: """Build the URL to connect to a websocket.""" connection_type = "consumer" if subscription else "producer" sub = ("/" + subscription) if subscription else "" return ( f"ws://{self.host}:{self.port}/ws/v2/{connection_type}/" f"{self.persistent}/{self.tenant}/{self.namespace}/{topic}{sub}" )
[docs] @append_parameter_docs class WebsocketURLConfig(BaseMessagingConfig): """A configuration for a websocket.""" websocket_url: str = Field( "", description="The fully qualified URL to the websocket." ) producer_url: Optional[str] = Field( None, description=( "An alternative URL to use for producers. If None, the " "`websocket_url` will be used." ), ) consumer_url: Optional[str] = Field( None, description=( "An alternative URL to use for consumers. If None, the " "`websocket_url` will be used." ), )
[docs] def get_topic_url( self, topic: str, subscription: Optional[str] = None ) -> str: """Build the URL to connect to a websocket.""" sub = ("/" + subscription) if subscription else "" if subscription: uri = self.consumer_url or self.websocket_url else: uri = self.producer_url or self.websocket_url if not uri.endswith("/"): uri += "/" return uri + topic + sub