diff --git a/src-tauri/src/app_state.rs b/src-tauri/src/app_state.rs index 09eec6e..4f18af6 100644 --- a/src-tauri/src/app_state.rs +++ b/src-tauri/src/app_state.rs @@ -27,6 +27,23 @@ pub struct AppInner { pub pending_audio_stop: Mutex>>, pub pending_audio_warmup: Mutex>>, pub latest_transcript: Mutex<(String, String)>, // (final_text, partial_text) + /// Audio sample chunks captured before the ASR session is ready (background + /// connect in progress, or during a reconnect gap). Drained into the session + /// once it attaches. Always accessed while holding `asr_session` to stay + /// ordered against the drain. + pub pending_audio: Mutex>>, + /// Resolves when the background ASR connect finishes (Ok) or fails (Err). + /// `stop_recording` awaits this when the user stops before the session is ready. + pub connect_rx: Mutex>>>, + /// Recording-session generation. Bumped on each start and on cancel so a + /// stale background connect task (from a cancelled/superseded session) can + /// detect it is obsolete and discard its result. + pub session_epoch: std::sync::atomic::AtomicU64, + /// Finalized text carried across ASR reconnects within a single recording. + /// Each reconnect starts a fresh server-side session with no memory of prior + /// audio, so already-recognized text is accumulated here and prepended to the + /// new session's output. Reset at the start of every recording. + pub accumulated_text: Mutex, } pub type AppHandle = Arc; @@ -49,5 +66,9 @@ pub fn create_app_state( pending_audio_stop: Mutex::new(None), pending_audio_warmup: Mutex::new(None), latest_transcript: Mutex::new((String::new(), String::new())), + pending_audio: Mutex::new(Vec::new()), + connect_rx: Mutex::new(None), + session_epoch: std::sync::atomic::AtomicU64::new(0), + accumulated_text: Mutex::new(String::new()), }) } diff --git a/src-tauri/src/asr/doubao.rs b/src-tauri/src/asr/doubao.rs index e12ff50..5d64ff9 100644 --- a/src-tauri/src/asr/doubao.rs +++ b/src-tauri/src/asr/doubao.rs @@ -411,12 +411,30 @@ fn normalize_error_message(error: &str) -> String { error.to_string() } +/// Classify a Doubao ASR error code as fatal (unrecoverable by reconnect) or +/// transient. Parameter-invalid errors won't recover by reconnecting; server-side +/// timeouts / busy errors and network drops are transient. +fn is_fatal_asr_code(code: u64) -> bool { + matches!(code, 45000001 | 45000002) +} + // --------------------------------------------------------------------------- // WebSocket sink type alias // --------------------------------------------------------------------------- type WsSink = futures_util::stream::SplitSink>, Message>; +/// A frame to write to the WebSocket, serialized through a single writer task. +enum WsWrite { + /// A non-last audio frame. + Audio(Vec), + /// The last-packet (commit) frame. The writer drops any audio enqueued after + /// it, so the server never sees a packet past the final one. + Last(Vec), + /// Close the connection. + Close, +} + // --------------------------------------------------------------------------- // DoubaoEngine — AsrEngine implementation // --------------------------------------------------------------------------- @@ -491,10 +509,16 @@ impl AsrEngine for DoubaoEngine { ); headers.insert("X-Api-Connect-Id", connect_id.parse().unwrap()); - // Connect - let (ws_stream, _) = connect_async(request) - .await - .map_err(|e| format!("ASR WebSocket connection failed: {}", e))?; + // Connect with a bounded timeout. Without it a stalled handshake relies on + // the OS-level TCP timeout (tens of seconds); the caller retries instead. + let (ws_stream, _) = + match tokio::time::timeout(std::time::Duration::from_secs(5), connect_async(request)) + .await + { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => return Err(format!("ASR WebSocket connection failed: {}", e)), + Err(_) => return Err("ASR WebSocket 连接超时".to_string()), + }; let (sink, mut stream) = ws_stream.split(); @@ -518,10 +542,41 @@ impl AsrEngine for DoubaoEngine { let final_text: Arc> = Arc::new(Mutex::new(String::new())); let partial_text: Arc> = Arc::new(Mutex::new(String::new())); let latest_result_text: Arc> = Arc::new(Mutex::new(String::new())); - let sink = Arc::new(Mutex::new(Some(sink))); let commit_tx: Arc>>> = Arc::new(Mutex::new(None)); + // Dedicated writer task: a single FIFO consumer of the sink. Keeps frames + // ordered and drops any audio enqueued after the last packet, so the server + // never sees a packet past the final one (which it rejects). + let (writer_tx, mut writer_rx) = mpsc::unbounded_channel::(); + tokio::spawn(async move { + let mut sink: WsSink = sink; + let mut last_sent = false; + while let Some(msg) = writer_rx.recv().await { + match msg { + WsWrite::Audio(bytes) => { + if last_sent { + continue; + } + if sink.send(Message::Binary(bytes.into())).await.is_err() { + break; + } + } + WsWrite::Last(bytes) => { + if last_sent { + continue; + } + last_sent = true; + let _ = sink.send(Message::Binary(bytes.into())).await; + } + WsWrite::Close => { + let _ = sink.send(Message::Close(None)).await; + break; + } + } + } + }); + let (event_tx, event_rx) = mpsc::unbounded_channel(); let session = DoubaoSession { @@ -529,13 +584,12 @@ impl AsrEngine for DoubaoEngine { is_committed: is_committed.clone(), final_text: final_text.clone(), latest_result_text: latest_result_text.clone(), - sender: sink.clone(), + writer_tx, commit_tx: commit_tx.clone(), }; // Spawn message handler task let event_tx_clone = event_tx.clone(); - let sink_for_handler = sink.clone(); tokio::spawn(async move { while let Some(msg) = stream.next().await { match msg { @@ -604,8 +658,24 @@ impl AsrEngine for DoubaoEngine { .and_then(|v| v.as_str()) .map(|m| format!("ASR error {}: {}", code, m)) .unwrap_or_else(|| format!("ASR error code {}", code)); - let _ = - event_tx_clone.send(AsrEvent::Error(message.to_string())); + let _ = event_tx_clone.send(AsrEvent::Error { + message: message.to_string(), + fatal: is_fatal_asr_code(code), + }); + // If a commit is waiting, resolve it now with the + // best text we have instead of blocking until the + // 5s timeout (the socket is about to be reset). + if is_committed.load(Ordering::SeqCst) { + if let Some(tx) = commit_tx.lock().await.take() { + let latest = latest_result_text.lock().await.clone(); + let ft = final_text.lock().await.clone(); + let _ = tx.send(if latest.is_empty() { + ft + } else { + latest + }); + } + } continue; } } @@ -742,14 +812,19 @@ impl AsrEngine for DoubaoEngine { .or_else(|| payload.get("error").and_then(|e| e.get("message"))) .and_then(|v| v.as_str()) .unwrap_or("ASR 服务异常"); - let _ = event_tx_clone.send(AsrEvent::Error(message.to_string())); + // Unknown text-protocol error: attempt reconnect before giving up. + let _ = event_tx_clone.send(AsrEvent::Error { + message: message.to_string(), + fatal: false, + }); } } } Ok(Message::Close(frame)) => { is_ready.store(false, Ordering::SeqCst); - // Prevent lingering audio sends after connection closes - *sink_for_handler.lock().await = None; + // Lingering audio sends are already prevented: is_ready=false + // gates append_audio, and the FIFO writer task drops frames + // after the last packet / exits when its send fails. let code: Option = frame.as_ref().map(|f| f.code.into()); let reason = frame .as_ref() @@ -771,9 +846,21 @@ impl AsrEngine for DoubaoEngine { } Err(e) => { is_ready.store(false, Ordering::SeqCst); - *sink_for_handler.lock().await = None; let msg = normalize_error_message(&e.to_string()); - let _ = event_tx_clone.send(AsrEvent::Error(msg)); + // Connection reset without a clean Close: resolve any pending + // commit so the caller doesn't wait the full 5s timeout. + if is_committed.load(Ordering::SeqCst) { + if let Some(tx) = commit_tx.lock().await.take() { + let latest = latest_result_text.lock().await.clone(); + let ft = final_text.lock().await.clone(); + let _ = tx.send(if latest.is_empty() { ft } else { latest }); + } + } + // Transport-level failure (network drop): recoverable by reconnect. + let _ = event_tx_clone.send(AsrEvent::Error { + message: msg, + fatal: false, + }); break; } _ => {} @@ -796,7 +883,9 @@ struct DoubaoSession { is_committed: Arc, final_text: Arc>, latest_result_text: Arc>, - sender: Arc>>, + /// Sends frames to the dedicated writer task. A single FIFO consumer keeps + /// frames ordered and guarantees the last packet is written after all audio. + writer_tx: mpsc::UnboundedSender, commit_tx: Arc>>>, } @@ -817,14 +906,9 @@ impl AsrSession for DoubaoSession { .flat_map(|s| s.to_le_bytes()) .collect(); let frame = encode_audio_only_request(&audio, false); - let sender = self.sender.clone(); - tokio::spawn(async move { - if let Some(ref mut sink) = *sender.lock().await { - if let Err(e) = sink.send(Message::Binary(frame.into())).await { - log_asr!(debug, "Audio send skipped (connection closing): {}", e); - } - } - }); + // Hand the frame to the writer task; FIFO order is preserved and the + // writer drops anything enqueued after the last packet. + let _ = self.writer_tx.send(WsWrite::Audio(frame)); } async fn commit_and_await_final(&self) -> Result { @@ -834,15 +918,13 @@ impl AsrSession for DoubaoSession { if self.is_committed.load(Ordering::SeqCst) { return Err("录音已结束".to_string()); } + // Mark committed (stops further appends) and enqueue the last packet. + // Because all prior audio was enqueued before this call (the renderer + // flushes and acks before stop proceeds) and the writer is FIFO, the + // last packet is guaranteed to be written after every audio frame. self.is_committed.store(true, Ordering::SeqCst); - - // Send last-audio frame let frame = encode_audio_only_request(&[], true); - { - if let Some(ref mut sink) = *self.sender.lock().await { - let _ = sink.send(Message::Binary(frame.into())).await; - } - } + let _ = self.writer_tx.send(WsWrite::Last(frame)); // Wait for final result with timeout let (tx, rx) = tokio::sync::oneshot::channel(); @@ -861,13 +943,7 @@ impl AsrSession for DoubaoSession { fn close(&self) { self.is_ready.store(false, Ordering::SeqCst); - let sender = self.sender.clone(); - tokio::spawn(async move { - let mut guard = sender.lock().await; - if let Some(ref mut sink) = guard.take() { - let _ = sink.send(Message::Close(None)).await; - } - }); + let _ = self.writer_tx.send(WsWrite::Close); } } diff --git a/src-tauri/src/asr/mod.rs b/src-tauri/src/asr/mod.rs index 080a921..1b1efc5 100644 --- a/src-tauri/src/asr/mod.rs +++ b/src-tauri/src/asr/mod.rs @@ -12,7 +12,13 @@ pub enum AsrEvent { final_text: String, partial_text: String, }, - Error(String), + Error { + message: String, + /// Whether the error is unrecoverable (reconnecting cannot help). Fatal + /// errors finalize the recording with whatever text was already + /// recognized; non-fatal errors trigger an auto-reconnect attempt. + fatal: bool, + }, Close { code: Option, reason: String, diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs index 9d11c49..902eb17 100644 --- a/src-tauri/src/commands.rs +++ b/src-tauri/src/commands.rs @@ -9,6 +9,10 @@ use tauri::{utils::Theme, AppHandle, Emitter, Manager, State}; // Re-export paste::PasteResult for use in commands use paste::PasteResult; +/// Cap on audio chunks buffered before the ASR session is ready (~100ms per +/// chunk, so ~30s). Bounds memory if the connect keeps failing. +const MAX_PENDING_CHUNKS: usize = 300; + /// Detect the actual OS-level light/dark theme preference. fn detect_system_theme() -> &'static str { #[cfg(target_os = "macos")] @@ -300,39 +304,55 @@ pub async fn send_audio_chunk( ); } + // Decode base64 → i16 PCM bytes → f32 samples + let bytes = match base64::engine::general_purpose::STANDARD.decode(&base64_chunk) { + Ok(data) => data, + Err(_) => { + log_audio!(warn, "Chunk #{} base64 decode failed", n); + return Ok(serde_json::json!({ "ok": false, "message": "音频数据解码失败" })); + } + }; + let samples: Vec = bytes + .chunks_exact(2) + .map(|chunk| { + let sample = i16::from_le_bytes([chunk[0], chunk[1]]); + sample as f32 / 32768.0 + }) + .collect(); + + // Drive the native waveform (macOS only) from the same PCM the ASR receives, + // whether the chunk is sent immediately or buffered. + #[cfg(target_os = "macos")] + if let Some(level) = compute_audio_level(&samples) { + crate::overlay::set_audio_level(&_app, level); + } + + // Hold the `asr_session` lock across the decision so buffering stays ordered + // against the background connect task's drain (same lock), guaranteeing no + // buffered chunk is silently dropped between drain and session-attach. let session = state.asr_session.lock().await; if let Some(ref session) = *session { if session.is_ready() { - // Decode base64 → i16 PCM bytes → f32 samples - let bytes = match base64::engine::general_purpose::STANDARD.decode(&base64_chunk) { - Ok(data) => data, - Err(_) => { - log_audio!(warn, "Chunk #{} base64 decode failed", n); - return Ok(serde_json::json!({ "ok": false, "message": "音频数据解码失败" })); - } - }; - let samples: Vec = bytes - .chunks_exact(2) - .map(|chunk| { - let sample = i16::from_le_bytes([chunk[0], chunk[1]]); - sample as f32 / 32768.0 - }) - .collect(); - // Drive the native waveform (macOS only) from the same PCM the ASR receives. - #[cfg(target_os = "macos")] - if let Some(level) = compute_audio_level(&samples) { - crate::overlay::set_audio_level(&_app, level); - } session.append_audio(&samples); return Ok(serde_json::json!({ "ok": true })); } - log_audio!(warn, "Chunk #{} dropped: session not ready", n); - } else { - if n == 0 { - log_audio!(warn, "Chunk #{} dropped: no session", n); - } } - Ok(serde_json::json!({ "ok": false, "message": "ASR 会话未建立" })) + + // Session not ready yet (background connect in progress, or reconnect gap): + // buffer the samples so nothing the user says before the session attaches is + // lost. Drained into the session once it attaches. + let mut pending = state.pending_audio.lock().await; + if pending.len() < MAX_PENDING_CHUNKS { + pending.push(samples); + } else if n.is_multiple_of(50) { + log_audio!( + warn, + "Pending audio buffer full ({} chunks), dropping chunk #{}", + MAX_PENDING_CHUNKS, + n + ); + } + Ok(serde_json::json!({ "ok": true, "buffered": true })) } /// Notify that audio has stopped in the renderer. diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 7664782..b29f2e5 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -25,6 +25,11 @@ use tauri::{ image::Image, tray::TrayIconBuilder, App, AppHandle, Emitter, Listener, Manager, RunEvent, }; +/// Delay after the mic stream is ready, before entering Recording / playing the +/// start cue. Gives the browser AEC/AGC time to converge so the first words are +/// not attenuated. Trade-off: added latency between key press and "go". +const AUDIO_SETTLE_MS: u64 = 350; + #[cfg_attr(mobile, tauri::mobile_entry_point)] pub fn run() { tauri::Builder::default() @@ -288,10 +293,6 @@ async fn set_app_state( *app_inner.state.lock().await = next_state.clone(); sync_escape_shortcut(app, &next_state); - if next_state == app_state::AppState::Recording { - play_configured_sound(app, app_inner, "start"); - } - let _ = app.emit( "overlay:event", serde_json::json!({ @@ -317,18 +318,24 @@ fn resolve_default_sound_path(app: &AppHandle, filename: &str) -> PathBuf { .join(filename) } -fn play_configured_sound(app: &AppHandle, app_inner: &Arc, name: &str) { +/// Resolve the configured sound file path for `name` ("start" / "end"). +/// Returns `None` when sounds are disabled or the config cannot be loaded. +fn resolve_configured_sound_path( + app: &AppHandle, + app_inner: &Arc, + name: &str, +) -> Option { let config = match app_inner.config_manager.load_config() { Ok(config) => config, Err(error) => { log_app!(warn, "Sound config load failed: {}", error); - return; + return None; } }; let sound_config = config.app.sound.as_ref(); if sound_config.map(|sound| !sound.enabled).unwrap_or(false) { - return; + return None; } let custom_path = match name { @@ -353,7 +360,44 @@ fn play_configured_sound(app: &AppHandle, app_inner: &Arc, custom_path.to_string() }; - crate::paste::play_sound(&file_path); + Some(file_path) +} + +/// Play a cue (`name` = "start" / "end") through the renderer's AudioContext +/// instead of spawning `afplay`. A freshly spawned `afplay` process competes with +/// the audio output device that is still settling, which attenuated the cue (low +/// volume) or cut it short (partial playback). The renderer plays it through a +/// dedicated, kept-warm AudioContext, so the cue is full-volume and never +/// truncated. Falls back to `afplay` only if the file cannot be read. +fn emit_cue(app: &AppHandle, app_inner: &Arc, name: &str) { + use base64::Engine as _; + + let Some(file_path) = resolve_configured_sound_path(app, app_inner, name) else { + return; + }; + + match std::fs::read(&file_path) { + Ok(bytes) => { + let data = base64::engine::general_purpose::STANDARD.encode(&bytes); + let _ = app.emit( + "overlay:event", + serde_json::json!({ + "type": "cue:play", + "payload": { "kind": name, "data": data } + }), + ); + } + Err(error) => { + log_app!( + warn, + "Cue '{}' read failed ({}), falling back to afplay: {}", + name, + file_path, + error + ); + crate::paste::play_sound(&file_path); + } + } } async fn stop_renderer_audio( @@ -653,7 +697,27 @@ async fn start_recording(app_handle: AppHandle) { return; } - // 3. Create ASR session (engine chosen by model ID → registry engine) + // Settle delay before the cue: getUserMedia resolving only means the stream + // exists, not that its AEC/AGC have converged. The mic is live and DSP converges + // on real input during this wait (capture stays gated off until Recording), while + // the renderer's cue keep-alive (set up during warmup) holds the output device + // warm so the cue still plays smoothly afterwards. The cue is the user's "go" + // signal, so it must land AFTER this delay — never before, or the user would + // speak into the unconverged window and lose the first words. + tokio::time::sleep(std::time::Duration::from_millis(AUDIO_SETTLE_MS)).await; + + // Re-check cancellation: the user may have released during the settle delay. + if !*recording_state.0.lock().unwrap() { + log_rec!(warn, "Cancelled during settle, aborting start"); + stop_renderer_audio(&app_handle, &app_inner, 1200).await; + set_app_state(&app_handle, &app_inner, app_state::AppState::Idle).await; + if let Some(overlay) = app_handle.get_webview_window("overlay") { + let _ = overlay.hide(); + } + return; + } + + // 3. Active hotwords for this session (also reused on reconnect). let hotwords = app_inner.hotword_manager.active_words(); log_rec!( debug, @@ -662,6 +726,63 @@ async fn start_recording(app_handle: AppHandle) { hotwords ); + // 4. Play the start cue and enter Recording back-to-back: the cue tells the user + // they may speak, so streaming must begin the instant it plays — no gap. DSP + // has already converged during the settle delay above. The ASR session + // connects in the background so the user can speak as soon as the (local, + // fast) mic is ready instead of waiting on the (remote, variable) network + // handshake; audio captured before the session is ready is buffered and + // flushed once it connects. + let my_epoch = app_inner + .session_epoch + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + + 1; + app_inner.pending_audio.lock().await.clear(); + *app_inner.asr_session.lock().await = None; + *app_inner.accumulated_text.lock().await = String::new(); + + let (connect_tx, connect_rx) = tokio::sync::oneshot::channel::>(); + *app_inner.connect_rx.lock().await = Some(connect_rx); + + emit_cue(&app_handle, &app_inner, "start"); + set_app_state(&app_handle, &app_inner, app_state::AppState::Recording).await; + let _ = app_handle.emit( + "overlay:event", + serde_json::json!({ "type": "recording:start" }), + ); + + // 5. Connect the ASR session in the background; attach it once ready. + let connect_handle = app_handle.clone(); + tauri::async_runtime::spawn(async move { + connect_and_attach(connect_handle, config, hotwords, my_epoch, connect_tx).await; + }); +} + +/// True while `my_epoch` is still the current recording session. A background +/// connect task uses this to detect that a cancel/restart has superseded it. +fn is_current_epoch(app_inner: &app_state::AppInner, my_epoch: u64) -> bool { + app_inner + .session_epoch + .load(std::sync::atomic::Ordering::SeqCst) + == my_epoch +} + +/// Resolve the configured ASR engine and open a new session. Shared by the +/// initial background connect and the reconnect path. Returns the session, its +/// event receiver, and whether the overlay should show a static "recording" hint +/// (non-streaming engines produce no partial results). +async fn create_active_session( + app_handle: &AppHandle, + config: &crate::config::AppConfig, + hotwords: &[String], +) -> Result< + ( + Box, + tokio::sync::mpsc::UnboundedReceiver, + bool, + ), + String, +> { let resource_dir = app_handle .path() .resource_dir() @@ -676,7 +797,7 @@ async fn start_recording(app_handle: AppHandle) { let engine_model_id = config.audio_provider(); let entry = registry.models.iter().find(|m| m.id == engine_model_id); - let result = match entry { + let (result, show_recording_hint) = match entry { Some(entry) if entry.engine == "sherpa-onnx" => { let punctuation_config = registry .models @@ -695,7 +816,10 @@ async fn start_recording(app_handle: AppHandle) { stream_simulate: config.stream_simulate(engine_model_id, ®istry), }, ); - engine.create_session(&hotwords).await + // Non-streaming engines without simulated streaming produce no partials. + let show_hint = !entry.capabilities.streaming + && !config.stream_simulate(engine_model_id, ®istry); + (engine.create_session(hotwords).await, show_hint) } _ => { // Default / volcengine: Doubao online engine @@ -705,41 +829,75 @@ async fn start_recording(app_handle: AppHandle) { doubao_config.to_audio_config(), doubao_config.to_request_config(), ); - engine.create_session(&hotwords).await + (engine.create_session(hotwords).await, false) } }; + result.map(|(session, event_rx)| (session, event_rx, show_recording_hint)) +} + +/// Connect the ASR session in the background (one retry), then attach it: flush +/// any audio buffered during the connect and publish the ready session. Signals +/// completion through `connect_tx` so `stop_recording` can wait when the user +/// stops before the session is ready. +async fn connect_and_attach( + app_handle: AppHandle, + config: crate::config::AppConfig, + hotwords: Vec, + my_epoch: u64, + connect_tx: tokio::sync::oneshot::Sender>, +) { + let app_inner: Arc = + Arc::clone(&app_handle.state::>()); + + // Each Doubao attempt is bounded to 5s inside the engine; retry once. + let mut result = create_active_session(&app_handle, &config, &hotwords).await; + if result.is_err() && is_current_epoch(&app_inner, my_epoch) { + if let Err(ref e) = result { + log_rec!(warn, "ASR connect failed, retrying once: {}", e); + } + result = create_active_session(&app_handle, &config, &hotwords).await; + } + + // A newer session (cancel / restart) superseded us: discard quietly. + if !is_current_epoch(&app_inner, my_epoch) { + if let Ok((session, _, _)) = result { + session.close(); + } + let _ = connect_tx.send(Err("已取消".to_string())); + return; + } + match result { - Ok((session, event_rx)) => { + Ok((session, event_rx, show_recording_hint)) => { let session: Arc = Arc::from(session); - // Check if recording was cancelled during ASR connection - if !*recording_state.0.lock().unwrap() { - log_rec!(warn, "Cancelled during ASR connection, closing session"); - session.close(); - stop_renderer_audio(&app_handle, &app_inner, 1200).await; - set_app_state(&app_handle, &app_inner, app_state::AppState::Idle).await; - if let Some(overlay) = app_handle.get_webview_window("overlay") { - let _ = overlay.hide(); + // Flush buffered audio then publish the session, all under the + // asr_session lock so send_audio_chunk cannot interleave a chunk + // between the drain and the attach. + { + let mut slot = app_inner.asr_session.lock().await; + if !is_current_epoch(&app_inner, my_epoch) { + session.close(); + let _ = connect_tx.send(Err("已取消".to_string())); + return; } - return; + let buffered: Vec> = + app_inner.pending_audio.lock().await.drain(..).collect(); + for chunk in &buffered { + session.append_audio(chunk); + } + if !buffered.is_empty() { + log_rec!( + debug, + "Flushed {} buffered audio chunk(s) to ASR", + buffered.len() + ); + } + *slot = Some(session); } - *app_inner.asr_session.lock().await = Some(session); - set_app_state(&app_handle, &app_inner, app_state::AppState::Recording).await; - - let _ = app_handle.emit( - "overlay:event", - serde_json::json!({ - "type": "recording:start", - }), - ); - - // For non-streaming engines without simulated streaming, - // show a "recording" hint since partial results won't appear. - if entry.is_some_and(|e| !e.capabilities.streaming) - && !config.stream_simulate(engine_model_id, ®istry) - { + if show_recording_hint { let _ = app_handle.emit( "overlay:event", serde_json::json!({ @@ -751,25 +909,35 @@ async fn start_recording(app_handle: AppHandle) { let app_for_events = app_handle.clone(); tauri::async_runtime::spawn(async move { - forward_asr_events(app_for_events, event_rx).await; + manage_asr_session(app_for_events, event_rx, my_epoch).await; }); + + let _ = connect_tx.send(Ok(())); } Err(e) => { log_rec!(error, "ASR connection failed: {}", e); - *recording_state.0.lock().unwrap() = false; + let _ = connect_tx.send(Err(e.clone())); + + // If the user already stopped, stop_recording owns the error UI (it + // is awaiting connect_rx). Only handle the UI here when still + // recording (connect failed mid-session). + let still_recording = *app_handle.state::().0.lock().unwrap(); + if !still_recording { + return; + } + *app_handle.state::().0.lock().unwrap() = false; + app_inner.pending_audio.lock().await.clear(); stop_renderer_audio(&app_handle, &app_inner, 1200).await; - // Emit error hint BEFORE setting idle state so the overlay shows the - // error message. The state:idle handler in the frontend only clears - // hints whose level is "info", so an "error" level hint survives. + // Emit error hint BEFORE setting idle so the overlay shows it: the + // frontend's idle handler only clears "info"-level hints. let _ = app_handle.emit("overlay:event", serde_json::json!({ "type": "hint", "payload": { "text": format!("ASR 连接失败: {}", e), "level": "error", "variant": "text" } })); set_app_state(&app_handle, &app_inner, app_state::AppState::Idle).await; - // Auto-hide the overlay after a short delay so the user can read the - // error. Guard: only hide if still idle (not in a new session). + // Auto-hide after a delay so the user can read it; guard: still idle. let delayed_handle = app_handle.clone(); - let delayed_inner: Arc = Arc::clone(&*app_inner); + let delayed_inner: Arc = Arc::clone(&app_inner); tauri::async_runtime::spawn(async move { tokio::time::sleep(Duration::from_secs(3)).await; let still_idle = { @@ -800,13 +968,27 @@ async fn stop_recording(app_handle: AppHandle) { // 2. Stop renderer audio first so the final buffered chunk is flushed. stop_renderer_audio(&app_handle, &app_inner, 1200).await; - // 3. Take the ASR session (removes it from state) - let session = app_inner.asr_session.lock().await.take(); + // 3. Acquire the ready ASR session. If the background connect hasn't finished + // (user stopped before it was ready), wait for it to resolve so the buffered + // audio still gets transcribed instead of being thrown away. + let session = match app_inner.asr_session.lock().await.take() { + Some(s) => Some(s), + None => { + let rx = app_inner.connect_rx.lock().await.take(); + match rx { + Some(rx) => match tokio::time::timeout(Duration::from_secs(12), rx).await { + Ok(Ok(Ok(()))) => app_inner.asr_session.lock().await.take(), + _ => None, // connect failed / timed out / task gone + }, + None => None, + } + } + }; *app_inner.asr_events.lock().await = None; if let Some(session) = session { - // 4. Commit and get final text - let text = match session.commit_and_await_final().await { + // 4. Commit and get this session's final text. + let session_text = match session.commit_and_await_final().await { Ok(t) => t, Err(_) => { let (final_t, partial_t) = app_inner.latest_transcript.lock().await.clone(); @@ -818,196 +1000,65 @@ async fn stop_recording(app_handle: AppHandle) { } }; - let mut trimmed = text.trim().to_string(); - if !trimmed.is_empty() { + // Prepend any text accumulated across reconnects in this recording. + let prefix = app_inner.accumulated_text.lock().await.clone(); + let combined = format!("{}{}", prefix, session_text); + + // 5-9. Polish (if applicable), write clipboard, paste, record stats, end cue. + finalize_and_paste(&app_handle, &app_inner, combined).await; + + // 10. Close the WebSocket session + session.close(); + } else { + // No ready session: the connect never completed (or we stopped during a + // reconnect gap). Drop any buffered audio for this round. + app_inner.pending_audio.lock().await.clear(); + let prefix = app_inner.accumulated_text.lock().await.clone(); + if !prefix.trim().is_empty() { + // Salvage text accumulated before the disconnect instead of discarding. log_rec!( - info, - "Final text received ({} chars)", - trimmed.chars().count() + warn, + "Stop with no ready session; salvaging accumulated text" ); + finalize_and_paste(&app_handle, &app_inner, prefix).await; + } else { log_rec!( - debug, - "Final text preview: {:?}", - trimmed.chars().take(60).collect::() + warn, + "Stop with no ready ASR session; discarding buffered audio" ); - - // 5. Load config for LLM / behavior settings - let config = app_inner.config_manager.load_config().ok(); - let data_dir = app_handle - .path() - .app_data_dir() - .unwrap_or_else(|_| std::path::PathBuf::from(".")); - let resource_dir = app_handle - .path() - .resource_dir() - .unwrap_or_else(|_| std::path::PathBuf::from(".")); - let registry = crate::model::load_registry(&data_dir, &resource_dir); - - if let Some(ref config) = config { - let model_id = config.audio_provider(); - if config.hotword_replace(model_id, ®istry) { - let hotwords = app_inner.hotword_manager.active_words(); - if !hotwords.is_empty() { - trimmed = crate::asr::sherpa_onnx::online::restore_hotword_case( - &trimmed, &hotwords, - ); - } - } - } - - if config - .as_ref() - .map(|config| config.app.remove_trailing_period) - .unwrap_or(true) - && (trimmed.ends_with('。') || trimmed.ends_with('.')) - { - trimmed.pop(); - } - - // 6. Apply LLM structure_text only when a prompt-specific hotkey was used. - // The main hotkey (active_prompt_id = None) pastes raw text without polishing. - let active_prompt_id = app_handle - .try_state::() - .and_then(|s| s.0.lock().unwrap().clone()); - - let final_text = if let Some(ref config) = config { - if active_prompt_id.is_some() { - let prompts = app_inner.config_manager.load_prompts(); - let mut system_prompt = active_prompt_id - .as_ref() - .and_then(|pid| { - prompts - .iter() - .find(|p| &p.id == pid) - .map(|p| p.prompt.clone()) - .filter(|p| !p.trim().is_empty()) - }) - .unwrap_or_else(|| DEFAULT_STRUCTURE_PROMPT.to_string()); - - let model_id = config.audio_provider(); - let hotword_mode = config.hotword_llm_mode(model_id, ®istry); - let append_hotwords = match hotword_mode.as_str() { - "disabled" => false, - "force" => true, - _ => { - let resource_dir = app_handle - .path() - .resource_dir() - .unwrap_or_else(|_| std::path::PathBuf::from(".")); - let data_dir = app_handle - .path() - .app_data_dir() - .unwrap_or_else(|_| std::path::PathBuf::from(".")); - let registry = crate::model::load_registry(&data_dir, &resource_dir); - registry - .models - .iter() - .find(|m| m.id == model_id) - .map(|m| !m.capabilities.hotwords) - .unwrap_or(false) - } - }; - - if append_hotwords { - let hw: Vec = app_inner - .hotword_manager - .active_words() - .iter() - .map(|w| crate::hotword::strip_weight(w).to_string()) - .collect(); - if !hw.is_empty() { - system_prompt = format!( - "{}\n\n需要注意以下专有名词的准确拼写:{}", - system_prompt, - hw.join("、") - ); - } - } - - log_rec!( - debug, - "Applying LLM structure_text (prompt_id: {:?})", - active_prompt_id - ); - match crate::llm::call_llm_api(&config.llm, &trimmed, &system_prompt).await { - Ok(result) => { - log_rec!( - info, - "LLM polishing succeeded ({} chars)", - result.chars().count() - ); - result - } - Err(e) => { - log_rec!(warn, "LLM polishing failed: {}, using raw text", e); - let _ = app_handle.emit("overlay:event", serde_json::json!({ - "type": "hint", - "payload": { "text": format!("文本润色失败,已输出原文"), "level": "warn", "variant": "text" } - })); - trimmed.clone() - } + let _ = app_handle.emit("overlay:event", serde_json::json!({ + "type": "hint", + "payload": { "text": "语音服务连接失败,请重试", "level": "error", "variant": "text" } + })); + *app_inner.accumulated_text.lock().await = String::new(); + set_app_state(&app_handle, &app_inner, app_state::AppState::Idle).await; + let delayed_handle = app_handle.clone(); + let delayed_inner: Arc = Arc::clone(&app_inner); + tauri::async_runtime::spawn(async move { + tokio::time::sleep(Duration::from_secs(3)).await; + let still_idle = { + let s = delayed_inner.state.lock().await; + matches!(*s, app_state::AppState::Idle) + }; + if still_idle { + if let Some(overlay) = delayed_handle.get_webview_window("overlay") { + let _ = overlay.hide(); } - } else { - trimmed.clone() - } - } else { - trimmed.clone() - }; - - // Clear the active prompt ID after use - if let Some(active) = app_handle.try_state::() { - *active.0.lock().unwrap() = None; - } - - // 7. Write to clipboard - use tauri_plugin_clipboard_manager::ClipboardExt; - - // Save original clipboard content if we need to restore it later - let keep_clipboard = config - .as_ref() - .map(|c| c.app.keep_clipboard) - .unwrap_or(true); - let original_clipboard: Option = if !keep_clipboard { - app_handle.clipboard().read_text().ok() - } else { - None - }; - - if let Err(e) = app_handle.clipboard().write_text(&final_text) { - log_rec!(error, "Clipboard write failed: {}", e); - let _ = app_handle.emit("overlay:event", serde_json::json!({ - "type": "hint", - "payload": { "text": format!("剪贴板写入失败: {}", e), "level": "error", "variant": "text" } - })); - } - - // 8. Simulate paste keystroke - let _result = crate::paste::simulate_paste(); - - // Restore original clipboard content if keep_clipboard is disabled - if let Some(original) = original_clipboard { - if let Err(e) = app_handle.clipboard().write_text(&original) { - log_rec!(error, "Failed to restore clipboard: {}", e); } - } - - // 9. Record usage stats - app_inner.stats.lock().await.record_session(&final_text); - play_configured_sound(&app_handle, &app_inner, "end"); - } else { - log_rec!(warn, "Final text is empty, skipping paste"); + }); + return; } - - // 10. Close the WebSocket session - session.close(); } - // 11. Hide overlay + // 11. Clear cross-reconnect accumulated text for the next recording. + *app_inner.accumulated_text.lock().await = String::new(); + + // 12. Hide overlay if let Some(overlay) = app_handle.get_webview_window("overlay") { let _ = overlay.hide(); } - // 12. Set state back to idle + // 13. Set state back to idle set_app_state(&app_handle, &app_inner, app_state::AppState::Idle).await; } @@ -1044,6 +1095,13 @@ async fn cancel_recording(app_handle: AppHandle) { *recording_state.0.lock().unwrap() = false; log_rec!(debug, "Cancel requested"); + // Bump the epoch so any in-flight background connect task discards its + // result, and drop any audio buffered before the session was ready. + app_inner + .session_epoch + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + app_inner.pending_audio.lock().await.clear(); + // Clear the active prompt ID since the session was cancelled if let Some(active) = app_handle.try_state::() { *active.0.lock().unwrap() = None; @@ -1056,6 +1114,7 @@ async fn cancel_recording(app_handle: AppHandle) { } *app_inner.asr_events.lock().await = None; *app_inner.latest_transcript.lock().await = (String::new(), String::new()); + *app_inner.accumulated_text.lock().await = String::new(); let _ = app_handle.emit("overlay:event", serde_json::json!({ "type": "reset" })); if let Some(overlay) = app_handle.get_webview_window("overlay") { @@ -1067,93 +1126,497 @@ async fn cancel_recording(app_handle: AppHandle) { /// Default system prompt for LLM text structuring. const DEFAULT_STRUCTURE_PROMPT: &str = "整理语音转写内容,仅输出最终文本,不附加其他内容。\n- 删除语气词、重复内容及多余口语词汇\n- 理顺语序,保证逻辑流畅\n- 修正识别错误,还原正确词汇与专有名词\n- 忠于原意,不新增、改动信息\n- 篇幅较长则使用列表结构化呈现,短句不作格式调整"; -/// Forward ASR events from the event channel to the overlay window. -/// Runs in a spawned async task for the duration of an active recording session. -async fn forward_asr_events( +/// Maximum number of consecutive ASR reconnect attempts before giving up and +/// finalizing the recording with whatever text was recognized so far. Reset to +/// zero each time the reconnected session produces a fresh transcript. +const MAX_ASR_RECONNECT: u32 = 3; + +/// Read the current recording flag without holding the lock across await points. +fn is_recording(app: &AppHandle) -> bool { + app.try_state::() + .map(|state| *state.0.lock().unwrap()) + .unwrap_or(false) +} + +/// True while this manager's recording session is still the active, current one +/// (not stopped, cancelled, or superseded by a restart). +fn session_is_active(app: &AppHandle, app_inner: &app_state::AppInner, my_epoch: u64) -> bool { + is_recording(app) && is_current_epoch(app_inner, my_epoch) +} + +/// Snapshot the best text recognized in the current session from shared state. +/// `manage_asr_session` stores every transcript here, so it serves as the +/// carry-over source across a reconnect and the salvage source on failure. +async fn current_session_text(app_inner: &app_state::AppInner) -> String { + let (final_t, partial_t) = app_inner.latest_transcript.lock().await.clone(); + if !final_t.is_empty() { + final_t + } else { + partial_t + } +} + +/// Manage an ASR session for the duration of a recording: forward transcripts to +/// the overlay, and on a recoverable error/close, auto-reconnect a fresh session +/// (carrying already-recognized text). On a fatal error or after reconnects are +/// exhausted, finalize the recording with the accumulated text instead of +/// discarding it. +async fn manage_asr_session( app: AppHandle, mut event_rx: tokio::sync::mpsc::UnboundedReceiver, + my_epoch: u64, ) { use crate::asr::AsrEvent; - log_events!(debug, "Event forwarding task started"); - while let Some(event) = event_rx.recv().await { - match event { - AsrEvent::Transcript { - final_text, - partial_text, - } => { - // Save latest transcript in shared state - let state = app.state::>(); - *state.latest_transcript.lock().await = (final_text.clone(), partial_text.clone()); - - let _ = app.emit( - "overlay:event", - serde_json::json!({ - "type": "transcript", - "payload": { - "finalText": final_text, - "partialText": partial_text, + let app_inner = app.state::>().inner().clone(); + let mut reconnect_attempts: u32 = 0; + + log_events!(debug, "ASR session manager started (epoch {})", my_epoch); + 'outer: loop { + while let Some(event) = event_rx.recv().await { + match event { + AsrEvent::Transcript { + final_text, + partial_text, + } => { + // A real transcript means the (possibly reconnected) session is + // healthy again: reset the failure counter. + reconnect_attempts = 0; + + // Save this session's transcript (without the cross-reconnect + // prefix) so stop_recording can fall back to it. + *app_inner.latest_transcript.lock().await = + (final_text.clone(), partial_text.clone()); + + // Prepend text accumulated from prior (disconnected) sessions + // so the overlay shows the complete running transcript. + let prefix = app_inner.accumulated_text.lock().await.clone(); + let display_final = format!("{}{}", prefix, final_text); + + let _ = app.emit( + "overlay:event", + serde_json::json!({ + "type": "transcript", + "payload": { + "finalText": display_final, + "partialText": partial_text, + } + }), + ); + } + AsrEvent::Open => { + log_events!(info, "ASR connection opened"); + } + AsrEvent::Error { message, fatal } => { + log_events!(error, "ASR error: {} (fatal={})", message, fatal); + // If the user stopped/cancelled/restarted, the owning path + // handles finalization; don't interfere. + if !session_is_active(&app, &app_inner, my_epoch) { + break 'outer; + } + if !fatal && reconnect_attempts < MAX_ASR_RECONNECT { + if let Some(rx) = + try_reconnect_asr(&app, &app_inner, my_epoch, &mut reconnect_attempts) + .await + { + event_rx = rx; + continue 'outer; } - }), - ); - } - AsrEvent::Error(msg) => { - log_events!(error, "ASR error: {}", msg); - let _ = app.emit( - "overlay:event", - serde_json::json!({ - "type": "hint", - "payload": { - "text": msg, - "level": "error", - "variant": "text", + } + finalize_on_failure(&app, &app_inner, &message).await; + break 'outer; + } + AsrEvent::Close { code, reason } => { + log_events!( + info, + "ASR connection closed (code={:?}, reason={:?})", + code, + reason + ); + if !session_is_active(&app, &app_inner, my_epoch) { + break 'outer; + } + // Unexpected close mid-recording: treat as recoverable. + if reconnect_attempts < MAX_ASR_RECONNECT { + if let Some(rx) = + try_reconnect_asr(&app, &app_inner, my_epoch, &mut reconnect_attempts) + .await + { + event_rx = rx; + continue 'outer; } - }), - ); - // Auto-stop: reset recording state and hide overlay - if let Some(state) = app.try_state::() { - *state.0.lock().unwrap() = false; + } + finalize_on_failure(&app, &app_inner, "ASR 连接已断开").await; + break 'outer; } - if let Some(inner) = app.try_state::>() { - set_app_state(&app, &inner, app_state::AppState::Idle).await; + } + } + // Event channel closed without an explicit terminal event. + break 'outer; + } + log_events!(debug, "ASR session manager ended (epoch {})", my_epoch); +} + +/// Attempt to rebuild the ASR session after a recoverable failure. Carries the +/// dying session's recognized text into `accumulated_text`, connects a fresh +/// session, flushes any audio buffered during the reconnect gap, and swaps it +/// into shared state so audio routing resumes automatically. Returns the new +/// event receiver on success, or `None` if the recording ended or the reconnect +/// failed. +async fn try_reconnect_asr( + app: &AppHandle, + app_inner: &Arc, + my_epoch: u64, + attempts: &mut u32, +) -> Option> { + *attempts += 1; + log_events!( + warn, + "ASR reconnecting (attempt {}/{})", + attempts, + MAX_ASR_RECONNECT + ); + let _ = app.emit( + "overlay:event", + serde_json::json!({ + "type": "hint", + "payload": { "text": "网络中断,正在重连…", "level": "warn", "variant": "text" } + }), + ); + + // Play the end cue on the first reconnect attempt so the user audibly knows + // recording was interrupted and can stop talking until they hear it resume. + if *attempts == 1 { + emit_cue(app, app_inner, "end"); + } + + // Carry the dying session's recognized text into the accumulated prefix, and + // clear the session slot so chunks spoken during the gap buffer into + // pending_audio (drained into the new session below). + let carry = current_session_text(app_inner).await; + if let Some(old) = app_inner.asr_session.lock().await.take() { + old.close(); + } + if !carry.is_empty() { + app_inner.accumulated_text.lock().await.push_str(&carry); + } + *app_inner.latest_transcript.lock().await = (String::new(), String::new()); + + // Small backoff before retrying. + tokio::time::sleep(Duration::from_millis(300)).await; + + if !session_is_active(app, app_inner, my_epoch) { + return None; + } + + let config = match app_inner.config_manager.load_config() { + Ok(c) => c, + Err(e) => { + log_events!(error, "Reconnect aborted, config load failed: {}", e); + return None; + } + }; + let hotwords = app_inner.hotword_manager.active_words(); + + match create_active_session(app, &config, &hotwords).await { + Ok((session, event_rx, _)) => { + let session: Arc = Arc::from(session); + // Flush audio buffered during the gap, then publish, under the + // asr_session lock so send_audio_chunk cannot interleave. + { + let mut slot = app_inner.asr_session.lock().await; + if !session_is_active(app, app_inner, my_epoch) { + session.close(); + return None; + } + let buffered: Vec> = + app_inner.pending_audio.lock().await.drain(..).collect(); + for chunk in &buffered { + session.append_audio(chunk); } - if let Some(overlay) = app.get_webview_window("overlay") { + if !buffered.is_empty() { + log_events!( + debug, + "Flushed {} buffered chunk(s) after reconnect", + buffered.len() + ); + } + *slot = Some(session); + } + log_events!(info, "ASR reconnected successfully"); + let _ = app.emit( + "overlay:event", + serde_json::json!({ + "type": "hint", + "payload": { "text": "已重连", "level": "info", "variant": "text" } + }), + ); + // Play the start cue so the user audibly knows recording resumed. + emit_cue(app, app_inner, "start"); + Some(event_rx) + } + Err(e) => { + log_events!(error, "ASR reconnect failed: {}", e); + None + } + } +} + +/// Finalize a recording that failed mid-stream (fatal error or exhausted +/// reconnects): salvage the accumulated text plus the current session's text and +/// run it through the normal paste pipeline, then tear down and hide the overlay. +async fn finalize_on_failure(app: &AppHandle, app_inner: &Arc, message: &str) { + // Stop recording so audio routing and hotkey toggling settle. + if let Some(state) = app.try_state::() { + *state.0.lock().unwrap() = false; + } + + // Gather salvageable text: accumulated prefix + the dying session's text. + let session_text = current_session_text(app_inner).await; + if let Some(s) = app_inner.asr_session.lock().await.take() { + s.close(); + } + + let prefix = app_inner.accumulated_text.lock().await.clone(); + let combined = format!("{}{}", prefix, session_text); + + // Reset cross-reconnect / buffering state. + *app_inner.accumulated_text.lock().await = String::new(); + *app_inner.latest_transcript.lock().await = (String::new(), String::new()); + *app_inner.asr_events.lock().await = None; + app_inner.pending_audio.lock().await.clear(); + + stop_renderer_audio(app, app_inner, 1200).await; + + if combined.trim().is_empty() { + // Nothing to salvage: surface the error so the user understands the abort. + log_events!(warn, "ASR failed with no recognized text: {}", message); + let _ = app.emit( + "overlay:event", + serde_json::json!({ + "type": "hint", + "payload": { "text": message, "level": "error", "variant": "text" } + }), + ); + if let Some(active) = app.try_state::() { + *active.0.lock().unwrap() = None; + } + set_app_state(app, app_inner, app_state::AppState::Idle).await; + let delayed_handle = app.clone(); + let delayed_inner: Arc = Arc::clone(app_inner); + tauri::async_runtime::spawn(async move { + tokio::time::sleep(Duration::from_secs(3)).await; + let still_idle = { + let s = delayed_inner.state.lock().await; + matches!(*s, app_state::AppState::Idle) + }; + if still_idle { + if let Some(overlay) = delayed_handle.get_webview_window("overlay") { let _ = overlay.hide(); } } - AsrEvent::Open => { - log_events!(info, "ASR connection opened"); + }); + return; + } + + // Salvaged text exists: paste it as if the recording had ended normally. + log_events!( + warn, + "ASR failed; salvaging recognized text ({} chars): {}", + combined.chars().count(), + message + ); + finalize_and_paste(app, app_inner, combined).await; + + if let Some(overlay) = app.get_webview_window("overlay") { + let _ = overlay.hide(); + } + set_app_state(app, app_inner, app_state::AppState::Idle).await; +} + +/// Run recognized text through the finishing pipeline: optional LLM polishing +/// (prompt-specific hotkeys only, with sherpa-onnx hotword hinting), trailing- +/// period trimming, clipboard write (honoring keep_clipboard), simulated paste, +/// usage stats, and the end cue. Shared by the normal stop path and the +/// failure-salvage path. +async fn finalize_and_paste( + app_handle: &AppHandle, + app_inner: &Arc, + raw_text: String, +) { + let trimmed = raw_text.trim().to_string(); + + // Always clear the active prompt ID once a recording concludes. + let active_prompt_id = app_handle + .try_state::() + .and_then(|s| s.0.lock().unwrap().clone()); + if let Some(active) = app_handle.try_state::() { + *active.0.lock().unwrap() = None; + } + + if trimmed.is_empty() { + log_rec!(warn, "Final text is empty, skipping paste"); + return; + } + + log_rec!( + info, + "Final text received ({} chars)", + trimmed.chars().count() + ); + log_rec!( + debug, + "Final text preview: {:?}", + trimmed.chars().take(60).collect::() + ); + + // Load config + model registry for LLM / behavior settings. + let config = app_inner.config_manager.load_config().ok(); + let resource_dir = app_handle + .path() + .resource_dir() + .unwrap_or_else(|_| std::path::PathBuf::from(".")); + let data_dir = app_handle + .path() + .app_data_dir() + .unwrap_or_else(|_| std::path::PathBuf::from(".")); + let registry = crate::model::load_registry(&data_dir, &resource_dir); + + let mut trimmed = trimmed; + + // Restore hotword casing for engines configured to do so (e.g. sherpa-onnx + // lowercases proper nouns recognized via its hotword list). + if let Some(ref config) = config { + let model_id = config.audio_provider(); + if config.hotword_replace(model_id, ®istry) { + let hotwords = app_inner.hotword_manager.active_words(); + if !hotwords.is_empty() { + trimmed = + crate::asr::sherpa_onnx::online::restore_hotword_case(&trimmed, &hotwords); + } + } + } + + if config + .as_ref() + .map(|config| config.app.remove_trailing_period) + .unwrap_or(true) + && (trimmed.ends_with('。') || trimmed.ends_with('.')) + { + trimmed.pop(); + } + + // Apply LLM structure_text only when a prompt-specific hotkey was used. + // The main hotkey (active_prompt_id = None) pastes raw text without polishing. + let final_text = if let Some(ref config) = config { + if active_prompt_id.is_some() { + let prompts = app_inner.config_manager.load_prompts(); + let mut system_prompt = active_prompt_id + .as_ref() + .and_then(|pid| { + prompts + .iter() + .find(|p| &p.id == pid) + .map(|p| p.prompt.clone()) + .filter(|p| !p.trim().is_empty()) + }) + .unwrap_or_else(|| DEFAULT_STRUCTURE_PROMPT.to_string()); + + // Append hotwords to the LLM prompt as a proper-noun hint, per the + // model's hotword_llm_mode ("disabled" / "force" / auto = only when the + // engine itself lacks hotword support). + let model_id = config.audio_provider(); + let append_hotwords = match config.hotword_llm_mode(model_id, ®istry).as_str() { + "disabled" => false, + "force" => true, + _ => registry + .models + .iter() + .find(|m| m.id == model_id) + .map(|m| !m.capabilities.hotwords) + .unwrap_or(false), + }; + if append_hotwords { + let hw: Vec = app_inner + .hotword_manager + .active_words() + .iter() + .map(|w| crate::hotword::strip_weight(w).to_string()) + .collect(); + if !hw.is_empty() { + system_prompt = format!( + "{}\n\n需要注意以下专有名词的准确拼写:{}", + system_prompt, + hw.join("、") + ); + } } - AsrEvent::Close { code, reason } => { - log_events!( - info, - "ASR connection closed (code={:?}, reason={:?})", - code, - reason - ); - // If connection closed during recording, auto-stop - // Extract the flag eagerly to avoid holding MutexGuard across .await - let was_recording = app - .try_state::() - .map(|state| { - let mut recording = state.0.lock().unwrap(); - let is_rec = *recording; - *recording = false; - is_rec - }) - .unwrap_or(false); - if was_recording { - if let Some(inner) = app.try_state::>() { - set_app_state(&app, &inner, app_state::AppState::Idle).await; - } - if let Some(overlay) = app.get_webview_window("overlay") { - let _ = overlay.hide(); - } + + log_rec!( + debug, + "Applying LLM structure_text (prompt_id: {:?})", + active_prompt_id + ); + match crate::llm::call_llm_api(&config.llm, &trimmed, &system_prompt).await { + Ok(result) => { + log_rec!( + info, + "LLM polishing succeeded ({} chars)", + result.chars().count() + ); + result + } + Err(e) => { + log_rec!(warn, "LLM polishing failed: {}, using raw text", e); + let _ = app_handle.emit("overlay:event", serde_json::json!({ + "type": "hint", + "payload": { "text": "文本润色失败,已输出原文", "level": "warn", "variant": "text" } + })); + trimmed.clone() } } + } else { + trimmed.clone() } + } else { + trimmed.clone() + }; + + // Write to clipboard + use tauri_plugin_clipboard_manager::ClipboardExt; + + // Save original clipboard content if we need to restore it later + let keep_clipboard = config + .as_ref() + .map(|c| c.app.keep_clipboard) + .unwrap_or(true); + let original_clipboard: Option = if !keep_clipboard { + app_handle.clipboard().read_text().ok() + } else { + None + }; + + if let Err(e) = app_handle.clipboard().write_text(&final_text) { + log_rec!(error, "Clipboard write failed: {}", e); + let _ = app_handle.emit("overlay:event", serde_json::json!({ + "type": "hint", + "payload": { "text": format!("剪贴板写入失败: {}", e), "level": "error", "variant": "text" } + })); } - log_events!(debug, "Event forwarding task ended"); + + // Simulate paste keystroke + let _result = crate::paste::simulate_paste(); + + // Restore original clipboard content if keep_clipboard is disabled + if let Some(original) = original_clipboard { + if let Err(e) = app_handle.clipboard().write_text(&original) { + log_rec!(error, "Failed to restore clipboard: {}", e); + } + } + + // Record usage stats + app_inner.stats.lock().await.record_session(&final_text); + emit_cue(app_handle, app_inner, "end"); } /// Reload all hotkey bindings from the current config and prompts. diff --git a/web/src/ui/main-overlay.ts b/web/src/ui/main-overlay.ts index cf59385..d272c0c 100644 --- a/web/src/ui/main-overlay.ts +++ b/web/src/ui/main-overlay.ts @@ -334,8 +334,13 @@ function resetState(): void { // ---- Audio pipeline ---- -function flushPendingAudio(force = false): void { +// Returns a promise that resolves once every chunk dispatched in this call has +// been acked by the backend. The final flush (force=true) is awaited by +// stopAudioCapture so the last audio reaches the backend *before* audio_stopped +// fires, guaranteeing the commit's last packet is sent after all audio. +function flushPendingAudio(force = false): Promise { const targetChunkSize = 1600; + const sends: Promise[] = []; while ( state.pendingSamples.length >= targetChunkSize || (force && state.pendingSamples.length > 0) @@ -351,17 +356,195 @@ function flushPendingAudio(force = false): void { state.audioReady = true; updateView(); } - sendAudioChunk(base64Chunk).catch(() => { - state.hintText = "音频发送失败"; - state.hintLevel = "error"; - state.hintVariant = "text"; - updateView(); - }); + sends.push( + sendAudioChunk(base64Chunk).catch(() => { + state.hintText = "音频发送失败"; + state.hintLevel = "error"; + state.hintVariant = "text"; + updateView(); + }), + ); if (force) break; } + return Promise.all(sends); +} + +// Dedicated AudioContext used to play both the start and end cues. It is kept +// separate from the capture context on purpose: the capture context is driven by +// a main-thread ScriptProcessorNode doing heavy per-chunk work (downsample + +// base64 + IPC), and that congestion underruns the shared output, making a cue +// rendered there stutter. A capture-free context renders cues on its own audio +// thread, unaffected by capture load. +// +// Lifecycle (warm during the session, suspended while idle so it draws no power +// and does not hold the output device): +// - warmup → resume the context and start the keep-alive +// - start cue → plays; context stays running (decoded buffer cached) +// - recording / stop → stays running through the finishing flow +// - end cue → plays on the still-warm context +// - back to idle → stop the keep-alive and suspend +let cueAudioContext: AudioContext | null = null; +let cuePlaying = false; +let cueKeepAlive: AudioBufferSourceNode | null = null; + +// Decoded cue AudioBuffers cached by kind ("start" / "end"), so decodeAudioData +// runs once per sound instead of on every play — eliminating decode-time jank. +// Re-decoded only if the underlying bytes change (custom sound swapped in). +const cueBufferCache = new Map(); + +// Amplitude of the keep-alive priming signal — kept inaudibly low. NOTE: raising +// this (tried up to ~ -60 dBFS / 0.001) did NOT stop the cue stutter and was +// audible as hiss, so do not raise it. The cue stutter is instead addressed by +// pre-decoding the cue into a cached AudioBuffer (eliminating decode-time jank). +const CUE_KEEPALIVE_AMPLITUDE = 0.00012; + +// A looping low-amplitude noise source that keeps the output device actively +// rendering for the whole session. Without it (or with pure silence), a resumed +// context lets the OS power the output device back down, so the next cue plays +// into a cold device and glitches mid-playback (notably the start cue after even +// a few seconds idle). The continuous inaudible priming keeps the device hot. +function startCueKeepAlive(): void { + if (cueKeepAlive || !cueAudioContext || cueAudioContext.state !== "running") { + return; + } + const frames = Math.max(1, Math.floor(cueAudioContext.sampleRate * 0.5)); + const buffer = cueAudioContext.createBuffer(1, frames, cueAudioContext.sampleRate); + // Fill with tiny noise (not zeros) so the device stays powered. Inaudible. + const channel = buffer.getChannelData(0); + for (let index = 0; index < channel.length; index += 1) { + channel[index] = (Math.random() * 2 - 1) * CUE_KEEPALIVE_AMPLITUDE; + } + const source = cueAudioContext.createBufferSource(); + source.buffer = buffer; + source.loop = true; + source.connect(cueAudioContext.destination); + source.start(); + cueKeepAlive = source; +} + +function stopCueKeepAlive(): void { + if (cueKeepAlive) { + try { + cueKeepAlive.stop(); + } catch { + // already stopped + } + cueKeepAlive.disconnect(); + cueKeepAlive = null; + } +} + +// Create the cue context if needed, resume it, and start the keep-alive so the +// output device is warm and settled by the time a cue plays. Idempotent; called +// during warmup. +function ensureCueContextWarm(): void { + try { + if (!cueAudioContext) { + const AudioContextCtor = + window.AudioContext || + (window as unknown as { webkitAudioContext?: typeof AudioContext }).webkitAudioContext; + if (!AudioContextCtor) return; + cueAudioContext = new AudioContextCtor(); + } + if (cueAudioContext.state === "suspended") { + cueAudioContext + .resume() + .then(() => startCueKeepAlive()) + .catch(() => {}); + } else { + startCueKeepAlive(); + } + } catch (error) { + sendDiagnostic({ + type: "cue:warm-failed", + message: (error as Error).message || String(error), + }); + } +} + +// Release the cue context once the session is fully over. Suspends (and stops the +// keep-alive) only when back to idle and no cue is mid-playback, so the end cue — +// which plays during the finishing flow — is never cut off. +function maybeSuspendCue(): void { + if (state.appState !== "idle" || cuePlaying) { + return; + } + stopCueKeepAlive(); + if (cueAudioContext && cueAudioContext.state === "running") { + cueAudioContext.suspend().catch(() => {}); + } +} + +// Decode the cue bytes to an AudioBuffer, caching by kind so decodeAudioData runs +// once per sound. Re-decodes only when the bytes change (custom sound swapped in). +async function getCueBuffer(kind: string, base64Data: string): Promise { + const cached = cueBufferCache.get(kind); + if (cached && cached.data === base64Data) { + return cached.buffer; + } + const binary = atob(base64Data); + const bytes = new Uint8Array(binary.length); + for (let index = 0; index < binary.length; index += 1) { + bytes[index] = binary.charCodeAt(index); + } + if (!cueAudioContext) throw new Error("cue context unavailable"); + const buffer = await cueAudioContext.decodeAudioData(bytes.buffer); + cueBufferCache.set(kind, { data: base64Data, buffer }); + return buffer; +} + +// Play a cue (start or end). Backend sends the raw audio file bytes (e.g. mp3) as +// base64; we decode (cached) and play it through the cue context, then try to +// suspend once it ends (only takes effect when the session is idle). +async function playCue(payload: { kind?: string; data?: string }): Promise { + const kind = payload?.kind || "start"; + const base64Data = payload?.data; + if (!base64Data) return; + + // Mark the cue in-flight before any await. The end cue and the idle-state event + // arrive back-to-back; their async handlers interleave at await points. If + // maybeSuspendCue (from the idle handler) ran while this was still false it + // would suspend the context mid-decode, so the cue would start on a suspended + // context and only sound on the next resume (next session). Setting the flag up + // front makes that suspend a no-op until this cue's onended fires. + cuePlaying = true; + + try { + ensureCueContextWarm(); + if (!cueAudioContext) { + cuePlaying = false; + return; + } + if (cueAudioContext.state === "suspended") { + await cueAudioContext.resume(); + startCueKeepAlive(); + } + + const audioBuffer = await getCueBuffer(kind, base64Data); + const source = cueAudioContext.createBufferSource(); + source.buffer = audioBuffer; + source.connect(cueAudioContext.destination); + source.onended = () => { + cuePlaying = false; + maybeSuspendCue(); + }; + source.start(); + } catch (error) { + cuePlaying = false; + maybeSuspendCue(); + sendDiagnostic({ + type: "cue:play-failed", + message: (error as Error).message || String(error), + }); + } } async function startAudioCapture(): Promise { + // Create/resume the cue context early (during warmup) so the start cue has no + // cold-start. The keep-alive started here is restarted after getUserMedia below, + // because opening the mic reconfigures the shared output device. Suspended when idle. + ensureCueContextWarm(); + if (state.mediaStream) return; sendDiagnostic({ type: "audio:capture-starting" }); @@ -404,12 +587,21 @@ async function startAudioCapture(): Promise { state.processorNode = processorNode; state.analyserNode = analyserNode; + // getUserMedia just reconfigured the shared output device, which can silence the + // cue keep-alive started above (the device is re-routed without firing onended). + // Restart it now, after capture is fully set up, so it actively holds the output + // device warm through the backend's pre-cue settle delay — otherwise the device + // cools during that wait and the start cue plays back stuttering. + stopCueKeepAlive(); + ensureCueContextWarm(); + sendDiagnostic({ type: "audio:capture-started", sampleRate: audioContext.sampleRate }); } async function stopAudioCapture(): Promise { stopWaveformAnimation(); - flushPendingAudio(true); + // Await so the final chunk reaches the backend before we signal audio_stopped. + await flushPendingAudio(true); if (state.analyserNode) { state.analyserNode.disconnect(); @@ -433,6 +625,10 @@ async function stopAudioCapture(): Promise { state.audioContext = null; } state.pendingSamples = []; + + // Note: the cue context is intentionally left running here. The end cue plays + // later in the finishing flow, so it is only suspended once the session is back + // to idle (see maybeSuspendCue, called from the end cue and the idle handler). } // ---- Event handling ---- @@ -446,6 +642,11 @@ onOverlayEvent(async (event: OverlayEvent) => { case "state": state.appState = (payload as { state: AppState }).state; if (state.appState === "idle" || state.appState === "connecting") state.audioReady = false; + if (state.appState === "idle") { + // Session over: suspend the cue context. No-op if the end cue is still + // playing — its onended handler suspends once it finishes. + maybeSuspendCue(); + } if (state.appState === "recording") startWaveformAnimation(); if ( state.appState === "idle" || @@ -511,6 +712,9 @@ onOverlayEvent(async (event: OverlayEvent) => { updateView(); break; } + case "cue:play": + playCue(payload as { kind?: string; data?: string }); + break; case "paste:done": case "sound:config": break;