From cfdb481ffd313b6962e3951d1c457639c0bfb721 Mon Sep 17 00:00:00 2001 From: danielmast Date: Sun, 22 Mar 2026 11:03:32 +0100 Subject: [PATCH 1/3] Init plugin_streaming + test --- janus_client/__init__.py | 1 + janus_client/plugin_streaming.py | 613 +++++++++++++++++++++++++++++++ 2 files changed, 614 insertions(+) create mode 100644 janus_client/plugin_streaming.py diff --git a/janus_client/__init__.py b/janus_client/__init__.py index 6698bd5..7fffa78 100644 --- a/janus_client/__init__.py +++ b/janus_client/__init__.py @@ -7,6 +7,7 @@ from .plugin_textroom import JanusTextRoomPlugin, TextRoomError, TextRoomEventType from .plugin_video_call import JanusVideoCallPlugin, VideoCallError, VideoCallEventType from .plugin_video_room import JanusVideoRoomPlugin, VideoRoomError, VideoRoomEventType, ParticipantType +from .plugin_streaming import JanusStreamingPlugin, StreamingError, StreamingEventType from .transport import JanusTransport from .transport_http import JanusTransportHTTP diff --git a/janus_client/plugin_streaming.py b/janus_client/plugin_streaming.py new file mode 100644 index 0000000..78252dd --- /dev/null +++ b/janus_client/plugin_streaming.py @@ -0,0 +1,613 @@ +"""Janus Streaming plugin implementation. + +This module provides a client for the Janus Streaming plugin, which enables +watching live or pre-recorded media streams served by Janus as WebRTC media. + +The streaming plugin acts as a broadcast receiver: Janus holds mountpoints +(RTP/RTSP/file sources) and sends WebRTC offers to watchers. This means the +signalling flow is the reverse of most plugins — Janus creates the SDP offer +and the client creates the answer. + +Typical usage:: + + plugin = JanusStreamingPlugin() + await plugin.attach(session) + + # List available mountpoints + streams = await plugin.list_mountpoints() + + # Watch mountpoint 1 and record to file + recorder = MediaRecorder("output.mp4") + await plugin.watch(mount_id=1, recorder=recorder) + + await plugin.wait_webrtcup() + # … receive media … + + await plugin.stop() + await plugin.destroy() +""" + +import asyncio +import logging +from enum import Enum +from typing import Any, Callable, Dict, List, Optional + +from aiortc import MediaStreamTrack +from aiortc.contrib.media import MediaRecorder + +from .message_transaction import is_subset +from .plugin_base import JanusPlugin + +logger = logging.getLogger(__name__) + + +class StreamingError(Exception): + """Exception raised for Streaming plugin errors.""" + + def __init__(self, error_code: int, error_message: str) -> None: + self.error_code = error_code + self.error_message = error_message + super().__init__(f"Streaming error {error_code}: {error_message}") + + +class StreamingEventType(Enum): + """Types of events that can be received from the Streaming plugin.""" + + WATCHING = "watching" + STARTED = "started" + PAUSED = "paused" + STOPPED = "stopped" + SWITCHED = "switched" + ERROR = "error" + + +class JanusStreamingPlugin(JanusPlugin): + """Janus Streaming plugin client. + + Watches live or recorded media streams hosted as Janus mountpoints. + Janus sends the WebRTC offer; the client creates and sends an answer. + + Example:: + + plugin = JanusStreamingPlugin() + await plugin.attach(session) + + streams = await plugin.list_mountpoints() + print(streams) + + recorder = MediaRecorder("output.mp4") + await plugin.watch(mount_id=1, recorder=recorder) + await plugin.wait_webrtcup() + + await asyncio.sleep(10) # receive for 10 seconds + + await plugin.stop() + await plugin.destroy() + """ + + name = "janus.plugin.streaming" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._mount_id: Optional[int] = None + self._recorder: Optional[MediaRecorder] = None + self._webrtcup_event = asyncio.Event() + self._event_handlers: Dict[StreamingEventType, List[Callable]] = { + event_type: [] for event_type in StreamingEventType + } + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _check_error_in_response(self, response: Dict[str, Any]) -> None: + """Raise StreamingError if the response contains an error.""" + if is_subset(response, {"janus": "error", "error": {}}): + error: Dict = response["error"] + raise StreamingError( + error.get("code", 0), error.get("reason", "Unknown error") + ) + + for janus_type in ("success", "event"): + if is_subset( + response, + { + "janus": janus_type, + "plugindata": {"plugin": self.name, "data": {"error": None}}, + }, + ): + plugin_data = response["plugindata"]["data"] + raise StreamingError( + plugin_data.get("error_code", 0), + plugin_data.get("error", "Unknown error"), + ) + + async def _send_request( + self, + body: Dict[str, Any], + jsep: Optional[Dict[str, Any]] = None, + timeout: float = 15.0, + ) -> Dict[str, Any]: + """Send a plugin request and wait for the first matching response.""" + + def response_matcher(response: Dict[str, Any]) -> bool: + return ( + is_subset( + response, {"janus": "success", "plugindata": {"plugin": self.name}} + ) + or is_subset( + response, {"janus": "event", "plugindata": {"plugin": self.name}} + ) + or is_subset(response, {"janus": "error", "error": {}}) + ) + + message: Dict[str, Any] = {"janus": "message", "body": body} + if jsep: + message["jsep"] = jsep + + message_transaction = await self.send(message) + response = await message_transaction.get( + matcher=response_matcher, timeout=timeout + ) + await message_transaction.done() + + self._check_error_in_response(response) + return response + + def _setup_recorder( + self, recorder: Optional[MediaRecorder], expected_tracks: int = 1 + ) -> None: + """Register an incoming-track handler that feeds the recorder. + + Waits until all expected tracks are received before starting the + recorder, because MediaRecorder cannot accept new tracks after start(). + """ + self._recorder = recorder + if recorder is None: + return + + received: list = [] + + @self.pc.on("track") + async def on_track(track: MediaStreamTrack) -> None: + logger.info("Streaming: received %s track", track.kind) + recorder.addTrack(track) + received.append(track) + if len(received) >= expected_tracks: + await recorder.start() + + async def _trigger_event( + self, event_type: StreamingEventType, data: Dict[str, Any] + ) -> None: + for handler in self._event_handlers[event_type]: + try: + if asyncio.iscoroutinefunction(handler): + await handler(data) + else: + handler(data) + except Exception as exc: + logger.error("Error in streaming event handler: %s", exc) + + # ------------------------------------------------------------------ + # Base-class abstract method + # ------------------------------------------------------------------ + + async def on_receive(self, response: Dict[str, Any]) -> None: + """Handle asynchronous events pushed by Janus.""" + if "jsep" in response: + await self.on_receive_jsep(jsep=response["jsep"]) + + janus_code = response.get("janus") + + if janus_code == "webrtcup": + logger.info("Streaming: WebRTC connection established") + self._webrtcup_event.set() + + elif janus_code == "hangup": + logger.info("Streaming: WebRTC connection closed by server") + if self.pc.signalingState != "closed": + await self.pc.close() + if self._recorder: + try: + await self._recorder.stop() + except Exception: + pass + + elif janus_code == "event": + await self._handle_event(response) + + async def _handle_event(self, response: Dict[str, Any]) -> None: + if "plugindata" not in response: + return + + plugin_data = response["plugindata"]["data"] + streaming_type = plugin_data.get("streaming") + + if streaming_type == "event": + if "error" in plugin_data: + await self._trigger_event(StreamingEventType.ERROR, plugin_data) + elif "status" in plugin_data: + status = plugin_data["status"] + mapping = { + "started": StreamingEventType.STARTED, + "paused": StreamingEventType.PAUSED, + "stopped": StreamingEventType.STOPPED, + } + if status in mapping: + await self._trigger_event(mapping[status], plugin_data) + + # ------------------------------------------------------------------ + # Event registration + # ------------------------------------------------------------------ + + def on_event( + self, + event_type: StreamingEventType, + handler: Callable[[Dict[str, Any]], None], + ) -> None: + """Register a handler for streaming events. + + Args: + event_type: The event type to listen for. + handler: Sync or async callable receiving the event data dict. + """ + self._event_handlers[event_type].append(handler) + + # ------------------------------------------------------------------ + # Viewer operations + # ------------------------------------------------------------------ + + async def list_mountpoints(self, timeout: float = 15.0) -> List[Dict[str, Any]]: + """List available streaming mountpoints. + + Args: + timeout: Maximum seconds to wait for a response. + + Returns: + List of mountpoint info dicts. + + Raises: + StreamingError: If the request fails. + """ + response = await self._send_request({"request": "list"}, timeout=timeout) + return response["plugindata"]["data"].get("list", []) + + async def info( + self, + mount_id: int, + pin: Optional[str] = None, + timeout: float = 15.0, + ) -> Dict[str, Any]: + """Get detailed info about a single mountpoint. + + Args: + mount_id: ID of the mountpoint. + pin: PIN if the mountpoint is protected. + timeout: Maximum seconds to wait for a response. + + Returns: + Mountpoint info dict. + + Raises: + StreamingError: If the request fails. + """ + body: Dict[str, Any] = {"request": "info", "id": mount_id} + if pin: + body["pin"] = pin + response = await self._send_request(body, timeout=timeout) + return response["plugindata"]["data"].get("info", {}) + + async def watch( + self, + mount_id: int, + pin: Optional[str] = None, + offer_audio: bool = True, + offer_video: bool = True, + offer_data: bool = True, + recorder: Optional[MediaRecorder] = None, + trickle: bool = False, + timeout: float = 30.0, + ) -> Dict[str, Any]: + """Start watching a mountpoint. + + Sends a ``watch`` request, receives a Janus SDP offer, creates an + answer, and sends ``start`` with the answer. After this call returns, + media will flow once the WebRTC connection is up. + + Args: + mount_id: ID of the mountpoint to watch. + pin: PIN if the mountpoint is protected. + offer_audio: Whether to negotiate audio. + offer_video: Whether to negotiate video. + offer_data: Whether to negotiate data channels. + recorder: Optional recorder for incoming media. + trickle: Whether to use trickle ICE. + timeout: Maximum seconds to wait for each request. + + Returns: + Plugin data from the ``watch`` response. + + Raises: + StreamingError: If the request fails. + """ + await self.reset_connection() + expected_tracks = sum([offer_audio, offer_video]) + self._setup_recorder(recorder, expected_tracks) + self._mount_id = mount_id + + body: Dict[str, Any] = { + "request": "watch", + "id": mount_id, + "offer_audio": offer_audio, + "offer_video": offer_video, + "offer_data": offer_data, + } + if pin: + body["pin"] = pin + + response = await self._send_request(body, timeout=timeout) + + # Janus sends an offer; create and send back an answer + if "jsep" in response: + await self.on_receive_jsep(response["jsep"]) + await self.pc.setLocalDescription(await self.pc.createAnswer()) + await self._start(trickle=trickle, timeout=timeout) + + return response["plugindata"]["data"] + + async def _start(self, trickle: bool = False, timeout: float = 15.0) -> None: + """Send the SDP answer to Janus to begin media delivery.""" + jsep = await self.create_jsep(self.pc, trickle=trickle) + await self._send_request({"request": "start"}, jsep=jsep, timeout=timeout) + + async def pause(self, timeout: float = 15.0) -> None: + """Pause media delivery from the server. + + Args: + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If the request fails. + """ + await self._send_request({"request": "pause"}, timeout=timeout) + + async def stop(self, timeout: float = 15.0) -> None: + """Stop watching the current mountpoint. + + Args: + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If the request fails. + """ + await self._send_request({"request": "stop"}, timeout=timeout) + self._mount_id = None + if self._recorder: + try: + await self._recorder.stop() + except Exception: + pass + self._recorder = None + + async def switch( + self, + mount_id: int, + pin: Optional[str] = None, + timeout: float = 15.0, + ) -> None: + """Switch to a different mountpoint without renegotiating WebRTC. + + Args: + mount_id: ID of the new mountpoint. + pin: PIN if the mountpoint is protected. + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If the request fails. + """ + body: Dict[str, Any] = {"request": "switch", "id": mount_id} + if pin: + body["pin"] = pin + await self._send_request(body, timeout=timeout) + self._mount_id = mount_id + + async def wait_webrtcup(self) -> None: + """Wait until the WebRTC connection is established.""" + await self._webrtcup_event.wait() + self._webrtcup_event.clear() + + # ------------------------------------------------------------------ + # Admin / mountpoint management operations + # ------------------------------------------------------------------ + + async def create_mountpoint( + self, + mount_type: str, + mount_id: Optional[int] = None, + name: Optional[str] = None, + description: Optional[str] = None, + is_private: bool = False, + pin: Optional[str] = None, + permanent: bool = False, + admin_key: Optional[str] = None, + timeout: float = 15.0, + **kwargs: Any, + ) -> int: + """Create a new streaming mountpoint. + + Args: + mount_type: Mountpoint type, e.g. ``"rtp"``, ``"live"``, + ``"ondemand"``, ``"rtsp"``. + mount_id: Desired mountpoint ID (auto-assigned if omitted). + name: Friendly name for the mountpoint. + description: Optional description. + is_private: Whether the mountpoint is hidden from ``list``. + pin: Optional PIN to protect the mountpoint. + permanent: Whether to persist the mountpoint in the config file. + admin_key: Admin key if required by the server. + timeout: Maximum seconds to wait for a response. + **kwargs: Additional mountpoint-specific parameters passed + directly in the request body (e.g. ``audio``, ``video``, + ``rtp``, ``rtsp_url``). + + Returns: + The created mountpoint ID. + + Raises: + StreamingError: If creation fails. + """ + body: Dict[str, Any] = { + "request": "create", + "type": mount_type, + "is_private": is_private, + "permanent": permanent, + } + if mount_id is not None: + body["id"] = mount_id + if name: + body["name"] = name + if description: + body["description"] = description + if pin: + body["pin"] = pin + if admin_key: + body["admin_key"] = admin_key + body.update(kwargs) + + response = await self._send_request(body, timeout=timeout) + return int(response["plugindata"]["data"]["stream"]["id"]) + + async def destroy_mountpoint( + self, + mount_id: int, + secret: Optional[str] = None, + permanent: bool = False, + timeout: float = 15.0, + ) -> None: + """Destroy a streaming mountpoint. + + Args: + mount_id: ID of the mountpoint to destroy. + secret: Mountpoint secret if required. + permanent: Whether to remove from the config file. + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If destruction fails. + """ + body: Dict[str, Any] = { + "request": "destroy", + "id": mount_id, + "permanent": permanent, + } + if secret: + body["secret"] = secret + await self._send_request(body, timeout=timeout) + + async def edit_mountpoint( + self, + mount_id: int, + secret: Optional[str] = None, + timeout: float = 15.0, + **kwargs: Any, + ) -> None: + """Edit an existing mountpoint. + + Args: + mount_id: ID of the mountpoint to edit. + secret: Mountpoint secret if required. + timeout: Maximum seconds to wait for a response. + **kwargs: Fields to update (e.g. ``new_name``, ``new_description``, + ``new_pin``, ``new_secret``, ``new_is_private``). + + Raises: + StreamingError: If editing fails. + """ + body: Dict[str, Any] = {"request": "edit", "id": mount_id} + if secret: + body["secret"] = secret + body.update(kwargs) + await self._send_request(body, timeout=timeout) + + async def enable_mountpoint( + self, + mount_id: int, + secret: Optional[str] = None, + timeout: float = 15.0, + ) -> None: + """Enable a previously disabled mountpoint. + + Args: + mount_id: ID of the mountpoint. + secret: Mountpoint secret if required. + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If the request fails. + """ + body: Dict[str, Any] = {"request": "enable", "id": mount_id} + if secret: + body["secret"] = secret + await self._send_request(body, timeout=timeout) + + async def disable_mountpoint( + self, + mount_id: int, + secret: Optional[str] = None, + stop_recording: bool = False, + timeout: float = 15.0, + ) -> None: + """Disable a mountpoint (kick all watchers, reject new ones). + + Args: + mount_id: ID of the mountpoint. + secret: Mountpoint secret if required. + stop_recording: Whether to stop any active recording. + timeout: Maximum seconds to wait for a response. + + Raises: + StreamingError: If the request fails. + """ + body: Dict[str, Any] = { + "request": "disable", + "id": mount_id, + "stop_recording": stop_recording, + } + if secret: + body["secret"] = secret + await self._send_request(body, timeout=timeout) + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def mount_id(self) -> Optional[int]: + """ID of the currently watched mountpoint, or None.""" + return self._mount_id + + @property + def webrtcup_event(self) -> asyncio.Event: + """The underlying asyncio.Event that fires when WebRTC is up.""" + return self._webrtcup_event + + # ------------------------------------------------------------------ + # Cleanup + # ------------------------------------------------------------------ + + async def destroy(self) -> None: + """Stop media, clean up resources, and detach the plugin handle.""" + if self._recorder: + try: + await self._recorder.stop() + except Exception: + pass + self._recorder = None + + if self.pc.signalingState != "closed": + await self.pc.close() + + self._mount_id = None + self._webrtcup_event.clear() + + await super().destroy() From 951722019ee2ae76e25b94427fbb1a7d30b78d97 Mon Sep 17 00:00:00 2001 From: danielmast Date: Sun, 22 Mar 2026 11:03:36 +0100 Subject: [PATCH 2/3] Init plugin_streaming + test --- tests/test_plugin_streaming.py | 180 +++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 tests/test_plugin_streaming.py diff --git a/tests/test_plugin_streaming.py b/tests/test_plugin_streaming.py new file mode 100644 index 0000000..d23378f --- /dev/null +++ b/tests/test_plugin_streaming.py @@ -0,0 +1,180 @@ +import asyncio +import os +import unittest +from urllib.parse import urljoin + +from janus_client import ( + JanusSession, + JanusTransport, + JanusStreamingPlugin, + StreamingError, + StreamingEventType, +) +from tests.util import async_test + + +class BaseTestClass: + class TestClass(unittest.TestCase): + server_url: str + + async def asyncSetUp(self) -> None: + self.transport = JanusTransport.create_transport( + base_url=self.server_url, api_secret=os.getenv("JANUS_API_SECRET", "") + ) + await self.transport.connect() + + async def asyncTearDown(self) -> None: + await self.transport.disconnect() + await asyncio.sleep(0.250) + + @async_test + async def test_attach_and_destroy(self): + """Plugin can be attached and destroyed cleanly.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + self.assertIsNone(plugin.mount_id) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_list_mountpoints(self): + """list_mountpoints returns a list.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + mountpoints = await plugin.list_mountpoints() + self.assertIsInstance(mountpoints, list) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_create_and_destroy_mountpoint(self): + """A mountpoint can be created and destroyed.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + mount_id = await plugin.create_mountpoint( + mount_type="rtp", + name="test_mount", + audio=True, + audioport=9700, + audiopt=111, + audiocodec="opus", + ) + self.assertIsInstance(mount_id, int) + + mountpoints = await plugin.list_mountpoints() + ids = [m["id"] for m in mountpoints] + self.assertIn(mount_id, ids) + + await plugin.destroy_mountpoint(mount_id) + + mountpoints_after = await plugin.list_mountpoints() + ids_after = [m["id"] for m in mountpoints_after] + self.assertNotIn(mount_id, ids_after) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_info_nonexistent_mountpoint_raises(self): + """Requesting info for a non-existent mountpoint raises StreamingError.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + with self.assertRaises(StreamingError): + await plugin.info(mount_id=999999) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_watch_nonexistent_mountpoint_raises(self): + """Watching a non-existent mountpoint raises StreamingError.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + with self.assertRaises(StreamingError): + await plugin.watch(mount_id=999999) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_on_event_registration(self): + """Event handlers can be registered and are called.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + received = [] + plugin.on_event(StreamingEventType.STARTED, lambda data: received.append(data)) + + # Handler is registered — just verify no error was raised + self.assertEqual(len(plugin._event_handlers[StreamingEventType.STARTED]), 1) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_webrtcup_event_property(self): + """webrtcup_event property returns an asyncio.Event.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + self.assertIsInstance(plugin.webrtcup_event, asyncio.Event) + finally: + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + +class TestTransportHttp(BaseTestClass.TestClass): + server_url = urljoin( + os.getenv("JANUS_HTTP_URL", ""), + os.getenv("JANUS_HTTP_BASE_PATH", ""), + ) + + +class TestTransportWebsocket(BaseTestClass.TestClass): + server_url = os.getenv("JANUS_WS_URL", "") From 826fafccab5ac0933d2621b6ffa51fe4e5e8cf59 Mon Sep 17 00:00:00 2001 From: danielmast Date: Sun, 22 Mar 2026 11:22:03 +0100 Subject: [PATCH 3/3] Add tests --- tests/test_plugin_streaming.py | 97 ++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/tests/test_plugin_streaming.py b/tests/test_plugin_streaming.py index d23378f..51c423f 100644 --- a/tests/test_plugin_streaming.py +++ b/tests/test_plugin_streaming.py @@ -95,6 +95,103 @@ async def test_create_and_destroy_mountpoint(self): await session.destroy() await self.asyncTearDown() + @async_test + async def test_edit_mountpoint(self): + """A mountpoint's name can be edited.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + mount_id = await plugin.create_mountpoint( + mount_type="rtp", + name="test_mount_edit", + audio=True, + audioport=9702, + audiopt=111, + audiocodec="opus", + ) + + await plugin.edit_mountpoint(mount_id, new_description="updated description") + + info = await plugin.info(mount_id) + self.assertEqual(info["description"], "updated description") + finally: + await plugin.destroy_mountpoint(mount_id) + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_enable_mountpoint(self): + """A disabled mountpoint can be re-enabled.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + + try: + await plugin.attach(session=session) + + secret = "test_secret" + mount_id = await plugin.create_mountpoint( + mount_type="rtp", + name="test_mount_enable", + audio=True, + audioport=9704, + audiopt=111, + audiocodec="opus", + secret=secret, + ) + + await plugin.disable_mountpoint(mount_id, secret=secret) + await plugin.enable_mountpoint(mount_id, secret=secret) + + # After re-enabling the mountpoint should appear in the list again + mountpoints = await plugin.list_mountpoints() + ids = [m["id"] for m in mountpoints] + self.assertIn(mount_id, ids) + finally: + await plugin.destroy_mountpoint(mount_id, secret=secret) + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + + @async_test + async def test_destroy_mountpoint_without_secret_raises(self): + """Destroying a secret-protected mountpoint without secret raises StreamingError.""" + await self.asyncSetUp() + + session = JanusSession(transport=self.transport) + plugin = JanusStreamingPlugin() + secret = "test_secret" + mount_id = None + + try: + await plugin.attach(session=session) + + mount_id = await plugin.create_mountpoint( + mount_type="rtp", + name="test_mount_secret", + audio=True, + audioport=9706, + audiopt=111, + audiocodec="opus", + secret=secret, + ) + + with self.assertRaises(StreamingError): + await plugin.destroy_mountpoint(mount_id) + finally: + if mount_id is not None: + await plugin.destroy_mountpoint(mount_id, secret=secret) + await plugin.destroy() + await session.destroy() + await self.asyncTearDown() + @async_test async def test_info_nonexistent_mountpoint_raises(self): """Requesting info for a non-existent mountpoint raises StreamingError."""