demessaging.messaging.consumer module

Consumer for messages submitted via the message broker.

Classes:

MessageConsumer(pulsar_config, handle_request)

Consumer for messages submitted via the message broker.

class demessaging.messaging.consumer.MessageConsumer(pulsar_config: BaseMessagingConfig, handle_request, handle_response=None, module_info: dict | None = None, api_info: ModuleAPIModel | None = None)

Bases: WebsocketConnection

Consumer for messages submitted via the message broker.

Attributes:

RECONNECT_TIMEOUT_SLEEP

SOCKET_PING_INTERVAL

Methods:

acknowledge(msg)

close_websocket_app(ws_app[, reason])

disconnect()

extract_context(msg)

extract_message_type(msg)

extract_response_topic(msg)

handle_api_info(api_info_request)

Show the API of the module.

handle_info(info_request)

handle_pong(request)

handle_request_via_queue(msg)

is_valid_request(request_message)

is_valid_value(value)

on_message(ws_app, msg)

on_producer_message(ws_app, msg)

open_producer_app(topic, **kwargs)

reset_close_timer(ws_app)

send_error(request, error_message)

send_pong(request)

send_response(request[, response_payload, ...])

setup_subscription()

wait_for_request()

wait_for_websocket_connection(ws_app[, timeout])

RECONNECT_TIMEOUT_SLEEP = 10
SOCKET_PING_INTERVAL = 60
acknowledge(msg)
close_websocket_app(ws_app: WebSocketApp, reason: str = 'unspecified')
disconnect()
static extract_context(msg) -> str | None
static extract_message_type(msg)
static extract_response_topic(msg)
handle_api_info(api_info_request)

Show the API of the module.

handle_info(info_request)
handle_pong(request)
handle_request_via_queue(msg)
static is_valid_request(request_message)
static is_valid_value(value)
on_message(ws_app: WebSocketApp, msg)
on_producer_message(ws_app: WebSocketApp, msg: str)
open_producer_app(topic, **kwargs)
producer_locks: Dict[WebSocketApp, allocate_lock]
producer_threads: Dict[WebSocketApp, Thread]
producer_timer: Dict[WebSocketApp, Timer]
producers: Dict[str, WebSocketApp]
request_semaphore: BoundedSemaphore | None
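
The `request_semaphore` attribute is typed as an optional `BoundedSemaphore`, which suggests that it caps how many requests are processed at the same time. How the consumer actually uses it is not documented here, so the following is only a minimal sketch of the general pattern with the standard library. The names `run_limited`, `handle`, and `max_parallel` are hypothetical and not part of demessaging.

```python
import threading


def run_limited(request_ids, max_parallel):
    """Handle every request in its own thread, at most `max_parallel` at once."""
    semaphore = threading.BoundedSemaphore(max_parallel)
    state_lock = threading.Lock()
    stats = {"active": 0, "peak": 0}
    handled = []

    def handle(request_id):
        with semaphore:  # blocks while all slots are taken
            with state_lock:
                stats["active"] += 1
                stats["peak"] = max(stats["peak"], stats["active"])
            handled.append(request_id)  # stand-in for the real work
            with state_lock:
                stats["active"] -= 1

    threads = [threading.Thread(target=handle, args=(r,)) for r in request_ids]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handled, stats["peak"]


handled, peak = run_limited(range(8), max_parallel=2)
```

In this sketch `peak` never exceeds `max_parallel`, because a worker can only enter the guarded section while it holds one of the semaphore's slots. A value of `None` for `request_semaphore` would presumably mean that no such limit is applied.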
reset_close_timer(ws_app: WebSocketApp)
send_error(request, error_message)
send_pong(request)
send_response(request, response_payload=None, msg_type=MessageType.RESPONSE, response_properties=None)
setup_subscription()
wait_for_request()
wait_for_websocket_connection(ws_app: WebSocketApp, timeout=10)
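
The combination of `producer_timer: Dict[WebSocketApp, Timer]` and `reset_close_timer(ws_app)` hints at a per-connection idle timer that is restarted whenever a producer connection is used. The actual behavior is not described on this page, so the sketch below only illustrates that general pattern with `threading.Timer`. The class `IdleCloser`, its `reset` method, and the string keys standing in for `WebSocketApp` objects are all hypothetical.

```python
import threading


class IdleCloser:
    """Close a connection once it has been idle for `timeout` seconds."""

    def __init__(self, timeout, close):
        self.timeout = timeout
        self.close = close  # callback invoked with the idle connection
        self.timers = {}  # connection -> pending threading.Timer
        self.lock = threading.Lock()

    def reset(self, conn):
        """Cancel any pending timer for `conn` and start a fresh one."""
        with self.lock:
            old = self.timers.pop(conn, None)
            if old is not None:
                old.cancel()
            timer = threading.Timer(self.timeout, self.close, args=(conn,))
            timer.daemon = True
            self.timers[conn] = timer
            timer.start()


closed = []
closer = IdleCloser(timeout=0.2, close=closed.append)
closer.reset("producer-a")
closer.reset("producer-a")  # replaces the first timer, so only one close fires
```

Each call to `reset` cancels the pending timer before starting a new one, so the close callback fires only after a full `timeout` has passed without activity on that connection.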