From 36694233455940230a850be01ce96826628fc84b Mon Sep 17 00:00:00 2001 From: Vignesh K <47584183+vigneshvicky07@users.noreply.github.com> Date: Sat, 14 Feb 2026 17:37:14 +0530 Subject: [PATCH] Add AUDIO_QUEUE_MAXSIZE configuration and update StreamProcessor to handle audio queue limits and chunk processing --- openflow/config.py | 2 ++ openflow/streamer.py | 24 ++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/openflow/config.py b/openflow/config.py index cd41026..2ca3fc4 100644 --- a/openflow/config.py +++ b/openflow/config.py @@ -16,6 +16,8 @@ DEFAULT_MODEL = "tiny" MLX_MODEL_REPO = "mlx-community/whisper-tiny-mlx" +AUDIO_QUEUE_MAXSIZE = 20 + # Hotkey settings # This represents the key used in hotkey.py. # While currently relying on specific key codes, this config acts as a central reference. diff --git a/openflow/streamer.py b/openflow/streamer.py index d17a1d7..7504259 100644 --- a/openflow/streamer.py +++ b/openflow/streamer.py @@ -3,6 +3,8 @@ import logging import numpy as np +from openflow.config import AUDIO_QUEUE_MAXSIZE + logger = logging.getLogger(__name__) # Minimum audio duration (in samples) to attempt transcription @@ -19,10 +21,11 @@ def __init__(self, engine, accessibility=None, completion_callback=None): self.engine = engine self.accessibility = accessibility self.completion_callback = completion_callback - self.audio_queue = queue.Queue() + self.audio_queue = queue.Queue(maxsize=AUDIO_QUEUE_MAXSIZE) self.running = False self.worker_thread = None self.audio_buffer = np.array([], dtype=np.float32) + self.chunks = [] def start(self): """Starts the background worker thread.""" @@ -37,17 +40,23 @@ def stop(self): """Stops the worker thread.""" self.running = False if self.worker_thread: - self.worker_thread.join() + self.worker_thread.join(timeout=5.0) + if self.worker_thread.is_alive(): + logger.warning("StreamProcessor worker did not exit within timeout; daemon will clean up on process exit.") logger.debug("StreamProcessor stopped.") def reset(self): """Clears the current audio buffer.""" self.audio_buffer = np.array([], dtype=np.float32) + self.chunks.clear() def process_chunk(self, chunk): """Adds a new audio chunk to the queue.""" if self.running: - self.audio_queue.put(chunk) + try: + self.audio_queue.put_nowait(chunk) + except queue.Full: + logger.warning("Audio queue full — dropping chunk") def finish_recording(self): """Signals that recording has stopped; triggers final transcription.""" @@ -64,7 +73,7 @@ def _worker(self): self._handle_final_transcription() continue - self.audio_buffer = np.concatenate((self.audio_buffer, chunk)) + self.chunks.append(chunk) except queue.Empty: continue @@ -74,6 +83,13 @@ def _worker(self): def _handle_final_transcription(self): """Transcribes the accumulated buffer and inserts text.""" try: + if not self.chunks: + if self.completion_callback: + self.completion_callback() + return + self.audio_buffer = np.concatenate(self.chunks) + self.chunks.clear() + if len(self.audio_buffer) < MIN_SAMPLES: logger.debug("Audio too short, skipping transcription.") self.audio_buffer = np.array([], dtype=np.float32)