-
Notifications
You must be signed in to change notification settings - Fork 3.2k
fix(avatar): preserve audio wrappers across avatar hot-swaps #5863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c357c0e
6a23e75
9884666
99f75ce
4d101d5
ca17d29
3e21868
b3c3dd3
5296871
edf8965
f676d3e
b62cd5f
2d952a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -142,7 +142,6 @@ def __init__( | |||||
| sample_rate: The sample rate required by the audio sink, if None, any sample rate is accepted | ||||||
| """ # noqa: E501 | ||||||
| super().__init__() | ||||||
| self.__next_in_chain = next_in_chain | ||||||
| self._sample_rate = sample_rate | ||||||
| self.__label = label | ||||||
| self.__capturing = False | ||||||
|
|
@@ -155,26 +154,37 @@ def __init__( | |||||
| playback_position=0, interrupted=False | ||||||
| ) | ||||||
|
|
||||||
| if self.next_in_chain: | ||||||
| self.next_in_chain.on( | ||||||
| "playback_finished", | ||||||
| lambda ev: self.on_playback_finished( | ||||||
| interrupted=ev.interrupted, | ||||||
| playback_position=ev.playback_position, | ||||||
| synchronized_transcript=ev.synchronized_transcript, | ||||||
| ), | ||||||
| ) | ||||||
| self.next_in_chain.on( | ||||||
| "playback_started", lambda ev: self.on_playback_started(created_at=ev.created_at) | ||||||
| ) | ||||||
| # auto-wrap a bare leaf with a _AudioSinkProxy so the leaf can be | ||||||
| # hot-swapped later without disturbing wrappers above | ||||||
| if ( | ||||||
| next_in_chain is not None | ||||||
| and next_in_chain.next_in_chain is None | ||||||
| and not isinstance(next_in_chain, _AudioSinkProxy) | ||||||
| ): | ||||||
| next_in_chain = _AudioSinkProxy(next_in_chain) | ||||||
|
|
||||||
| self._next_in_chain: AudioOutput | None = next_in_chain | ||||||
| if next_in_chain is not None: | ||||||
| next_in_chain.on("playback_finished", self._forward_next_playback_finished) | ||||||
| next_in_chain.on("playback_started", self._forward_next_playback_started) | ||||||
|
|
||||||
| def _forward_next_playback_finished(self, ev: PlaybackFinishedEvent) -> None: | ||||||
| self.on_playback_finished( | ||||||
| interrupted=ev.interrupted, | ||||||
| playback_position=ev.playback_position, | ||||||
| synchronized_transcript=ev.synchronized_transcript, | ||||||
| ) | ||||||
|
|
||||||
| def _forward_next_playback_started(self, ev: PlaybackStartedEvent) -> None: | ||||||
| self.on_playback_started(created_at=ev.created_at) | ||||||
|
|
||||||
| @property | ||||||
| def label(self) -> str: | ||||||
| return self.__label | ||||||
|
|
||||||
| @property | ||||||
| def next_in_chain(self) -> AudioOutput | None: | ||||||
| return self.__next_in_chain | ||||||
| return self._next_in_chain | ||||||
|
|
||||||
| def on_playback_started(self, *, created_at: float) -> None: | ||||||
| self.emit("playback_started", PlaybackStartedEvent(created_at=created_at)) | ||||||
|
|
@@ -228,6 +238,11 @@ def _reset_playback_count(self) -> None: | |||||
| self.__playback_segments_count = 0 | ||||||
| self.__playback_finished_count = 0 | ||||||
|
|
||||||
| @property | ||||||
| def _pending_playback_count(self) -> int: | ||||||
| """Number of captured segments that haven't reported playback_finished yet.""" | ||||||
| return self.__playback_segments_count - self.__playback_finished_count | ||||||
|
|
||||||
| @property | ||||||
| def sample_rate(self) -> int | None: | ||||||
| """The sample rate required by the audio sink, if None, any sample rate is accepted""" | ||||||
|
|
@@ -275,6 +290,97 @@ def __repr__(self) -> str: | |||||
| return f"{self.__class__.__name__}(label={self.label!r}, next={self.next_in_chain!r})" | ||||||
|
|
||||||
|
|
||||||
| class _AudioSinkProxy(AudioOutput): | ||||||
| """Stable swap point at the bottom of an audio wrapper chain. | ||||||
|
|
||||||
| Wrappers above hold a reference to the proxy; the actual sink lives in | ||||||
| ``next_in_chain`` and can be replaced via :meth:`set_next_in_chain` without | ||||||
| disturbing them. | ||||||
| """ | ||||||
|
|
||||||
| def __init__(self, next_in_chain: AudioOutput) -> None: | ||||||
| super().__init__( | ||||||
| label="AudioSinkProxy", | ||||||
| capabilities=AudioOutputCapabilities(pause=True), | ||||||
| next_in_chain=None, | ||||||
| ) | ||||||
| # whether the wrapper above us has attached the proxy; set_next_in_chain | ||||||
| # uses this to decide if a new/old downstream should be notified | ||||||
| self._attached = False | ||||||
| self.set_next_in_chain(next_in_chain) | ||||||
|
|
||||||
| self._capturing = False | ||||||
| self._pushed_duration: float = 0.0 | ||||||
|
|
||||||
| @property | ||||||
| def next_in_chain(self) -> AudioOutput: | ||||||
| assert self._next_in_chain is not None | ||||||
| return self._next_in_chain | ||||||
|
|
||||||
| def on_attached(self) -> None: | ||||||
| self._attached = True | ||||||
| super().on_attached() | ||||||
|
|
||||||
| def on_detached(self) -> None: | ||||||
| self._attached = False | ||||||
| super().on_detached() | ||||||
|
|
||||||
| def set_next_in_chain(self, new: AudioOutput) -> None: | ||||||
| """Replace the downstream sink, transferring playback listeners | ||||||
| and on_attached/on_detached state. | ||||||
| """ | ||||||
| if new is self._next_in_chain: | ||||||
| return | ||||||
|
|
||||||
| old = self._next_in_chain | ||||||
| if old is not None: | ||||||
| old.off("playback_finished", self._forward_next_playback_finished) | ||||||
| old.off("playback_started", self._forward_next_playback_started) | ||||||
| if self._pending_playback_count > 0: | ||||||
| # stop audio still playing on the old sink | ||||||
| old.clear_buffer() | ||||||
|
|
||||||
| if self._attached: | ||||||
| old.on_detached() | ||||||
|
|
||||||
| self._next_in_chain = new | ||||||
|
|
||||||
| new.on("playback_finished", self._forward_next_playback_finished) | ||||||
| new.on("playback_started", self._forward_next_playback_started) | ||||||
| if self._attached: | ||||||
| new.on_attached() | ||||||
|
|
||||||
| # a segment already flushed to the old sink will never be reported by the | ||||||
| # new one; finish it as interrupted so wait_for_playout() doesn't hang | ||||||
| if old is not None and self._pending_playback_count > 0 and not self._capturing: | ||||||
| self.on_playback_finished(playback_position=self._pushed_duration, interrupted=True) | ||||||
|
|
||||||
| @property | ||||||
| def sample_rate(self) -> int | None: | ||||||
| return self.next_in_chain.sample_rate | ||||||
|
|
||||||
| @property | ||||||
| def can_pause(self) -> bool: | ||||||
| return self.next_in_chain.can_pause | ||||||
|
|
||||||
| async def capture_frame(self, frame: rtc.AudioFrame) -> None: | ||||||
| if not self._capturing: | ||||||
| self._capturing = True | ||||||
| self._pushed_duration = 0.0 | ||||||
|
|
||||||
| await super().capture_frame(frame) | ||||||
| await self.next_in_chain.capture_frame(frame) | ||||||
| self._pushed_duration += frame.duration | ||||||
|
|
||||||
| def flush(self) -> None: | ||||||
| super().flush() | ||||||
| self.next_in_chain.flush() | ||||||
| self._capturing = False | ||||||
|
|
||||||
| def clear_buffer(self) -> None: | ||||||
| self.next_in_chain.clear_buffer() | ||||||
|
longcw marked this conversation as resolved.
|
||||||
|
|
||||||
|
|
||||||
| class TextOutput(ABC): | ||||||
| def __init__(self, *, label: str, next_in_chain: TextOutput | None) -> None: | ||||||
| self.__label = label | ||||||
|
|
@@ -568,6 +674,24 @@ def audio(self, sink: AudioOutput | None) -> None: | |||||
| else: | ||||||
| self._audio_sink.on_detached() | ||||||
|
|
||||||
| def swap_audio_endpoint(self, sink: AudioOutput) -> None: | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
not a fan of the name
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should it be something like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "lead" reads as the head of the chain, but the method replaces the tail, how about
|
||||||
| """Swap the endpoint sink at the bottom of the chain, keeping wrappers attached. | ||||||
|
|
||||||
| Walks the chain looking for a :class:`_AudioSinkProxy` and swaps its | ||||||
| downstream — leaving wrappers like :class:`TranscriptSynchronizer` and | ||||||
| :class:`RecorderAudioOutput` in place. Falls back to ``self.audio = sink`` | ||||||
| when no proxy is present (no wrappers, or the chain hasn't been set up yet). | ||||||
|
|
||||||
| Use ``self.audio = sink`` instead to replace the entire chain. | ||||||
| """ | ||||||
| cur = self._audio_sink | ||||||
| while cur is not None: | ||||||
| if isinstance(cur, _AudioSinkProxy): | ||||||
| cur.set_next_in_chain(sink) | ||||||
| return | ||||||
| cur = cur.next_in_chain | ||||||
| self.audio = sink | ||||||
|
|
||||||
| @property | ||||||
| def transcription(self) -> TextOutput | None: | ||||||
| return self._transcription_sink | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.