diff --git a/.gitignore b/.gitignore index a347481..536c06a 100644 --- a/.gitignore +++ b/.gitignore @@ -94,3 +94,6 @@ docker/freeswitch/conf/**/.fsxml docker/freeswitch/conf/**/*.fsxml docker/freeswitch-test/config/logs/ docker/freeswitch-test/config/recordings/freeswitch/ + +# Local FreeSWITCH source checkout (research reference, not part of the project) +/freeswitch/ diff --git a/AGENTS.md b/AGENTS.md index 0963668..7293ac3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -287,6 +287,71 @@ async def my_feature(): raise ``` +### Centralized metrics + +All OTel metric instruments live in `genesis/protocol/metrics.py`. **Do not +re-declare an instrument that already exists there** — duplicate instrument +creation for the same metric name trips static analysis and produces OTel SDK +warnings. Import the instrument (and the `safe_add` / `safe_record` helpers) +from `genesis.protocol.metrics` instead: + +```python +from genesis.protocol.metrics import ( + calls_active_counter, + safe_add, + safe_record, + event_processing_duration, +) +``` + +`safe_add(counter, *args, **kwargs)` and `safe_record(histogram, *args, **kwargs)` +swallow OTel/no-provider errors so a missing exporter never crashes the protocol. + +### ESL channel lifecycle spans + +`genesis/protocol/lifecycle.py` registers two event processors +(`channel_lifecycle_processor`, `custom_subclass_processor`) that emit +`freeswitch.channel.*` and `freeswitch.sofia.*` / `freeswitch.callcenter.*` / +`freeswitch.conference.*` / `freeswitch.valet.*` spans for the semantic +FreeSWITCH channel lifecycle. They run after the core protocol processors +(auth, command reply, disconnect) and only enrich telemetry — they never +consume events that route to user handlers. They are on by default; opt out +with `GENESIS_TRACE_ESL_LIFECYCLE=0` / `GENESIS_TRACE_CUSTOM_SUBCLASSES=0`. + +Emitted spans (non-exhaustive): `freeswitch.channel.create`, `.progress`, +`.progress_media`, `.answer`, `.bridge`, `.unbridge`, `.hangup`, +`.hangup_complete`, `.destroy`, `.execute`, `.execute_complete`, `.codec`, +`freeswitch.call.update`, `freeswitch.sofia.transfer`, `freeswitch.sofia.register`, +`freeswitch.callcenter.info`, `freeswitch.conference.maintenance`, +`freeswitch.conference.cdr`, `freeswitch.valet.info`. + +### Cross-system correlation (sip.call_id) + +Every channel lifecycle span carries `sip.call_id` (= the ESL +`variable_sip_call_id` header), the standard SIP `Call-ID`. This is a stable +per-call identifier that any other SIP observer of the same call will also +have, so it is the natural join key when correlating Genesis traces with +another system's traces of the same call. The join happens **at the +observability backend** (Grafana/Tempo), by filtering/grouping on `sip.call_id` +— not in code. + +Cross-leg grouping: bridge spans carry `bridge.a_uuid` and `bridge.b_uuid` +(from `Bridge-A-Unique-ID` / `Bridge-B-Unique-ID`), so the a-leg and b-leg of a +call can be tied together at the backend. + +The `genesis.events.without_sip_call_id` counter tracks channel events that +lack the correlation key (a correlation-gap signal). W3C `traceparent` / +`X-Tracespan` propagation is intentionally **out of scope**; the attribute join +is sufficient. + +### Cardinality rule + +UUIDs go **on spans only**, never as metric attributes. Metric attributes use +low-cardinality enums/labels (`channel.state`, `direction`, `hangup.cause`, +`application.name`, `bridge.result`, `transfer.type`, `loadbalancer.backend`, +...). `queue.depth` is a span attribute (not a metric label) for the same +reason. + ## Pre-PR Checklist **CRITICAL: Always run the full CI stack locally before opening a PR.** diff --git a/docs/content/docs/Observability/metrics.md b/docs/content/docs/Observability/metrics.md index 0b1edf4..94fab11 100644 --- a/docs/content/docs/Observability/metrics.md +++ b/docs/content/docs/Observability/metrics.md @@ -90,3 +90,74 @@ For programmatic access to load counts per destination, use the load balancer's - **`genesis_timeouts_total`** (Counter) - Description: Number of timeouts - Attributes: `timeout.type` (wait, command, connection), `timeout.operation`, `timeout.duration` + +## Channel lifecycle metrics + +These metrics describe what a call is doing across its lifecycle — from the +moment FreeSWITCH creates the channel until it is destroyed. Use them together +with the [tracing](./tracing) spans to follow a call end to end, and see +[Cross-system correlation](./tracing#cross-system-correlation-sipcall_id) for +how `sip.call_id` lets you join these traces with another system's view of the +same call. + +- **`genesis.calls.active`** (UpDownCounter) + - Description: Number of calls currently active, by state and direction. Goes up when a channel is created and back down when it is destroyed. + - Attributes: `channel.state`, `direction` + +- **`genesis.channel.bridge.events`** (Counter) + - Description: Bridges established and torn down, from the authoritative `CHANNEL_BRIDGE` / `CHANNEL_UNBRIDGE` events. + - Attributes: `bridge.result` (`established`, `unbridged`), `hangup.cause` + +- **`genesis.channel.transfers`** (Counter) + - Description: Call transfers observed through the `sofia::transferor` and `sofia::transferee` events. + - Attributes: `transfer.type` (`blind`, `attended`), `transfer.role` + +- **`genesis.channel.codec.changes`** (Counter) + - Description: Codec renegotiations observed through `CODEC` events. + - Attributes: `channel.read_codec`, `channel.write_codec` + +- **`genesis.dialplan.applications`** (Counter) + - Description: Dialplan applications executed, from `CHANNEL_EXECUTE` and `CHANNEL_EXECUTE_COMPLETE`. + - Attributes: `application.name`, `application.result` (`started`, `success`, `fail`) + +- **`genesis.channel.hangup.causes.q850`** (Counter) + - Description: Hangup causes grouped by Q.850 code. + - Attributes: `hangup.cause.q850` + +- **`genesis.event.processing.duration`** (Histogram) + - Description: How long it takes to dispatch a single event through the processors and routing. + - Attributes: `event.name` + +- **`genesis.events.without_sip_call_id`** (Counter) + - Description: Channel events that arrived without a `variable_sip_call_id`. A high value means those calls cannot be joined to another system's view of the same call via `sip.call_id`. + - Attributes: (none) + +## Session, consumer, load balancer and queue metrics + +- **`genesis.session.commands`** (Counter) + - Description: `sendmsg` commands sent through a session, by application. + - Attributes: `application.name` + +- **`genesis.session.command.duration`** (Histogram) + - Description: How long a session `sendmsg` command takes to complete. + - Attributes: `application.name` + +- **`genesis.consumer.handlers`** (Counter) + - Description: How many times a consumer handler was invoked, by event. + - Attributes: `event.name` + +- **`genesis.loadbalancer.selections`** (Counter) + - Description: Destinations picked by the load balancer, including when it falls back to the first available destination. + - Attributes: `loadbalancer.backend`, `loadbalancer.result` (`selected`, `fallback`) + +- **`genesis.loadbalancer.errors`** (Counter) + - Description: Errors raised while selecting a destination. + - Attributes: `loadbalancer.backend`, `error` + +- **`genesis.commands.queue.depth`** (ObservableGauge) + - Description: How many command replies are still pending. Useful to spot backpressure on the command path. + - Attributes: (none) + +- **`genesis.events.queue.depth`** (ObservableGauge) + - Description: How many events are waiting to be processed. Useful to spot backpressure on the event path. + - Attributes: (none) diff --git a/docs/content/docs/Observability/server.md b/docs/content/docs/Observability/server.md index 152943f..adab0af 100644 --- a/docs/content/docs/Observability/server.md +++ b/docs/content/docs/Observability/server.md @@ -3,7 +3,20 @@ title: HTTP Server weight: 30 --- -A built-in HTTP server exposes health, readiness, and metrics. Port **8000** by default; set `GENESIS_OBSERVABILITY_PORT` to change it. With the CLI, the server starts automatically; with the library, you start it yourself (see below). +Genesis ships a built-in HTTP server that exposes three endpoints: + +- **`/health`** — liveness probe (is the process up and connected?) +- **`/ready`** — readiness probe (can the app accept work yet?) +- **`/metrics`** — Prometheus scrape endpoint for all Genesis metrics + +The server listens on port **8000** by default. Change it with the +`GENESIS_OBSERVABILITY_PORT` environment variable. + +How it starts depends on how you run Genesis: + +- **CLI** (`genesis consumer` / `genesis outbound`): the server starts + automatically. +- **Library**: you start the server yourself (see [Library](#library) below). ## Endpoints diff --git a/docs/content/docs/Observability/tracing.md b/docs/content/docs/Observability/tracing.md index 1f94659..8d9a7e0 100644 --- a/docs/content/docs/Observability/tracing.md +++ b/docs/content/docs/Observability/tracing.md @@ -73,6 +73,125 @@ Genesis automatically creates spans for the following operations: - Description: Ringing a group of destinations - Attributes: `ring_group.mode`, `ring_group.size`, `ring_group.timeout`, `ring_group.has_balancer`, `ring_group.has_variables`, `ring_group.balanced`, `ring_group.result`, `ring_group.duration`, `ring_group.answered_uuid`, `ring_group.answered_dial_path`, `ring_group.error` (if error) +**ESL Channel Lifecycle Spans (`freeswitch.channel.*`):** + +These spans follow a call across its FreeSWITCH lifecycle, from channel +creation to destruction. They carry the channel UUIDs and the SIP correlation +key on the span (see [Cross-system correlation](#cross-system-correlation-sipcall_id)). + +- **`freeswitch.channel.create`** + - Description: A new channel was created + - Attributes: `channel.uuid`, `channel.call_uuid`, `channel.direction`, `sip.call_id`, `channel.destination_number`, `channel.context` + +- **`freeswitch.channel.progress`** / **`freeswitch.channel.progress_media`** + - Description: The call is progressing / early media is flowing + - Attributes: `channel.state`, `answer.state`, codec names + +- **`freeswitch.channel.answer`** + - Description: The call was answered + - Attributes: `channel.state`, `answer.state`, codec names + +- **`freeswitch.channel.bridge`** + - Description: Two channels were bridged together + - Attributes: `bridge.a_uuid`, `bridge.b_uuid`, `other_leg.*` + - Events: `bridge.established` + +- **`freeswitch.channel.unbridge`** + - Description: The bridge between two channels was torn down + - Attributes: `bridge.a_uuid`, `bridge.b_uuid`, `hangup.cause` + - Events: `bridge.torn_down` + +- **`freeswitch.channel.hangup`** + - Description: The channel is hanging up + - Attributes: `hangup.cause`, `channel.state` + - Events: `hangup.cause.` + +- **`freeswitch.channel.hangup_complete`** + - Description: Hangup is complete and the call is finalized + - Attributes: `hangup.cause`, `hangup.cause.q850` + - Events: `call.finalized` + +- **`freeswitch.channel.destroy`** + - Description: The channel was destroyed + - Attributes: `channel.uuid`, `sip.call_id` + +- **`freeswitch.channel.execute`** / **`freeswitch.channel.execute_complete`** + - Description: A dialplan application started / finished executing + - Attributes: `application.name`, `application.uuid`, `application.data` / `application.response` + - Events: `app..done` + +- **`freeswitch.channel.codec`** + - Description: The channel negotiated (or renegotiated) its codecs + - Attributes: `channel.read_codec.*`, `channel.write_codec.*` + +- **`freeswitch.call.update`** + - Description: The caller ID or bridged state changed + - Attributes: `bridged.to`, `caller.transfer_source` + - Events: `caller_id.mutated` + +**CUSTOM Subclass Spans:** + +These spans cover the `CUSTOM` event subclasses FreeSWITCH emits for +transfers, registrations, callcenter, conference and valet parking. + +- **`freeswitch.sofia.transfer`** + - Description: A call transfer was observed + - Attributes: `transfer.role` (`transferor` / `transferee`), `transfer.type` (`blind` / `attended`) + - Events: `transfer.initiated` + +- **`freeswitch.sofia.register`** / **`freeswitch.sofia.reinvite`** / **`freeswitch.sofia.replaced`** + - Description: A SIP registration, reinvite or replace was observed + - Attributes: `register.aor`, `register.action`, `gateway.name` / `gateway.state`, `sofia.profile` + +- **`freeswitch.callcenter.info`** + - Description: A callcenter queue event + - Attributes: `cc.queue`, `cc.action`, `cc.agent`, `cc.member_uuid`, `cc.count`, `cc.selection` + +- **`freeswitch.conference.maintenance`** / **`freeswitch.conference.cdr`** + - Description: A conference maintenance or CDR event + - Attributes: `conference.name`, `conference.profile`, `conference.action`, `conference.member_id` + +- **`freeswitch.valet.info`** + - Description: A valet parking event + - Attributes: `valet.lot`, `valet.extension`, `valet.action`, `bridge.to_uuid` + +**Session / Consumer / Queue Spans:** + +- **`session.sendmsg`** (`Session` module) + - Description: A `sendmsg` command was sent through a session + - Attributes: `channel.uuid`, `application.name`, `application.uuid`, `application.block` + +- **`session.await_complete`** (`Session` module) + - Description: Waits for a blocking `sendmsg` to complete (child of `session.sendmsg` when `block=True`) + - Attributes: `channel.uuid`, `application.uuid` + +- **`consumer.start`** / **`consumer.stop`** (`Consumer` module) + - Description: The consumer subscribed to events / stopped + - Attributes: `consumer.host`, `consumer.port` + +- **`queue.wait_and_acquire`** (`Queue` module) + - Description: Waiting to acquire an item from the queue + - Attributes: `queue.id`, `queue.item_id`, `queue.depth` (span attribute, not a metric label) + +## Cross-system correlation (sip.call_id) + +Every `freeswitch.channel.*` span carries **`sip.call_id`**, taken from the ESL +`variable_sip_call_id` header. This is the standard SIP `Call-ID` header, a +stable per-call identifier that any other SIP observer of the same call will +also have. That makes it a natural join key when you want to correlate Genesis +traces with traces from another system that observed the same call. + +- The join happens **at the observability backend** (Grafana/Tempo or similar), + by filtering or grouping on `sip.call_id` — not in code. +- Cross-leg grouping: bridge spans carry **`bridge.a_uuid`** and + **`bridge.b_uuid`**, so the a-leg and b-leg of a call can be tied together. +- The `genesis.events.without_sip_call_id` metric counts channel events that + arrived without the correlation key — a signal that those calls cannot be + joined to another system's view. + +The lifecycle/CUSTOM processors are on by default. Opt out with +`GENESIS_TRACE_ESL_LIFECYCLE=0` or `GENESIS_TRACE_CUSTOM_SUBCLASSES=0`. + ## Configuration Install the OpenTelemetry SDK: diff --git a/genesis/channel.py b/genesis/channel.py index 80cb6c1..178ddfe 100644 --- a/genesis/channel.py +++ b/genesis/channel.py @@ -15,66 +15,34 @@ import time from asyncio import Event, wait_for, TimeoutError as AsyncioTimeoutError -from opentelemetry import trace, metrics +from opentelemetry import trace from genesis.protocol import Protocol from genesis.session import Session from genesis.inbound import Inbound from genesis.protocol.parser import ESLEvent +from genesis.protocol.metrics import ( + channel_operations_counter, + channel_operation_duration, + hangup_causes_counter, + bridge_operations_counter, + dtmf_received_counter, + call_duration_histogram, + timeout_counter, +) from genesis.types import HangupCause, ChannelState, ContextType from genesis.exceptions import ChannelError, TimeoutError from genesis.observability import logger tracer = trace.get_tracer(__name__) -meter = metrics.get_meter(__name__) - -# Define metrics here to avoid circular imports -channel_operations_counter = meter.create_counter( - "genesis.channel.operations", - description="Number of channel operations", - unit="1", -) - -channel_operation_duration = meter.create_histogram( - "genesis.channel.operation.duration", - description="Duration of channel operations", - unit="s", -) - -hangup_causes_counter = meter.create_counter( - "genesis.channel.hangup.causes", - description="Hangup causes", - unit="1", -) - -bridge_operations_counter = meter.create_counter( - "genesis.channel.bridge.operations", - description="Bridge operations", - unit="1", -) - -dtmf_received_counter = meter.create_counter( - "genesis.channel.dtmf.received", - description="DTMF digits received", - unit="1", -) - -call_duration_histogram = meter.create_histogram( - "genesis.call.duration", - description="Total call duration from creation to hangup", - unit="s", -) - -timeout_counter = meter.create_counter( - "genesis.timeouts", - description="Number of timeouts", - unit="1", -) # Span/attribute names (S1192: avoid duplicated literals) ATTR_CHANNEL_UUID = "channel.uuid" +ATTR_CHANNEL_CALL_UUID = "channel.call_uuid" +ATTR_SIP_CALL_ID = "sip.call_id" ATTR_CHANNEL_STATE = "channel.state" ATTR_HANGUP_CAUSE = "hangup.cause" +ATTR_HANGUP_CAUSE_Q850 = "hangup.cause.q850" ATTR_WAIT_TYPE = "wait.type" ATTR_WAIT_RESULT = "wait.result" ATTR_WAIT_DURATION = "wait.duration" @@ -157,6 +125,14 @@ async def create( raise ChannelError("Failed to retrieve UUID from FreeSWITCH") self.uuid = response.body.strip() span.set_attribute(ATTR_CHANNEL_UUID, self.uuid) + # channel.call_uuid groups a-leg/b-leg within the Genesis trace; + # at originate time it equals the origination UUID. + span.set_attribute(ATTR_CHANNEL_CALL_UUID, self.uuid) + # sip.call_id is the standard SIP Call-ID and the cross-system + # join key. It is usually not known yet at originate; attach when present. + sip_call_id = _context_str(self.context, "variable_sip_call_id") + if sip_call_id: + span.set_attribute(ATTR_SIP_CALL_ID, sip_call_id) self.protocol.on("CHANNEL_STATE", self._state_handler) await self.protocol.send(f"filter Unique-ID {self.uuid}") @@ -568,9 +544,25 @@ async def hangup(self, cause: HangupCause = "NORMAL_CLEARING") -> ESLEvent: def on_success(span: Any, result: ESLEvent, duration: float) -> None: hangup_causes_counter.add(1, attributes={ATTR_HANGUP_CAUSE: cause}) + # Q.850 code (authoritative) when FreeSWITCH exposed it on the leg. + q850 = _context_str(self.context, "variable_hangup_cause_q850") + if q850: + span.set_attribute(ATTR_HANGUP_CAUSE_Q850, q850) + # call.duration recorded with low-cardinality attrs so it can be + # partitioned by cause/direction (NO UUID: cardinality rule). if call_duration is not None: span.set_attribute("call.duration", call_duration) - call_duration_histogram.record(call_duration) + direction = _context_str(self.context, "Call-Direction") or "unknown" + call_duration_histogram.record( + call_duration, + attributes={ATTR_HANGUP_CAUSE: cause, "direction": direction}, + ) + # Mark the command-side hangup span; the authoritative marker comes + # from CHANNEL_HANGUP_COMPLETE (see channel_lifecycle_processor). + span.add_event( + "hangup.command_issued", + attributes={ATTR_HANGUP_CAUSE: cause}, + ) def on_error(exc: Exception) -> None: hangup_causes_counter.add( @@ -616,6 +608,24 @@ async def bridge(self, other: Channel | Session) -> ESLEvent: def on_success(span: Any, result: ESLEvent, duration: float) -> None: success = result.get("Reply-Text", "").startswith("+OK") bridge_operations_counter.add(1, attributes={"success": str(success)}) + # Correlation attrs: a/b leg UUIDs let the backend cross the two + # SIP dialogs of a bridged call (each leg has its own sip.call_id). + span.set_attribute("bridge.a_uuid", self.uuid or "unknown") + span.set_attribute("bridge.b_uuid", other_uuid or "unknown") + call_uuid = _context_str(self.context, "Channel-Call-UUID") or ( + self.uuid or "unknown" + ) + span.set_attribute(ATTR_CHANNEL_CALL_UUID, call_uuid) + sip_call_id = _context_str(self.context, "variable_sip_call_id") + if sip_call_id: + span.set_attribute(ATTR_SIP_CALL_ID, sip_call_id) + span.add_event( + "bridge.command_issued", + attributes={ + "bridge.a_uuid": self.uuid or "unknown", + "bridge.b_uuid": other_uuid or "unknown", + }, + ) def on_error(exc: Exception) -> None: bridge_operations_counter.add( diff --git a/genesis/consumer.py b/genesis/consumer.py index d4dcc43..848d5a3 100644 --- a/genesis/consumer.py +++ b/genesis/consumer.py @@ -9,9 +9,13 @@ import re from typing import Any, Callable, Optional +from opentelemetry import trace + from genesis.inbound import Inbound from genesis.observability import logger, observability +tracer = trace.get_tracer(__name__) + async def _invoke_maybe_coro(func: Callable[..., Any], message: Any) -> Any: """Invoke handler and await if it returns a coroutine.""" @@ -131,15 +135,25 @@ async def start(self) -> None: self.protocol.on("HEARTBEAT", observability.record_heartbeat) async with self.protocol as protocol: - logger.debug("Asking freeswitch to send us all events.") - await protocol.send("events plain ALL") - - for event in protocol.handlers.keys(): - logger.debug( - "Requesting freeswitch to filter events of type '%s'.", - event, - ) - await protocol.send(self._filter_command(event)) + # The consumer.start span wraps only the setup phase (auth, + # events subscription, filter registration) so it finalizes + # promptly and is observable; the blocking wait() runs outside. + with tracer.start_as_current_span( + "consumer.start", + attributes={ + "consumer.host": self.host, + "consumer.port": self.port, + }, + ): + logger.debug("Asking freeswitch to send us all events.") + await protocol.send("events plain ALL") + + for event in protocol.handlers.keys(): + logger.debug( + "Requesting freeswitch to filter events of type '%s'.", + event, + ) + await protocol.send(self._filter_command(event)) await self.wait() @@ -148,4 +162,5 @@ async def start(self) -> None: raise async def stop(self) -> None: - await self.protocol.stop() + with tracer.start_as_current_span("consumer.stop"): + await self.protocol.stop() diff --git a/genesis/group/ring.py b/genesis/group/ring.py index 97095da..4d29956 100644 --- a/genesis/group/ring.py +++ b/genesis/group/ring.py @@ -19,10 +19,18 @@ from genesis.types import ChannelState, HangupCause from genesis.exceptions import TimeoutError from genesis.group.load_balancer import LoadBalancerBackend +from genesis.protocol.metrics import ( + loadbalancer_selections_counter, + loadbalancer_errors_counter, + safe_add, +) tracer = trace.get_tracer(__name__) meter = metrics.get_meter(__name__) +# Repeated metric attribute key (centralised so Sonar S1192 stays quiet). +_ATTR_LB_BACKEND = "loadbalancer.backend" + # Ring group metrics ring_group_operations_counter = meter.create_counter( "genesis.ring_group.operations", @@ -145,6 +153,14 @@ async def ring( balancer is not None and mode == RingMode.BALANCING ), "ring_group.has_variables": str(variables is not None), + "ring_group.balancer_backend": ( + type(balancer).__name__ + if balancer is not None and mode == RingMode.BALANCING + else "none" + ), + "ring_group.context": ( + variables.get("user_context", "unknown") if variables else "unknown" + ), }, ) as span: try: @@ -164,6 +180,16 @@ async def ring( span.set_attribute( "ring_group.answered_dial_path", answered.dial_path ) + span.set_attribute( + "ring_group.selected_dial_path", answered.dial_path + ) + span.add_event( + "ring_group.leg_answered", + attributes={ + "ring_group.answered_uuid": answered.uuid or "unknown", + "ring_group.selected_dial_path": answered.dial_path, + }, + ) # Record metrics ring_group_operations_counter.add( @@ -203,6 +229,7 @@ async def ring( span.set_attribute("ring_group.error", str(e)) span.set_attribute("ring_group.duration", duration) span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) ring_group_results_counter.add( 1, @@ -332,10 +359,40 @@ async def _ring_balancing( """Ring destinations sequentially using load balancing, return first to answer.""" remaining = list(group) + backend_name = type(balancer).__name__ while remaining: - least_loaded = await balancer.get_least_loaded(remaining) + try: + least_loaded = await balancer.get_least_loaded(remaining) + except Exception as e: + safe_add( + loadbalancer_errors_counter, + 1, + attributes={ + _ATTR_LB_BACKEND: backend_name, + "error": type(e).__name__, + }, + ) + least_loaded = None + if not least_loaded: least_loaded = remaining[0] + safe_add( + loadbalancer_selections_counter, + 1, + attributes={ + _ATTR_LB_BACKEND: backend_name, + "loadbalancer.result": "fallback", + }, + ) + else: + safe_add( + loadbalancer_selections_counter, + 1, + attributes={ + _ATTR_LB_BACKEND: backend_name, + "loadbalancer.result": "selected", + }, + ) await balancer.increment(least_loaded) diff --git a/genesis/inbound.py b/genesis/inbound.py index 2c7b8ae..a114962 100644 --- a/genesis/inbound.py +++ b/genesis/inbound.py @@ -8,33 +8,18 @@ from asyncio import TimeoutError, open_connection, wait_for -from opentelemetry import metrics, trace +from opentelemetry import trace from genesis.exceptions import AuthenticationError, ConnectionTimeoutError from genesis.observability import logger from genesis.protocol import Protocol - -tracer = trace.get_tracer(__name__) -meter = metrics.get_meter(__name__) - -active_connections_counter = meter.create_up_down_counter( - "genesis.connections.active", - description="Number of active connections", - unit="1", -) -connection_errors_counter = meter.create_counter( - "genesis.connections.errors", - description="Number of connection errors", - unit="1", +from genesis.protocol.metrics import ( + connection_errors_counter, + connections_active_counter, + safe_add, ) - -def _safe_connection_metric(counter: object, *args: object, **kwargs: object) -> None: - """Add to a counter, swallowing OTel/metrics errors.""" - try: - getattr(counter, "add")(*args, **kwargs) - except Exception: - pass +tracer = trace.get_tracer(__name__) class Inbound(Protocol): @@ -84,7 +69,7 @@ async def authenticate(self) -> None: if response["Reply-Text"] != "+OK accepted": logger.debug("Freeswitch said the passed password is incorrect.") - _safe_connection_metric( + safe_add( connection_errors_counter, 1, attributes={"error": "authentication_failed", "type": "inbound"}, @@ -104,7 +89,7 @@ async def start(self) -> None: await self._connect() except TimeoutError: logger.debug("A timeout occurred when trying to connect to the freeswitch.") - _safe_connection_metric( + safe_add( connection_errors_counter, 1, attributes={"error": "timeout", "type": "inbound"}, @@ -113,9 +98,7 @@ async def start(self) -> None: await super().start() try: - _safe_connection_metric( - active_connections_counter, 1, attributes={"type": "inbound"} - ) + safe_add(connections_active_counter, 1, attributes={"type": "inbound"}) await self.authenticate() except Exception: await self.stop() @@ -124,6 +107,4 @@ async def start(self) -> None: async def stop(self) -> None: """Terminates the connection.""" await super().stop() - _safe_connection_metric( - active_connections_counter, -1, attributes={"type": "inbound"} - ) + safe_add(connections_active_counter, -1, attributes={"type": "inbound"}) diff --git a/genesis/outbound.py b/genesis/outbound.py index d2c02b2..89dbc8c 100644 --- a/genesis/outbound.py +++ b/genesis/outbound.py @@ -21,28 +21,18 @@ from collections.abc import Callable from typing import Any, Awaitable, Optional -from opentelemetry import metrics, trace +from opentelemetry import trace from genesis.observability import logger, observability from genesis.channel import Channel from genesis.session import Session - -tracer = trace.get_tracer(__name__) -meter = metrics.get_meter(__name__) - -active_connections_counter = meter.create_up_down_counter( - "genesis.connections.active", - description="Number of active connections", - unit="1", +from genesis.protocol.metrics import ( + connection_errors_counter, + connections_active_counter, + safe_add, ) - -def _safe_connection_metric(counter: object, *args: object, **kwargs: object) -> None: - """Add to a counter, swallowing OTel/metrics errors.""" - try: - getattr(counter, "add")(*args, **kwargs) - except Exception: - pass +tracer = trace.get_tracer(__name__) async def _setup_session(session: Session, server: "Outbound") -> None: @@ -159,9 +149,9 @@ async def handler( "net.peer.name": server.host, "net.peer.port": server.port, }, - ): - _safe_connection_metric( - active_connections_counter, + ) as span: + safe_add( + connections_active_counter, 1, attributes={"type": "outbound"}, ) @@ -169,9 +159,20 @@ async def handler( async with Session(reader, writer) as session: await _setup_session(session, server) await server.app(session) + except Exception as e: + # Record the outbound connection error (gap from mapping: + # outbound previously had no error counter). + safe_add( + connection_errors_counter, + 1, + attributes={"error": type(e).__name__, "type": "outbound"}, + ) + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + raise finally: - _safe_connection_metric( - active_connections_counter, + safe_add( + connections_active_counter, -1, attributes={"type": "outbound"}, ) diff --git a/genesis/protocol/base.py b/genesis/protocol/base.py index ed5412e..2da44f7 100644 --- a/genesis/protocol/base.py +++ b/genesis/protocol/base.py @@ -24,7 +24,7 @@ from opentelemetry import trace -from genesis.exceptions import ConnectionError, UnconnectedError +from genesis.exceptions import ConnectionError, GenesisError, UnconnectedError from genesis.observability import logger, TRACE_LEVEL_NUM from genesis.protocol.parser import ESLEvent, parse_headers from genesis.protocol.reader_fsm import ESLReaderFSM @@ -43,6 +43,9 @@ timeout_counter, channel_routing_counter, global_routing_counter, + event_processing_duration, + register_protocol, + safe_record, ) from genesis.protocol.routing import ( CompositeRoutingStrategy, @@ -73,6 +76,8 @@ def __init__(self): self.handlers: Dict[str, List[EventHandler]] = {} self.channel_registry: Dict[str, List[EventHandler]] = {} self.handler_tasks: set[Task[Any]] = set() + # Register so the ObservableGauges can report this protocol's queue depth. + register_protocol(self) # Initialize routing strategy (Strategy Pattern) self.routing_strategy = CompositeRoutingStrategy( @@ -195,24 +200,42 @@ async def consume(self) -> None: logger.error(f"Error in consumer loop: {outer_e}", exc_info=True) async def _process_one_event(self, event: ESLEvent) -> None: - """Run telemetry, processors, and dispatch for one event.""" + """Run telemetry, processors, and dispatch for one event. + + The ``process_event`` span wraps metrics+logging AND the processor + chain + routing, so the new ``freeswitch.channel.*`` lifecycle spans + (emitted by processors) become children of it and share its trace. + """ try: attributes = build_event_attributes(event) - with tracer.start_as_current_span("process_event", attributes=attributes): - record_event_metrics(event) - log_event(event) except Exception: - record_event_metrics(event) - log_event(event) + attributes = {} - for processor in self.event_processors: - result = processor(self, event) - if asyncio.iscoroutine(result): - await result + start_time = time.perf_counter() + with tracer.start_as_current_span("process_event", attributes=attributes): + try: + record_event_metrics(event) + log_event(event) + except Exception: + record_event_metrics(event) + log_event(event) - handlers, _ = await self.routing_strategy.route(event) - if handlers: - dispatch_to_handlers(handlers, event, self.handler_tasks) + for processor in self.event_processors: + result = processor(self, event) + if asyncio.iscoroutine(result): + await result + + handlers, _ = await self.routing_strategy.route(event) + if handlers: + dispatch_to_handlers(handlers, event, self.handler_tasks) + + safe_record( + event_processing_duration, + time.perf_counter() - start_time, + attributes={ + "event.name": event.get("Event-Name", "UNKNOWN"), + }, + ) def on( self, @@ -288,7 +311,12 @@ async def send(self, cmd: str) -> ESLEvent: try: with tracer.start_as_current_span("send_command") as span: - span.set_attribute("command.name", cmd) + # Use the command verb (first token) as command.name to avoid + # high-cardinality span attributes (raw cmd may carry UUIDs). + span.set_attribute("command.name", command_name) + remainder = cmd[len(command_name) :].strip() + if remainder: + span.set_attribute("command.args", remainder[:200]) return await self._execute_send(cmd, command_name, start_time, span) except Exception: # OTel not initialized - run without tracing @@ -330,6 +358,10 @@ async def _execute_send( reply = result.get("Reply-Text", "") if reply.startswith("-ERR"): self._record_command_error(command_name, "protocol_error") + if span is not None: + span.set_attribute("command.error", "protocol_error") + span.set_status(trace.Status(trace.StatusCode.ERROR, reply)) + span.record_exception(GenesisError(reply)) if span is not None: reply_text = result.get("Reply-Text") diff --git a/genesis/protocol/lifecycle.py b/genesis/protocol/lifecycle.py new file mode 100644 index 0000000..5b68e16 --- /dev/null +++ b/genesis/protocol/lifecycle.py @@ -0,0 +1,495 @@ +""" +ESL lifecycle telemetry processors +---------------------------------- + +These processors run after the core protocol processors and emit OpenTelemetry +spans for the semantic FreeSWITCH channel lifecycle (``freeswitch.channel.*``) +and for CUSTOM subclasses (``sofia::``, ``callcenter::``, ``conference::``, +``valet_parking::``). They only enrich telemetry — they never consume events +that route to user handlers. + +Correlation with another system's view of the same call is attribute-based: +every channel span carries ``sip.call_id`` (= ``variable_sip_call_id``, the +standard SIP Call-ID). Any other SIP observer of the same call will carry the +same value, so the join happens at the observability backend (Grafana/Tempo), +not in code. + +Cardinality rule: UUIDs go on spans only; metric attributes use low-cardinality +enums/labels (channel.state, direction, hangup.cause, application.name, ...). +""" + +import os +from typing import TYPE_CHECKING, Any, Dict, Optional + +from opentelemetry import trace + +from genesis.observability import logger +from genesis.protocol.parser import ESLEvent +from genesis.protocol.metrics import ( + calls_active_counter, + channel_bridge_events_counter, + channel_codec_changes_counter, + channel_transfers_counter, + dialplan_applications_counter, + events_without_sip_call_id_counter, + hangup_q850_counter, + safe_add, +) + +if TYPE_CHECKING: + from genesis.protocol.base import Protocol + +tracer = trace.get_tracer(__name__) + +# Feature flags (default on; opt-out via env). Reserved for future W3C +# propagation is intentionally NOT implemented here (out of scope). +_LIFECYCLE_ENABLED = os.environ.get("GENESIS_TRACE_ESL_LIFECYCLE", "1") != "0" +_CUSTOM_ENABLED = os.environ.get("GENESIS_TRACE_CUSTOM_SUBCLASSES", "1") != "0" + +# Repeated span/metric attribute keys (centralised so Sonar S1192 stays quiet +# and renames touch one place). +ATTR_CHANNEL_STATE = "channel.state" +ATTR_ANSWER_STATE = "answer.state" +ATTR_READ_CODEC = "channel.read_codec" +ATTR_WRITE_CODEC = "channel.write_codec" +ATTR_BRIDGE_A_UUID = "bridge.a_uuid" +ATTR_BRIDGE_B_UUID = "bridge.b_uuid" +ATTR_HANGUP_CAUSE = "hangup.cause" +ATTR_APPLICATION_NAME = "application.name" +ATTR_APPLICATION_RESULT = "application.result" +ATTR_TRANSFER_ROLE = "transfer.role" +ATTR_TRANSFER_TYPE = "transfer.type" + + +def _str(event: ESLEvent, key: str) -> Optional[str]: + """Return a single string value for key (list-aware), or None.""" + value = event.get(key) + if value is None: + return None + if isinstance(value, list): + return value[0] if value else None + return value if isinstance(value, str) else str(value) + + +def _set(attrs: Dict[str, Any], dst: str, event: ESLEvent, src: str) -> None: + """Copy event[src] into attrs[dst] when present.""" + value = _str(event, src) + if value: + attrs[dst] = value + + +def _channel_attrs(event: ESLEvent) -> Dict[str, Any]: + """Common channel attributes (uuid, call_uuid, direction, sip.call_id, other_leg).""" + attrs: Dict[str, Any] = {} + _set(attrs, "channel.uuid", event, "Unique-ID") + _set(attrs, "channel.call_uuid", event, "Channel-Call-UUID") + _set(attrs, "channel.direction", event, "Call-Direction") + _set(attrs, "sip.call_id", event, "variable_sip_call_id") + _set(attrs, "other_leg.uuid", event, "Other-Leg-Unique-ID") + return attrs + + +def _record_sip_gap(event: ESLEvent, attrs: Dict[str, Any]) -> None: + """Count channel events that lack the sip.call_id correlation key.""" + if "sip.call_id" not in attrs: + safe_add(events_without_sip_call_id_counter, 1, attributes={}) + + +def _attr_span(name: str, attrs: Dict[str, Any]) -> None: + """Emit a span that exists only to carry attributes (no interior work). + + Uses ``start_span`` + explicit ``end()`` instead of an empty + ``with start_as_current_span(...): pass`` block. Parent context is resolved + the same way (from the current span at call time) and the span is exported + identically. + """ + span = tracer.start_span(name, attributes=attrs) + span.end() + + +# Event names handled by the lifecycle processor. +_LIFECYCLE_EVENTS = { + "CHANNEL_CREATE", + "CHANNEL_PROGRESS", + "CHANNEL_PROGRESS_MEDIA", + "CHANNEL_ANSWER", + "CHANNEL_BRIDGE", + "CHANNEL_UNBRIDGE", + "CHANNEL_HANGUP", + "CHANNEL_HANGUP_COMPLETE", + "CHANNEL_DESTROY", + "CHANNEL_EXECUTE", + "CHANNEL_EXECUTE_COMPLETE", + "CHANNEL_PARK", + "CHANNEL_UNPARK", + "CALL_UPDATE", + "CODEC", +} + + +def channel_lifecycle_processor(protocol: "Protocol", event: ESLEvent) -> None: + """Emit ``freeswitch.channel.*`` spans for channel lifecycle events.""" + if not _LIFECYCLE_ENABLED: + return + + name = _str(event, "Event-Name") + if not name or name not in _LIFECYCLE_EVENTS: + return + + logger.debug("lifecycle %s on %s", name, type(protocol).__name__) + + attrs = _channel_attrs(event) + _record_sip_gap(event, attrs) + + emit = _LIFECYCLE_EMITTERS.get(name) + if emit is not None: + emit(event, attrs) + elif name in ("CHANNEL_PARK", "CHANNEL_UNPARK"): + _emit_state_span(event, attrs, f"freeswitch.channel.{name.lower()[8:]}") + + +def _emit_create(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, "channel.name", event, "Channel-Name") + _set(attrs, "channel.destination_number", event, "Caller-Destination-Number") + _set(attrs, "channel.context", event, "Caller-Context") + _set(attrs, "channel.dialplan", event, "Caller-Dialplan") + _set(attrs, "channel.caller_id_number", event, "Caller-Caller-ID-Number") + _set(attrs, "channel.caller_id_name", event, "Caller-Caller-ID-Name") + _set(attrs, "channel.network_addr", event, "Caller-Network-Addr") + with tracer.start_as_current_span("freeswitch.channel.create", attributes=attrs): + safe_add( + calls_active_counter, + 1, + attributes={ + ATTR_CHANNEL_STATE: _str(event, "Channel-State") or "CS_INIT", + "direction": _str(event, "Call-Direction") or "unknown", + }, + ) + + +def _emit_progress(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_CHANNEL_STATE, event, "Channel-State") + attrs[ATTR_ANSWER_STATE] = _str(event, "Answer-State") or "ringing" + _attr_span("freeswitch.channel.progress", attrs) + + +def _emit_progress_media(event: ESLEvent, attrs: Dict[str, Any]) -> None: + attrs[ATTR_ANSWER_STATE] = _str(event, "Answer-State") or "early" + _set(attrs, ATTR_READ_CODEC, event, "Channel-Read-Codec-Name") + _set(attrs, ATTR_WRITE_CODEC, event, "Channel-Write-Codec-Name") + _attr_span("freeswitch.channel.progress_media", attrs) + + +def _emit_answer(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_CHANNEL_STATE, event, "Channel-State") + attrs[ATTR_ANSWER_STATE] = "answered" + _set(attrs, ATTR_READ_CODEC, event, "Channel-Read-Codec-Name") + _set(attrs, ATTR_WRITE_CODEC, event, "Channel-Write-Codec-Name") + _attr_span("freeswitch.channel.answer", attrs) + + +def _emit_bridge(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_BRIDGE_A_UUID, event, "Bridge-A-Unique-ID") + _set(attrs, ATTR_BRIDGE_B_UUID, event, "Bridge-B-Unique-ID") + _set(attrs, "other_leg.type", event, "Other-Type") + _set(attrs, "other_leg.destination_number", event, "Other-Leg-Destination-Number") + _set(attrs, "other_leg.caller_id_number", event, "Other-Leg-Caller-ID-Number") + with tracer.start_as_current_span( + "freeswitch.channel.bridge", attributes=attrs + ) as span: + a = attrs.get(ATTR_BRIDGE_A_UUID, "unknown") + b = attrs.get(ATTR_BRIDGE_B_UUID, "unknown") + span.add_event( + "bridge.established", + attributes={ATTR_BRIDGE_A_UUID: a, ATTR_BRIDGE_B_UUID: b}, + ) + safe_add( + channel_bridge_events_counter, + 1, + attributes={"bridge.result": "established"}, + ) + + +def _emit_unbridge(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_BRIDGE_A_UUID, event, "Bridge-A-Unique-ID") + # CHANNEL_UNBRIDGE may carry Other-Leg-Unique-ID instead of Bridge-B. + if ATTR_BRIDGE_B_UUID not in attrs: + _set(attrs, ATTR_BRIDGE_B_UUID, event, "Other-Leg-Unique-ID") + _set(attrs, ATTR_HANGUP_CAUSE, event, "Hangup-Cause") + with tracer.start_as_current_span( + "freeswitch.channel.unbridge", attributes=attrs + ) as span: + span.add_event( + "bridge.torn_down", + attributes={ + ATTR_BRIDGE_A_UUID: attrs.get(ATTR_BRIDGE_A_UUID, "unknown"), + ATTR_BRIDGE_B_UUID: attrs.get(ATTR_BRIDGE_B_UUID, "unknown"), + }, + ) + metric_attrs: Dict[str, Any] = {"bridge.result": "unbridged"} + cause = attrs.get(ATTR_HANGUP_CAUSE) + if cause: + metric_attrs[ATTR_HANGUP_CAUSE] = cause + safe_add(channel_bridge_events_counter, 1, attributes=metric_attrs) + + +def _emit_hangup(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_HANGUP_CAUSE, event, "Hangup-Cause") + _set(attrs, ATTR_CHANNEL_STATE, event, "Channel-State") + attrs[ATTR_ANSWER_STATE] = "hangup" + cause = _str(event, "Hangup-Cause") or "unknown" + normalized = cause.lower().replace(" ", "_") + with tracer.start_as_current_span( + "freeswitch.channel.hangup", attributes=attrs + ) as span: + span.add_event( + f"{ATTR_HANGUP_CAUSE}.{normalized}", + attributes={ATTR_HANGUP_CAUSE: cause}, + ) + + +def _emit_hangup_complete(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_HANGUP_CAUSE, event, "Hangup-Cause") + _set(attrs, "hangup.cause.q850", event, "variable_hangup_cause_q850") + _set(attrs, "channel.name", event, "Channel-Name") + with tracer.start_as_current_span( + "freeswitch.channel.hangup_complete", attributes=attrs + ) as span: + span.add_event( + "call.finalized", + attributes={ATTR_HANGUP_CAUSE: attrs.get(ATTR_HANGUP_CAUSE, "unknown")}, + ) + q850 = _str(event, "variable_hangup_cause_q850") + if q850: + safe_add( + hangup_q850_counter, + 1, + attributes={"hangup.cause.q850": q850}, + ) + + +def _emit_destroy(event: ESLEvent, attrs: Dict[str, Any]) -> None: + with tracer.start_as_current_span("freeswitch.channel.destroy", attributes=attrs): + safe_add( + calls_active_counter, + -1, + attributes={ + ATTR_CHANNEL_STATE: "CS_DESTROY", + "direction": _str(event, "Call-Direction") or "unknown", + }, + ) + + +def _emit_execute(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_APPLICATION_NAME, event, "Application") + _set(attrs, "application.uuid", event, "Application-UUID") + _set(attrs, "application.data", event, "Application-Data") + with tracer.start_as_current_span("freeswitch.channel.execute", attributes=attrs): + app = _str(event, "Application") or "unknown" + safe_add( + dialplan_applications_counter, + 1, + attributes={ATTR_APPLICATION_NAME: app, ATTR_APPLICATION_RESULT: "started"}, + ) + + +def _emit_execute_complete(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, ATTR_APPLICATION_NAME, event, "Application") + _set(attrs, "application.uuid", event, "Application-UUID") + _set(attrs, "application.response", event, "Application-Response") + app = _str(event, "Application") or "unknown" + response = _str(event, "Application-Response") or "" + result = "success" if response and not response.startswith("-ERR") else "fail" + with tracer.start_as_current_span( + "freeswitch.channel.execute_complete", attributes=attrs + ) as span: + span.add_event( + f"app.{app}.done", + attributes={ATTR_APPLICATION_NAME: app, ATTR_APPLICATION_RESULT: result}, + ) + safe_add( + dialplan_applications_counter, + 1, + attributes={ATTR_APPLICATION_NAME: app, ATTR_APPLICATION_RESULT: result}, + ) + + +def _emit_state_span(event: ESLEvent, attrs: Dict[str, Any], span_name: str) -> None: + _set(attrs, ATTR_CHANNEL_STATE, event, "Channel-State") + _attr_span(span_name, attrs) + + +def _emit_call_update(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, "bridged.to", event, "Bridged-To") + _set(attrs, "caller.transfer_source", event, "Caller-Transfer-Source") + _set(attrs, "caller.orig_caller_id_number", event, "Caller-Orig-Caller-ID-Number") + with tracer.start_as_current_span( + "freeswitch.call.update", attributes=attrs + ) as span: + span.add_event("caller_id.mutated", attributes={}) + + +def _emit_codec(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, "channel.read_codec.name", event, "Channel-Read-Codec-Name") + _set(attrs, "channel.read_codec.rate", event, "Channel-Read-Codec-Rate") + _set(attrs, "channel.write_codec.name", event, "Channel-Write-Codec-Name") + _set(attrs, "channel.write_codec.rate", event, "Channel-Write-Codec-Rate") + read_codec = _str(event, "Channel-Read-Codec-Name") or "unknown" + write_codec = _str(event, "Channel-Write-Codec-Name") or "unknown" + with tracer.start_as_current_span("freeswitch.channel.codec", attributes=attrs): + safe_add( + channel_codec_changes_counter, + 1, + attributes={ + ATTR_READ_CODEC: read_codec, + ATTR_WRITE_CODEC: write_codec, + }, + ) + + +# Dispatch table for the lifecycle events that map 1:1 to an emitter. Park / +# unpark are handled inline by the processor (parameterised span name) and so +# are intentionally absent here. +_LIFECYCLE_EMITTERS = { + "CHANNEL_CREATE": _emit_create, + "CHANNEL_PROGRESS": _emit_progress, + "CHANNEL_PROGRESS_MEDIA": _emit_progress_media, + "CHANNEL_ANSWER": _emit_answer, + "CHANNEL_BRIDGE": _emit_bridge, + "CHANNEL_UNBRIDGE": _emit_unbridge, + "CHANNEL_HANGUP": _emit_hangup, + "CHANNEL_HANGUP_COMPLETE": _emit_hangup_complete, + "CHANNEL_DESTROY": _emit_destroy, + "CHANNEL_EXECUTE": _emit_execute, + "CHANNEL_EXECUTE_COMPLETE": _emit_execute_complete, + "CALL_UPDATE": _emit_call_update, + "CODEC": _emit_codec, +} + + +# --------------------------------------------------------------------------- +# CUSTOM subclass processor +# --------------------------------------------------------------------------- +_CUSTOM_MAP = { + "sofia::transferor": "transferor", + "sofia::transferee": "transferee", + "sofia::reinvite": "reinvite", + "sofia::replaced": "replaced", + "sofia::register": "register", + "sofia::unregister": "register", + "sofia::expire": "register", + "sofia::gateway_state": "register", + "callcenter::info": "callcenter", + "conference::maintenance": "conference", + "conference::cdr": "conference", + "valet_parking::info": "valet", +} + + +def custom_subclass_processor(protocol: "Protocol", event: ESLEvent) -> None: + """Emit spans for CUSTOM subclasses (sofia/callcenter/conference/valet).""" + if not _CUSTOM_ENABLED: + return + if _str(event, "Event-Name") != "CUSTOM": + return + subclass = _str(event, "Event-Subclass") + if not subclass or subclass not in _CUSTOM_MAP: + return + + logger.debug("custom %s on %s", subclass, type(protocol).__name__) + + attrs = _channel_attrs(event) + kind = _CUSTOM_MAP[subclass] + + if kind in ("transferor", "transferee"): + _emit_transfer(event, attrs, kind) + elif kind in ("reinvite", "replaced"): + _emit_reinvite(event, attrs, kind) + elif kind == "register": + _emit_register(event, attrs, subclass) + elif kind == "callcenter": + _emit_callcenter(event, attrs) + elif kind == "conference": + _emit_conference(event, attrs, subclass) + elif kind == "valet": + _emit_valet(event, attrs) + + +def _emit_transfer(event: ESLEvent, attrs: Dict[str, Any], role: str) -> None: + attrs[ATTR_TRANSFER_ROLE] = role + # Heuristic: transferee only occurs in attended transfers; a lone + # transferor is typically a blind transfer. + attrs[ATTR_TRANSFER_TYPE] = "attended" if role == "transferee" else "blind" + _set(attrs, "sofia.profile", event, "variable_sofia_profile_name") + with tracer.start_as_current_span( + "freeswitch.sofia.transfer", attributes=attrs + ) as span: + span.add_event( + "transfer.initiated", + attributes={ + ATTR_TRANSFER_ROLE: role, + ATTR_TRANSFER_TYPE: attrs[ATTR_TRANSFER_TYPE], + }, + ) + safe_add( + channel_transfers_counter, + 1, + attributes={ + ATTR_TRANSFER_TYPE: attrs[ATTR_TRANSFER_TYPE], + ATTR_TRANSFER_ROLE: role, + }, + ) + + +def _emit_reinvite(event: ESLEvent, attrs: Dict[str, Any], kind: str) -> None: + _set(attrs, "sofia.profile", event, "variable_sofia_profile_name") + with tracer.start_as_current_span( + f"freeswitch.sofia.{kind}", attributes=attrs + ) as span: + span.add_event("media.renegotiated", attributes={}) + + +def _emit_register(event: ESLEvent, attrs: Dict[str, Any], subclass: str) -> None: + from_user = _str(event, "from-user") + from_host = _str(event, "from-host") + if from_user and from_host: + attrs["register.aor"] = f"{from_user}@{from_host}" + _set(attrs, "register.contact_ip", event, "contact") + _set(attrs, "register.expires_s", event, "expires") + _set(attrs, "register.response_code", event, "status") + _set(attrs, "gateway.name", event, "Gateway-Name") + _set(attrs, "gateway.state", event, "State") + attrs["register.action"] = subclass.split("::")[1] + _attr_span("freeswitch.sofia.register", attrs) + + +def _emit_callcenter(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, "cc.queue", event, "CC-Queue") + _set(attrs, "cc.action", event, "CC-Action") + _set(attrs, "cc.agent", event, "CC-Agent") + _set(attrs, "cc.member_uuid", event, "CC-Member-UUID") + _set(attrs, "cc.count", event, "CC-Count") + _set(attrs, "cc.selection", event, "CC-Selection") + _attr_span("freeswitch.callcenter.info", attrs) + + +def _emit_conference(event: ESLEvent, attrs: Dict[str, Any], subclass: str) -> None: + _set(attrs, "conference.name", event, "Conference-Name") + _set(attrs, "conference.profile", event, "Conference-Profile") + _set(attrs, "conference.action", event, "Action") + _set(attrs, "conference.member_id", event, "Member-ID") + _set(attrs, "old.member_id", event, "Old-Member-ID") + span_name = ( + "freeswitch.conference.cdr" + if subclass == "conference::cdr" + else "freeswitch.conference.maintenance" + ) + _attr_span(span_name, attrs) + + +def _emit_valet(event: ESLEvent, attrs: Dict[str, Any]) -> None: + _set(attrs, "valet.lot", event, "Valet-Lot-Name") + _set(attrs, "valet.extension", event, "Valet-Extension") + _set(attrs, "valet.action", event, "Action") + _set(attrs, "bridge.to_uuid", event, "Bridge-To-UUID") + _attr_span("freeswitch.valet.info", attrs) diff --git a/genesis/protocol/metrics.py b/genesis/protocol/metrics.py index 319fdb9..9ec1a28 100644 --- a/genesis/protocol/metrics.py +++ b/genesis/protocol/metrics.py @@ -2,15 +2,25 @@ Metrics definitions for Protocol operations. This module centralizes all OpenTelemetry metrics used by the Protocol -and related components (Channel, Session, etc.). +and related components (Channel, Session, Inbound, Outbound, etc.). + +Centralization avoids duplicated instrument definitions (which both trip +static analysis and produce OTel SDK warnings when the same metric name is +created with different descriptions in multiple modules). """ +import weakref +from typing import Any, Iterable + from opentelemetry import trace, metrics +from opentelemetry.metrics import Observation tracer = trace.get_tracer(__name__) meter = metrics.get_meter(__name__) +# --------------------------------------------------------------------------- # Command metrics +# --------------------------------------------------------------------------- commands_sent_counter = meter.create_counter( "genesis.commands.sent", description="Number of ESL commands sent", @@ -35,7 +45,9 @@ unit="1", ) +# --------------------------------------------------------------------------- # Channel operation metrics +# --------------------------------------------------------------------------- channel_operations_counter = meter.create_counter( "genesis.channel.operations", description="Number of channel operations", @@ -78,7 +90,9 @@ unit="1", ) +# --------------------------------------------------------------------------- # Routing metrics (for O(1) event routing) +# --------------------------------------------------------------------------- channel_routing_counter = meter.create_counter( "genesis.channel.routing.hits", description="Number of O(1) channel routing hits", @@ -90,3 +104,208 @@ description="Number of fallback to O(N) global routing", unit="1", ) + +# --------------------------------------------------------------------------- +# Connection metrics (shared by Inbound and Outbound) +# --------------------------------------------------------------------------- +connections_active_counter = meter.create_up_down_counter( + "genesis.connections.active", + description="Number of active connections", + unit="1", +) + +connection_errors_counter = meter.create_counter( + "genesis.connections.errors", + description="Number of connection errors", + unit="1", +) + +# --------------------------------------------------------------------------- +# New ESL lifecycle / routing correlation metrics +# --------------------------------------------------------------------------- +# Cardinality rule: metric attributes NEVER carry UUIDs; only low-cardinality +# enums/labels (channel.state, direction, hangup.cause, application.name, ...). +# UUIDs go on spans only. +calls_active_counter = meter.create_up_down_counter( + "genesis.calls.active", + description="Number of active calls by state and direction", + unit="1", +) + +channel_bridge_events_counter = meter.create_counter( + "genesis.channel.bridge.events", + description="ESL CHANNEL_BRIDGE/UNBRIDGE events (authoritative bridge state)", + unit="1", +) + +channel_transfers_counter = meter.create_counter( + "genesis.channel.transfers", + description="Call transfers observed via sofia::transferor/transferee", + unit="1", +) + +channel_codec_changes_counter = meter.create_counter( + "genesis.channel.codec.changes", + description="Codec renegotiations observed via CODEC events", + unit="1", +) + +dialplan_applications_counter = meter.create_counter( + "genesis.dialplan.applications", + description="Dialplan applications executed (CHANNEL_EXECUTE[_COMPLETE])", + unit="1", +) + +hangup_q850_counter = meter.create_counter( + "genesis.channel.hangup.causes.q850", + description="Hangup causes by Q.850 code", + unit="1", +) + +event_processing_duration = meter.create_histogram( + "genesis.event.processing.duration", + description="Duration of event dispatch (processors + routing)", + unit="s", +) + +events_without_sip_call_id_counter = meter.create_counter( + "genesis.events.without_sip_call_id", + description="Channel events lacking variable_sip_call_id (correlation gap)", + unit="1", +) + +# --------------------------------------------------------------------------- +# Session / consumer / load balancer metrics +# --------------------------------------------------------------------------- +session_commands_counter = meter.create_counter( + "genesis.session.commands", + description="Session sendmsg commands by application", + unit="1", +) + +session_command_duration = meter.create_histogram( + "genesis.session.command.duration", + description="Duration of session sendmsg commands", + unit="s", +) + +consumer_handlers_counter = meter.create_counter( + "genesis.consumer.handlers", + description="Consumer handler invocations by event and match result", + unit="1", +) + +loadbalancer_selections_counter = meter.create_counter( + "genesis.loadbalancer.selections", + description="Load balancer selections by backend and result", + unit="1", +) + +loadbalancer_errors_counter = meter.create_counter( + "genesis.loadbalancer.errors", + description="Load balancer errors by error type", + unit="1", +) + +# --------------------------------------------------------------------------- +# Observable gauges for queue depth (backpressure visibility) +# --------------------------------------------------------------------------- +# Protocols register themselves (weakly) so the gauge callbacks can sum the +# pending events/commands across all live instances without holding them alive. +_protocol_registry: "weakref.WeakSet[Any]" = weakref.WeakSet() + + +def register_protocol(protocol: Any) -> None: + """Register a Protocol instance so its queue depths feed the gauges.""" + _protocol_registry.add(protocol) + + +def _commands_queue_depth(_options: Any) -> Iterable[Observation]: + total = 0 + for proto in tuple(_protocol_registry): + try: + total += proto.commands.qsize() + except Exception: + pass + yield Observation(total, {}) + + +def _events_queue_depth(_options: Any) -> Iterable[Observation]: + total = 0 + for proto in tuple(_protocol_registry): + try: + total += proto.events.qsize() + except Exception: + pass + yield Observation(total, {}) + + +commands_queue_depth_gauge = meter.create_observable_gauge( + "genesis.commands.queue.depth", + callbacks=[_commands_queue_depth], + description="Depth of the pending command reply queue", + unit="1", +) + +events_queue_depth_gauge = meter.create_observable_gauge( + "genesis.events.queue.depth", + callbacks=[_events_queue_depth], + description="Depth of the pending event queue", + unit="1", +) + + +def safe_add(counter: Any, *args: Any, **kwargs: Any) -> None: + """Add to a counter, swallowing OTel/metrics errors (best-effort).""" + try: + getattr(counter, "add")(*args, **kwargs) + except Exception: + pass + + +def safe_record(histogram: Any, *args: Any, **kwargs: Any) -> None: + """Record on a histogram, swallowing OTel/metrics errors (best-effort).""" + try: + getattr(histogram, "record")(*args, **kwargs) + except Exception: + pass + + +# Re-export for callers that import a batch of instruments (kept alphabetical). +__all__ = [ + "tracer", + "meter", + "commands_sent_counter", + "events_received_counter", + "command_duration_histogram", + "command_errors_counter", + "channel_operations_counter", + "channel_operation_duration", + "hangup_causes_counter", + "bridge_operations_counter", + "dtmf_received_counter", + "call_duration_histogram", + "timeout_counter", + "channel_routing_counter", + "global_routing_counter", + "connections_active_counter", + "connection_errors_counter", + "calls_active_counter", + "channel_bridge_events_counter", + "channel_transfers_counter", + "channel_codec_changes_counter", + "dialplan_applications_counter", + "hangup_q850_counter", + "event_processing_duration", + "events_without_sip_call_id_counter", + "session_commands_counter", + "session_command_duration", + "consumer_handlers_counter", + "loadbalancer_selections_counter", + "loadbalancer_errors_counter", + "commands_queue_depth_gauge", + "events_queue_depth_gauge", + "register_protocol", + "safe_add", + "safe_record", +] diff --git a/genesis/protocol/processors.py b/genesis/protocol/processors.py index f7e2e68..ce46cf4 100644 --- a/genesis/protocol/processors.py +++ b/genesis/protocol/processors.py @@ -9,6 +9,10 @@ from typing import TYPE_CHECKING, List, Callable, Awaitable, Union from genesis.protocol.parser import ESLEvent +from genesis.protocol.lifecycle import ( + channel_lifecycle_processor, + custom_subclass_processor, +) if TYPE_CHECKING: from genesis.protocol.base import Protocol @@ -47,10 +51,17 @@ async def disconnect_processor(protocol: "Protocol", event: ESLEvent) -> None: def default_processors() -> List[EventProcessor]: - """Return the default list of event processors (order matters).""" + """Return the default list of event processors (order matters). + + Lifecycle/CUSTOM processors run last so they never interfere with the + core protocol adapters (auth, command reply, disconnect). They only emit + telemetry — they do not consume events routed to user handlers. + """ return [ auth_request_processor, command_reply_processor, api_response_processor, disconnect_processor, + channel_lifecycle_processor, + custom_subclass_processor, ] diff --git a/genesis/protocol/routing/dispatcher.py b/genesis/protocol/routing/dispatcher.py index 8f47131..485b873 100644 --- a/genesis/protocol/routing/dispatcher.py +++ b/genesis/protocol/routing/dispatcher.py @@ -10,6 +10,7 @@ from genesis.observability import logger from genesis.protocol.parser import ESLEvent +from genesis.protocol.metrics import consumer_handlers_counter, safe_add from genesis.types import EventHandler @@ -31,7 +32,15 @@ def dispatch_to_handlers( event: The ESL event to dispatch task_set: Optional set to track live tasks (prevents GC and logs exceptions) """ + event_name = event.get("Event-Name", "UNKNOWN") + if isinstance(event_name, list): + event_name = event_name[0] if event_name else "UNKNOWN" for handler in handlers: + safe_add( + consumer_handlers_counter, + 1, + attributes={"event.name": str(event_name)}, + ) if iscoroutinefunction(handler): task = create_task(handler(event)) else: diff --git a/genesis/protocol/telemetry.py b/genesis/protocol/telemetry.py index 13f3c03..10ee338 100644 --- a/genesis/protocol/telemetry.py +++ b/genesis/protocol/telemetry.py @@ -11,6 +11,36 @@ from genesis.protocol.metrics import tracer, events_received_counter from genesis.observability import logger, TRACE_LEVEL_NUM +_EXPLICIT_ATTRS = { + "Call-Direction": "event.direction", + "Channel-State": "event.channel_state", + "Answer-State": "event.answer_state", + "Hangup-Cause": "event.hangup_cause", + "Event-Subclass": "event.subclass", + "Channel-Call-UUID": "event.call_uuid", + "Other-Leg-Unique-ID": "event.other_leg", + "Caller-Context": "event.context", + "Caller-Destination-Number": "event.destination_number", +} + + +def _header_attr_name(key: str) -> str: + """Map an ESL header key to its OpenTelemetry attribute name.""" + if key == "Event-Name": + return "event.name" + if key == "Unique-ID": + return "event.uuid" + if key == "Content-Type": + return "event.content_type" + return f"event.header.{key.lower().replace('-', '_')}" + + +def _scalar(value: Any) -> Any: + """Collapse a single-element list to its element; pass other values through.""" + if isinstance(value, list): + return value[0] if value else "" + return value + def build_event_attributes(event: ESLEvent) -> Dict[str, Any]: """Build OpenTelemetry attributes from an ESL event. @@ -21,21 +51,23 @@ def build_event_attributes(event: ESLEvent) -> Dict[str, Any]: Returns: Dictionary of attributes suitable for OTel spans and metrics """ - attributes = {} + attributes: Dict[str, Any] = {} for key, value in event.items(): - if key == "Event-Name": - attr_name = "event.name" - elif key == "Unique-ID": - attr_name = "event.uuid" - elif key == "Content-Type": - attr_name = "event.content_type" - else: - slug = key.lower().replace("-", "_") - attr_name = f"event.header.{slug}" - if isinstance(value, (str, int, float, bool, list, tuple)): - attributes[attr_name] = value + attributes[_header_attr_name(key)] = value + + # Routing / correlation attributes (explicit, low-cardinality keys) so the + # ``process_event`` span carries routing info and the cross-system join key. + for src, dst in _EXPLICIT_ATTRS.items(): + if src in event: + attributes[dst] = _scalar(event[src]) + + # sip.call_id is the standard SIP Call-ID and the cross-system join key. + # The join happens at the observability backend. + sip_call_id = event.get("variable_sip_call_id") + if sip_call_id: + attributes["sip.call_id"] = _scalar(sip_call_id) return attributes diff --git a/genesis/queue/backends.py b/genesis/queue/backends.py index e172bb6..060ba51 100644 --- a/genesis/queue/backends.py +++ b/genesis/queue/backends.py @@ -60,6 +60,10 @@ async def release(self, queue_id: str) -> None: """Release one slot for the queue.""" ... + def depth(self, queue_id: str) -> int: + """Return the number of items waiting in the queue (not yet acquired).""" + ... + class InMemoryBackend: """ @@ -175,3 +179,9 @@ async def release(self, queue_id: str) -> None: state.semaphore.release() async with state.lock: state.condition.notify_all() + + def depth(self, queue_id: str) -> int: + """Return the number of items waiting in the queue (not yet acquired).""" + if queue_id in self._states: + return len(self._states[queue_id].deque) + return 0 diff --git a/genesis/queue/core.py b/genesis/queue/core.py index 5fe2c59..0bc4d38 100644 --- a/genesis/queue/core.py +++ b/genesis/queue/core.py @@ -78,14 +78,22 @@ async def __aenter__(self) -> "QueueSlot": attributes={ ATTR_QUEUE_ID: self._queue_id, ATTR_QUEUE_ITEM_ID: self._item_id, + # queue.depth as a SPAN attribute (not a metric label) keeps + # backpressure observable without metric cardinality blow-up. + "queue.depth": self._queue._backend.depth(self._queue_id), }, - ): - await self._queue._backend.wait_and_acquire( - self._queue_id, - self._item_id, - self._max_concurrent, - timeout=self._timeout, - ) + ) as span: + try: + await self._queue._backend.wait_and_acquire( + self._queue_id, + self._item_id, + self._max_concurrent, + timeout=self._timeout, + ) + except Exception as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + raise self._acquired = True elapsed = time.monotonic() - start queue_wait_duration.record(elapsed, attributes={ATTR_QUEUE_ID: self._queue_id}) diff --git a/genesis/session.py b/genesis/session.py index 22f7aa6..4f33d1b 100644 --- a/genesis/session.py +++ b/genesis/session.py @@ -7,18 +7,28 @@ from __future__ import annotations +import time from asyncio import Event, Queue, StreamReader, StreamWriter, wait_for from functools import partial from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from uuid import uuid4 +from opentelemetry import trace + from genesis.observability import logger from genesis.protocol import Protocol from genesis.protocol.parser import ESLEvent +from genesis.protocol.metrics import ( + session_command_duration, + session_commands_counter, + safe_record, +) if TYPE_CHECKING: from genesis.channel import Channel +tracer = trace.get_tracer(__name__) + def _build_sendmsg_cmd( command: str, @@ -195,23 +205,56 @@ async def sendmsg( ) logger.debug("Send command to freeswitch: '%s'.", cmd) - if block and command == "execute" and resolved_event_uuid: - logger.debug( - "Waiting for command completion with Application-UUID: %s", - resolved_event_uuid, - ) - command_is_complete = self._awaitable_complete_command( - resolved_event_uuid, timeout + start_time = time.perf_counter() + with tracer.start_as_current_span( + "session.sendmsg", + attributes={ + "channel.uuid": self.uuid or "unknown", + "application.name": application, + "application.uuid": resolved_event_uuid or "unknown", + "application.block": str(block), + }, + ): + safe_add_cmd_attrs = {"application.name": application} + + if block and command == "execute" and resolved_event_uuid: + logger.debug( + "Waiting for command completion with Application-UUID: %s", + resolved_event_uuid, + ) + command_is_complete = self._awaitable_complete_command( + resolved_event_uuid, timeout + ) + response = await self.send(cmd) + logger.debug( + "Received response of execute command with block: %s", + response, + ) + with tracer.start_as_current_span( + "session.await_complete", + attributes={ + "channel.uuid": self.uuid or "unknown", + "application.uuid": resolved_event_uuid, + }, + ): + if timeout is not None: + await wait_for(command_is_complete.wait(), timeout=timeout) + else: + await command_is_complete.wait() + result = await self.fifo.get() + safe_record( + session_command_duration, + time.perf_counter() - start_time, + attributes=safe_add_cmd_attrs, + ) + session_commands_counter.add(1, attributes=safe_add_cmd_attrs) + return result + + result = await self.send(cmd) + safe_record( + session_command_duration, + time.perf_counter() - start_time, + attributes=safe_add_cmd_attrs, ) - response = await self.send(cmd) - logger.debug( - "Received response of execute command with block: %s", - response, - ) - if timeout is not None: - await wait_for(command_is_complete.wait(), timeout=timeout) - else: - await command_is_complete.wait() - return await self.fifo.get() - - return await self.send(cmd) + session_commands_counter.add(1, attributes=safe_add_cmd_attrs) + return result diff --git a/tests/payloads.py b/tests/payloads.py index 2dab620..699f0da 100644 --- a/tests/payloads.py +++ b/tests/payloads.py @@ -580,3 +580,201 @@ Event-Name: CHANNEL_ANSWER Unique-ID: {unique_id} """) + + +# --------------------------------------------------------------------------- +# Lifecycle / CUSTOM payloads for the telemetry processor tests. +# Kept minimal but carry the correlation key (variable_sip_call_id) and the +# fields the processors turn into span attributes / metric labels. +# --------------------------------------------------------------------------- +UUID_A = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +UUID_B = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" +SIP_CALL_ID = "test-sip-call-id-123" + +_channel_common = dedent("""\ + Unique-ID: {uuid_a} + Channel-Call-UUID: {uuid_a} + Call-Direction: inbound + variable_sip_call_id: {sip_call_id} + """) + + +channel_progress = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_PROGRESS + Channel-State: CS_ROUTING + Answer-State: ringing + """ +) + + +channel_bridge = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_BRIDGE + Bridge-A-Unique-ID: {uuid_a} + Bridge-B-Unique-ID: {uuid_b} + Other-Leg-Unique-ID: {uuid_b} + Other-Type: bride + Other-Leg-Destination-Number: 1002 + Other-Leg-Caller-ID-Number: 1002 + """.format(uuid_a=UUID_A, uuid_b=UUID_B) +) + + +channel_unbridge = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_UNBRIDGE + Bridge-A-Unique-ID: {uuid_a} + Other-Leg-Unique-ID: {uuid_b} + Hangup-Cause: NORMAL_CLEARING + """.format(uuid_a=UUID_A, uuid_b=UUID_B) +) + + +channel_hangup_complete = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_HANGUP_COMPLETE + Hangup-Cause: NORMAL_CLEARING + variable_hangup_cause_q850: 16 + Channel-Name: sofia/internal/100@192.168.50.4 + """ +) + + +channel_destroy = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_DESTROY + Channel-State: CS_DESTROY + """ +) + + +channel_execute = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_EXECUTE + Application: playback + Application-UUID: app-uuid-1 + Application-Data: /tmp/hello.wav + """ +) + + +channel_execute_complete = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CHANNEL_EXECUTE_COMPLETE + Application: playback + Application-UUID: app-uuid-1 + Application-Response: FILE PLAYED + """ +) + + +codec = _channel_common.format(uuid_a=UUID_A, sip_call_id=SIP_CALL_ID) + dedent("""\ + Event-Name: CODEC + Channel-Read-Codec-Name: opus + Channel-Read-Codec-Rate: 48000 + Channel-Write-Codec-Name: opus + Channel-Write-Codec-Rate: 48000 + """) + + +call_update = _channel_common.format(uuid_a=UUID_A, sip_call_id=SIP_CALL_ID) + dedent( + """\ + Event-Name: CALL_UPDATE + Bridged-To: {uuid_b} + Caller-Transfer-Source: transfer_src + Caller-Orig-Caller-ID-Number: 100 + """.format(uuid_b=UUID_B) +) + + +sofia_transferor = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CUSTOM + Event-Subclass: sofia::transferor + variable_sofia_profile_name: internal + """ +) + + +sofia_transferee = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CUSTOM + Event-Subclass: sofia::transferee + variable_sofia_profile_name: internal + """ +) + + +callcenter_info = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CUSTOM + Event-Subclass: callcenter::info + CC-Queue: sales + CC-Action: agent-state-change + CC-Agent: agent-1001 + CC-Member-UUID: member-uuid-1 + CC-Count: 1 + CC-Selection: round-robin + """ +) + + +conference_maintenance = _channel_common.format( + uuid_a=UUID_A, sip_call_id=SIP_CALL_ID +) + dedent( + """\ + Event-Name: CUSTOM + Event-Subclass: conference::maintenance + Conference-Name: 3000 + Conference-Profile: default + Action: add-member + Member-ID: 1 + Old-Member-ID: 0 + """ +) + + +valet_info = _channel_common.format(uuid_a=UUID_A, sip_call_id=SIP_CALL_ID) + dedent( + """\ + Event-Name: CUSTOM + Event-Subclass: valet_parking::info + Valet-Lot-Name: default + Valet-Extension: 4100 + Action: bridge + Bridge-To-UUID: {uuid_b} + """.format(uuid_b=UUID_B) +) + + +# A channel event WITHOUT variable_sip_call_id — used to assert the +# events_without_sip_call_id_counter correlation-gap metric fires. +channel_create_no_sip = dedent("""\ + Event-Name: CHANNEL_CREATE + Channel-State: CS_INIT + Unique-ID: {uuid_a} + Channel-Call-UUID: {uuid_a} + Call-Direction: outbound + Caller-Destination-Number: 1002 + Caller-Context: default + """).format(uuid_a=UUID_A) diff --git a/tests/test_channel_lifecycle.py b/tests/test_channel_lifecycle.py new file mode 100644 index 0000000..ea7d93f --- /dev/null +++ b/tests/test_channel_lifecycle.py @@ -0,0 +1,187 @@ +"""Tests for the ESL lifecycle / CUSTOM subclass telemetry processors. + +These processors emit ``freeswitch.channel.*`` and ``freeswitch.sofia.*`` / +``freeswitch.callcenter.*`` / ``freeswitch.conference.*`` / ``freeswitch.valet.*`` +spans. The key contract under test is **cross-system correlation**: every channel +span must carry ``sip.call_id`` (= ``variable_sip_call_id``, the standard SIP +Call-ID) so another system's view of the same call can be joined to it at the +observability backend. +""" + +from __future__ import annotations + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +from genesis.protocol.lifecycle import ( + channel_lifecycle_processor, + custom_subclass_processor, +) +from genesis.protocol.parser import parse_headers +from tests import payloads + + +@pytest.fixture +def memory_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + if not hasattr(provider, "add_span_processor"): + provider = TracerProvider() + trace.set_tracer_provider(provider) + provider.add_span_processor(processor) + yield exporter + + +def _event(payload: str): + return parse_headers(payload) + + +def _span(exporter: InMemorySpanExporter, name: str): + spans = [s for s in exporter.get_finished_spans() if s.name == name] + assert ( + spans + ), f"span '{name}' not emitted; got {[s.name for s in exporter.get_finished_spans()]}" + return spans[-1] + + +async def test_channel_create_emits_span_and_sip_call_id(memory_exporter): + event = _event(payloads.channel_create) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + + span = _span(memory_exporter, "freeswitch.channel.create") + # Correlation contract: sip.call_id must be present on the span. + assert "sip.call_id" in span.attributes + assert span.attributes["sip.call_id"] + assert span.attributes["channel.uuid"] == "d0b1da34-a727-11e4-9728-6f83a2e5e50a" + assert span.attributes["channel.destination_number"] == "101" + + +async def test_channel_bridge_carries_cross_leg_uuids(memory_exporter): + event = _event(payloads.channel_bridge) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + + span = _span(memory_exporter, "freeswitch.channel.bridge") + assert span.attributes["bridge.a_uuid"] == payloads.UUID_A + assert span.attributes["bridge.b_uuid"] == payloads.UUID_B + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + events = [e for e in span.events if e.name == "bridge.established"] + assert events, "bridge.established event not emitted" + + +async def test_channel_unbridge_emits_torn_down_event(memory_exporter): + event = _event(payloads.channel_unbridge) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + + span = _span(memory_exporter, "freeswitch.channel.unbridge") + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + assert [e for e in span.events if e.name == "bridge.torn_down"] + + +async def test_hangup_complete_records_q850(memory_exporter): + event = _event(payloads.channel_hangup_complete) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + + span = _span(memory_exporter, "freeswitch.channel.hangup_complete") + assert span.attributes["hangup.cause.q850"] == "16" + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + assert [e for e in span.events if e.name == "call.finalized"] + + +async def test_channel_destroy_emits_span(memory_exporter): + event = _event(payloads.channel_destroy) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.channel.destroy") + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + + +async def test_execute_and_complete_spans(memory_exporter): + event = _event(payloads.channel_execute) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.channel.execute") + assert span.attributes["application.name"] == "playback" + assert span.attributes["application.uuid"] == "app-uuid-1" + + event = _event(payloads.channel_execute_complete) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.channel.execute_complete") + assert span.attributes["application.name"] == "playback" + + +async def test_codec_span(memory_exporter): + event = _event(payloads.codec) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.channel.codec") + assert span.attributes["channel.read_codec.name"] == "opus" + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + + +async def test_call_update_span(memory_exporter): + event = _event(payloads.call_update) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.call.update") + assert span.attributes["sip.call_id"] == payloads.SIP_CALL_ID + + +async def test_sofia_transfer_blind_and_attended(memory_exporter): + event = _event(payloads.sofia_transferor) + custom_subclass_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.sofia.transfer") + assert span.attributes["transfer.role"] == "transferor" + assert span.attributes["transfer.type"] == "blind" + + event = _event(payloads.sofia_transferee) + custom_subclass_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.sofia.transfer") + assert span.attributes["transfer.role"] == "transferee" + assert span.attributes["transfer.type"] == "attended" + + +async def test_callcenter_info_span(memory_exporter): + event = _event(payloads.callcenter_info) + custom_subclass_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.callcenter.info") + assert span.attributes["cc.queue"] == "sales" + assert span.attributes["cc.action"] == "agent-state-change" + + +async def test_conference_maintenance_span(memory_exporter): + event = _event(payloads.conference_maintenance) + custom_subclass_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.conference.maintenance") + assert span.attributes["conference.name"] == "3000" + assert span.attributes["conference.action"] == "add-member" + + +async def test_valet_info_span(memory_exporter): + event = _event(payloads.valet_info) + custom_subclass_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.valet.info") + assert span.attributes["valet.lot"] == "default" + assert span.attributes["bridge.to_uuid"] == payloads.UUID_B + + +async def test_no_sip_call_id_event_still_emits_span(memory_exporter): + """A channel event without the correlation key still traces; the gap is + counted by the events_without_sip_call_id metric (no crash, no missing span).""" + event = _event(payloads.channel_create_no_sip) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + span = _span(memory_exporter, "freeswitch.channel.create") + assert "sip.call_id" not in span.attributes + + +async def test_non_lifecycle_event_is_noop(memory_exporter): + """A HEARTBEAT must not produce a lifecycle span.""" + event = _event(payloads.heartbeat) + channel_lifecycle_processor(None, event) # type: ignore[arg-type] + spans = exporter_names(memory_exporter) + assert not any(name.startswith("freeswitch.channel.") for name in spans) + + +def exporter_names(exporter: InMemorySpanExporter): + return [s.name for s in exporter.get_finished_spans()] diff --git a/tests/test_consumer_tracing.py b/tests/test_consumer_tracing.py new file mode 100644 index 0000000..d4a7fb3 --- /dev/null +++ b/tests/test_consumer_tracing.py @@ -0,0 +1,67 @@ +"""Tests for Consumer start/stop OpenTelemetry instrumentation.""" + +from __future__ import annotations + +import asyncio + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +from genesis import Consumer + + +@pytest.fixture +def memory_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + if not hasattr(provider, "add_span_processor"): + provider = TracerProvider() + trace.set_tracer_provider(provider) + provider.add_span_processor(processor) + yield exporter + + +async def _wait_for_span( + exporter: InMemorySpanExporter, name: str, timeout: float = 5.0 +): + """Event-based poll (no sleep) for a finished span by name.""" + start = asyncio.get_event_loop().time() + while True: + if any(s.name == name for s in exporter.get_finished_spans()): + return + if asyncio.get_event_loop().time() - start >= timeout: + raise TimeoutError(f"span '{name}' not seen within {timeout}s") + future = asyncio.Future() + asyncio.get_event_loop().call_soon(future.set_result, None) + await future + + +async def test_consumer_start_and_stop_spans(freeswitch, memory_exporter): + """Consumer.start emits consumer.start (setup phase) and stop emits consumer.stop.""" + consumer = Consumer(*freeswitch.address) + + start_task = asyncio.create_task(consumer.start()) + # Wait for the setup-phase span to finalize (before the blocking wait loop). + await _wait_for_span(memory_exporter, "consumer.start", timeout=5.0) + + await consumer.stop() + try: + await asyncio.wait_for(start_task, timeout=5.0) + except (asyncio.TimeoutError, Exception): + start_task.cancel() + + spans = {s.name for s in memory_exporter.get_finished_spans()} + assert "consumer.start" in spans + assert "consumer.stop" in spans + + start_span = next( + s for s in memory_exporter.get_finished_spans() if s.name == "consumer.start" + ) + assert start_span.attributes["consumer.host"] == freeswitch.address[0] + assert "consumer.port" in start_span.attributes diff --git a/tests/test_inbound.py b/tests/test_inbound.py index 69ef23a..6ad3760 100644 --- a/tests/test_inbound.py +++ b/tests/test_inbound.py @@ -132,7 +132,7 @@ async def test_inbound_client_send_command_error(freeswitch): async def test_inbound_metrics_error_on_start(freeswitch): async with freeswitch: with patch( - "genesis.inbound.active_connections_counter.add", + "genesis.inbound.connections_active_counter.add", side_effect=Exception("Metrics error"), ): async with Inbound(*freeswitch.address) as client: @@ -143,7 +143,7 @@ async def test_inbound_metrics_error_on_stop(freeswitch): async with freeswitch: async with Inbound(*freeswitch.address): with patch( - "genesis.inbound.active_connections_counter.add", + "genesis.inbound.connections_active_counter.add", side_effect=Exception("Metrics error"), ): pass diff --git a/tests/test_session_tracing.py b/tests/test_session_tracing.py new file mode 100644 index 0000000..f34a88c --- /dev/null +++ b/tests/test_session_tracing.py @@ -0,0 +1,124 @@ +"""Tests for Session.sendmsg OpenTelemetry instrumentation. + +Verifies the ``session.sendmsg`` span (and the ``session.await_complete`` +child span when blocking) plus the ``genesis.session.commands`` metric +attributes. Uses the Outbound + Dialplan doubles (no real FreeSWITCH). +""" + +from __future__ import annotations + +import asyncio + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +from genesis import Outbound, Session + + +@pytest.fixture +def memory_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + if not hasattr(provider, "add_span_processor"): + provider = TracerProvider() + trace.set_tracer_provider(provider) + provider.add_span_processor(processor) + yield exporter + + +def _spans(exporter: InMemorySpanExporter): + return exporter.get_finished_spans() + + +async def test_sendmsg_non_blocking_emits_span(host, port, dialplan, memory_exporter): + """A non-blocking sendmsg emits session.sendmsg with application metadata.""" + done = asyncio.Event() + handler_started = asyncio.Event() + + async def handler(session: Session) -> None: + handler_started.set() + # Non-blocking execute: returns the +OK reply, no completion wait. + await session.sendmsg("execute", "answer", block=False) + done.set() + + address = (host(), port()) + app = Outbound(handler, *address) + await app.start(block=False) + await dialplan.start(*address) + + await asyncio.wait_for(dialplan.client_connected.wait(), timeout=5.0) + await asyncio.wait_for(handler_started.wait(), timeout=5.0) + await asyncio.wait_for(done.wait(), timeout=5.0) + + await app.stop() + await dialplan.stop() + + sendmsg_spans = [s for s in _spans(memory_exporter) if s.name == "session.sendmsg"] + assert sendmsg_spans, "session.sendmsg span not emitted" + span = sendmsg_spans[-1] + assert span.attributes["application.name"] == "answer" + assert span.attributes["application.block"] == "False" + + +async def test_sendmsg_blocking_emits_await_complete( + host, port, dialplan, memory_exporter +): + """A blocking sendmsg emits session.sendmsg + a session.await_complete child.""" + sendmsg_returned = asyncio.Event() + handler_started = asyncio.Event() + captured = {} + + async def handler(session: Session) -> None: + handler_started.set() + captured["channel_uuid"] = session.uuid + # blocking execute; the Dialplan double stores the pending Event-UUID. + await session.sendmsg("execute", "playback", "/tmp/x.wav", block=True) + sendmsg_returned.set() + + address = (host(), port()) + app = Outbound(handler, *address) + await app.start(block=False) + await dialplan.start(*address) + + await asyncio.wait_for(dialplan.client_connected.wait(), timeout=5.0) + await asyncio.wait_for(handler_started.wait(), timeout=5.0) + + # Wait (event-based, no sleep) until the Dialplan double has recorded the + # pending execute Event-UUID, then broadcast the completion event so the + # blocked sendmsg returns. The complete event must carry the session's + # channel UUID so O(1) channel routing delivers it to the registered handler. + async def _pending_uuid(): + while not dialplan.pending_execute_events: + future = asyncio.Future() + asyncio.get_event_loop().call_soon(future.set_result, None) + await future + return next(iter(dialplan.pending_execute_events)) + + app_uuid = await asyncio.wait_for(_pending_uuid(), timeout=5.0) + + await dialplan.broadcast( + { + "Event-Name": "CHANNEL_EXECUTE_COMPLETE", + "Application-UUID": app_uuid, + "Unique-ID": captured["channel_uuid"], + } + ) + + await asyncio.wait_for(sendmsg_returned.wait(), timeout=5.0) + await app.stop() + await dialplan.stop() + + names = [s.name for s in _spans(memory_exporter)] + assert "session.sendmsg" in names + assert "session.await_complete" in names + sendmsg_span = next( + s for s in _spans(memory_exporter) if s.name == "session.sendmsg" + ) + assert sendmsg_span.attributes["application.name"] == "playback" + assert sendmsg_span.attributes["application.block"] == "True"