Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions openflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
DEFAULT_MODEL = "tiny"
MLX_MODEL_REPO = "mlx-community/whisper-tiny-mlx"

AUDIO_QUEUE_MAXSIZE = 20

# Hotkey settings
# This represents the key used in hotkey.py.
# While currently relying on specific key codes, this config acts as a central reference.
Expand Down
24 changes: 20 additions & 4 deletions openflow/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import numpy as np

from openflow.config import AUDIO_QUEUE_MAXSIZE

logger = logging.getLogger(__name__)

# Minimum audio duration (in samples) to attempt transcription
Expand All @@ -19,10 +21,11 @@ def __init__(self, engine, accessibility=None, completion_callback=None):
self.engine = engine
self.accessibility = accessibility
self.completion_callback = completion_callback
self.audio_queue = queue.Queue()
self.audio_queue = queue.Queue(maxsize=AUDIO_QUEUE_MAXSIZE)
self.running = False
self.worker_thread = None
self.audio_buffer = np.array([], dtype=np.float32)
self.chunks = []

def start(self):
"""Starts the background worker thread."""
Expand All @@ -37,17 +40,23 @@ def stop(self):
"""Stops the worker thread."""
self.running = False
if self.worker_thread:
self.worker_thread.join()
self.worker_thread.join(timeout=5.0)
if self.worker_thread.is_alive():
logger.warning("StreamProcessor worker did not exit within timeout; daemon will clean up on process exit.")
logger.debug("StreamProcessor stopped.")

def reset(self):
"""Clears the current audio buffer."""
self.audio_buffer = np.array([], dtype=np.float32)
self.chunks.clear()

def process_chunk(self, chunk):
"""Adds a new audio chunk to the queue."""
if self.running:
self.audio_queue.put(chunk)
try:
self.audio_queue.put_nowait(chunk)
except queue.Full:
logger.warning("Audio queue full — dropping chunk")

def finish_recording(self):
"""Signals that recording has stopped; triggers final transcription."""
Expand All @@ -64,7 +73,7 @@ def _worker(self):
self._handle_final_transcription()
continue

self.audio_buffer = np.concatenate((self.audio_buffer, chunk))
self.chunks.append(chunk)

except queue.Empty:
continue
Expand All @@ -74,6 +83,13 @@ def _worker(self):
def _handle_final_transcription(self):
"""Transcribes the accumulated buffer and inserts text."""
try:
if not self.chunks:
if self.completion_callback:
self.completion_callback()
return
self.audio_buffer = np.concatenate(self.chunks)
self.chunks.clear()

if len(self.audio_buffer) < MIN_SAMPLES:
logger.debug("Audio too short, skipping transcription.")
self.audio_buffer = np.array([], dtype=np.float32)
Expand Down