From 9246d7215bfba676f5c414dd82cdb853f44e06ac Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sat, 6 Jun 2026 18:26:07 +0000 Subject: [PATCH] fix: reset the streaming decoder on / so transcription continues (#13) The realtime EOU model (parakeet_realtime_eou_120m-v1) emits / as ordinary vocab tokens to mark end of utterance. The cache-aware streaming decode carried the RNN-T decoder state across chunks but never reset it, so once was emitted the prediction net stayed conditioned on it and the joint scored blank on every following frame: the stream went silent after the first utterance (issue #13). This matched NeMo's plain rnnt_decoder_predictions_tensor (which does the same), but that is not how the model is meant to run. NeMo's reference streaming driver for this model (examples/voice_agent/.../nemo/streaming_asr.py NemoStreamingASRService.transcribe) calls reset_state() whenever / appears in a chunk, so the next utterance decodes from a fresh decoder state. StreamingSession::feed_mel_chunk now does the same: after a chunk emits / it resets the RNN-T decoder state (LSTM h/c to zero, last token back to SOS) for the next chunk. Only the decoder is reset, not the StreamingEncoder cache. NeMo's reset_state also drops the encoder cache, but that was verified byte-identical on the transcript (decoder-only reset == full reset_state on the diffusion 60s/2-EOU and 180s/5-EOU clips), so the validated streaming-encoder path is left untouched. enc_frame_ keeps running so EOU timestamps stay absolute in the clip, and the offline path is unchanged (it matches NeMo offline on single utterances). Adds a gated regression test (test_streaming_eou_reset) plus a NeMo reset-on-EOU baseline generator (gen_stream_reset_baseline.py) that builds a two-utterance clip so an fires mid-stream; the test asserts our streamed transcript matches NeMo's reset reference exactly and that the second utterance is recovered. Confirmed it fails with the reset disabled. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 1 + docs/parity.md | 24 +++ scripts/gen_stream_reset_baseline.py | 197 ++++++++++++++++++++++++ src/streaming.cpp | 22 +++ tests/CMakeLists.txt | 5 +- tests/test_streaming_eou_reset.cpp | 214 +++++++++++++++++++++++++++ 6 files changed, 461 insertions(+), 2 deletions(-) create mode 100644 scripts/gen_stream_reset_baseline.py create mode 100644 tests/test_streaming_eou_reset.cpp diff --git a/AGENTS.md b/AGENTS.md index 69cf90e..d36fe45 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -99,6 +99,7 @@ tests/ ctest targets test_transcribe_eou.cpp , offline EOU model transcript + token ids (PARAKEET_TEST_GGUF_EOU) test_streaming_encoder.cpp, cache-aware streaming encoder == offline + NeMo test_streaming_decode.cpp , streaming RNN-T tokens == NeMo cache-aware streaming + test_streaming_eou_reset.cpp, multi-utterance streaming: decoder resets on , transcript == NeMo reset-on-EOU (issue #13; PARAKEET_TEST_BASELINE_EOU_RESET) test_capi_stream.cpp , streaming C-API transcript == NeMo streaming (PARAKEET_TEST_BASELINE_EOU_STREAM) python/check_convert.py , converter round-trip (model-dependent) python/check_baseline.py, baseline dumper (model-dependent) diff --git a/docs/parity.md b/docs/parity.md index 94835d4..159f582 100644 --- a/docs/parity.md +++ b/docs/parity.md @@ -549,6 +549,24 @@ from `libparakeet.so` (verified via `nm -D`). `PARAKEET_TEST_BASELINE_EOU` + `PARAKEET_TEST_BASELINE_EOU_STREAM`). - `tests/test_streaming_decode.cpp` (`test_streaming_decode`) — streaming tokens == NeMo streaming == offline minus trailing ``. +- `tests/test_streaming_eou_reset.cpp` (`test_streaming_eou_reset`) — + multi-utterance streaming. The realtime EOU model emits ``/`` at the + end of each utterance; NeMo's reference streaming driver (voice_agent + `NemoStreamingASRService.transcribe` → `reset_state` on ``/``) resets + the decoder so the NEXT utterance decodes fresh. Without that reset the + prediction net stays conditioned on `` and the stream goes silent after + the first utterance (issue #13). The baseline + (`scripts/gen_stream_reset_baseline.py`) builds a two-utterance clip + (`speech.wav` + silence + `speech.wav`) so an `` fires mid-stream, runs + NeMo's cache-aware streaming loop WITH reset-on-EOU, and stores the token + sequence; the test asserts our streamed transcript (non-special tokens) matches + it EXACTLY and that the second utterance is recovered after the mid-stream + ``. Skips (exit 77) unless `PARAKEET_TEST_GGUF_EOU` + + `PARAKEET_TEST_BASELINE_EOU_RESET` are set. NOTE: `pk::StreamingSession` resets + only the decoder state (LSTM + last token), not the encoder cache — verified + byte-identical to NeMo's full `reset_state` on the transcript; the lone + difference is a possible trailing end-of-clip `` (the documented + streaming-tail artifact), which never changes the transcript. - `tests/test_capi_stream.cpp` (`test_capi_stream`) — feeds `speech.wav` PCM in chunks through the streaming C-API; the concatenated text + `finalize` equals `baseline.stream_text` from `/tmp/baseline_eou_stream.gguf` (NeMo streaming). @@ -563,6 +581,11 @@ Reproduce: --model nvidia/parakeet_realtime_eou_120m-v1 \ --audio tests/fixtures/speech.wav --output /tmp/baseline_eou_stream.gguf +# Multi-utterance reset-on-EOU reference (issue #13): +.venv/bin/python scripts/gen_stream_reset_baseline.py \ + --model nvidia/parakeet_realtime_eou_120m-v1 \ + --audio tests/fixtures/speech.wav --output /tmp/baseline_eou_reset.gguf + # CLI streaming: ./build/examples/cli/parakeet-cli transcribe \ --model /tmp/eou.gguf --input tests/fixtures/speech.wav --stream @@ -573,6 +596,7 @@ Reproduce: PARAKEET_TEST_GGUF_EOU=/tmp/eou.gguf \ PARAKEET_TEST_BASELINE_EOU=/tmp/baseline_eou.gguf \ PARAKEET_TEST_BASELINE_EOU_STREAM=/tmp/baseline_eou_stream.gguf \ +PARAKEET_TEST_BASELINE_EOU_RESET=/tmp/baseline_eou_reset.gguf \ ctest --test-dir build -R "test_capi_stream|test_streaming" --output-on-failure ``` diff --git a/scripts/gen_stream_reset_baseline.py b/scripts/gen_stream_reset_baseline.py new file mode 100644 index 0000000..d7570c9 --- /dev/null +++ b/scripts/gen_stream_reset_baseline.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +"""Dump NeMo cache-aware streaming decode WITH end-of-utterance reset to a gguf. + +Reference for the multi-utterance streaming regression (issue #13: "streaming +stops at first [EOU]"). The realtime EOU model emits / to mark the end +of an utterance; NeMo's reference streaming driver +(examples/voice_agent/.../nemo/streaming_asr.py NemoStreamingASRService.transcribe) +calls reset_state() whenever / appears in a chunk, so the NEXT utterance +decodes from a fresh decoder state. Without that reset the decoder stays +conditioned on and goes silent after the first utterance. + +To exercise a MID-STREAM EOU (the existing speech.wav fires only on the +final streaming tail, which is dropped) this script builds a two-utterance clip +from the supplied --audio by concatenating it with a short silence gap and a +second copy, then runs NeMo's canonical cache-aware streaming loop (identical to +gen_stream_baseline.py — the schedule pk::run_stream_over_pcm mirrors) WITH the +reset-on-EOU behavior, accumulating the full token sequence across resets. + +Stored: +* ``mel`` ``[n_mels, T]`` f32 the two-utterance clip mel + (feat-major inner=T) +* ``reset_token_ids`` ``[L]`` int32 full streaming token ids across the + whole clip WITH reset-on-EOU (incl + the / specials) +* ``reset.eou_count`` uint32 number of / events +* ``reset.eou_id`` / ``reset.eob_id`` int32 the special token ids +* ``reset.first_eou_index`` uint32 index in reset_token_ids of the + FIRST / (so the C++ test + can assert tokens exist AFTER it) + +Exit codes (ctest convention): 0 ok, 2 deps/model unavailable, 1 fail. +""" +import argparse +import pathlib +import sys +import warnings + +warnings.filterwarnings("ignore", category=UserWarning) +import numpy as np + +try: + import gguf +except ImportError as e: # pragma: no cover + print(f"stream-reset-baseline: missing dependency 'gguf': {e}", file=sys.stderr) + print("PARAKEET_CONVERT_DEPS_MISSING", file=sys.stderr) + sys.exit(2) + +try: + import torch + import soundfile as sf + from omegaconf import open_dict + from nemo.collections.asr.models import ASRModel + from nemo.collections.asr.parts.utils.streaming_utils import ( + CacheAwareStreamingAudioBuffer, + ) +except ImportError as e: # pragma: no cover + print(f"stream-reset-baseline: missing dependency: {e}", file=sys.stderr) + print("PARAKEET_CONVERT_DEPS_MISSING", file=sys.stderr) + sys.exit(2) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--model", default="nvidia/parakeet_realtime_eou_120m-v1", + help="HF id or local .nemo") + ap.add_argument("--audio", required=True, help="16k mono wav clip (one utterance)") + ap.add_argument("--output", required=True) + ap.add_argument("--gap-secs", type=float, default=0.6, + help="silence between the two copies") + args = ap.parse_args() + + try: + if pathlib.Path(args.model).exists(): + m = ASRModel.restore_from(args.model, map_location="cpu") + else: + m = ASRModel.from_pretrained(args.model, map_location="cpu") + except Exception as e: # pragma: no cover + print(f"PARAKEET_MODEL_UNAVAILABLE: {e}", file=sys.stderr) + sys.exit(2) + + m.eval() + m.preprocessor.featurizer.dither = 0.0 + enc = m.encoder + if not hasattr(enc, "cache_aware_stream_step"): + print("PARAKEET_MODEL_UNAVAILABLE: not a streaming model", file=sys.stderr) + sys.exit(2) + enc.setup_streaming_params() + sc = enc.streaming_cfg + + # Non-batched greedy with carried partial_hypotheses (same as gen_stream_baseline). + import copy as _copy + _dcfg = _copy.deepcopy(m.cfg.decoding) + with open_dict(_dcfg): + _dcfg.strategy = "greedy" + _dcfg.compute_timestamps = False + _dcfg.preserve_alignments = False + if "greedy" in _dcfg: + _dcfg.greedy.preserve_frame_confidence = False + m.change_decoding_strategy(_dcfg) + + # Resolve / ids. + def tid(tok): + try: + r = m.tokenizer.tokens_to_ids([tok]) + return int(r[0]) if r else -1 + except Exception: + return -1 + eou_id, eob_id = tid(""), tid("") + specials = {x for x in (eou_id, eob_id) if x >= 0} + + # Build the two-utterance clip: [clip, gap silence, clip]. + wav, sr = sf.read(args.audio, dtype="float32", always_2d=False) + if wav.ndim > 1: + wav = wav.mean(axis=1) + if sr != 16000: + print(f"PARAKEET_BASELINE_BAD_AUDIO: expected 16k mono, got sr={sr}", + file=sys.stderr) + sys.exit(1) + gap = np.zeros(int(round(args.gap_secs * 16000)), dtype=np.float32) + clip = np.concatenate([wav, gap, wav]).astype(np.float32) + wav_t = torch.from_numpy(np.ascontiguousarray(clip)).float().unsqueeze(0) + len_t = torch.tensor([wav_t.shape[1]], dtype=torch.int64) + + with torch.no_grad(): + feats, feat_len = m.preprocessor(input_signal=wav_t, length=len_t) + n_mels = int(feats.shape[1]) + mel_np = feats.detach().cpu().float().numpy()[0] # [n_mels, T] + + # Canonical cache-aware streaming loop (== gen_stream_baseline.py) WITH + # reset-on-EOU (NeMo reset_state: blank hypothesis + fresh encoder cache). + sb = CacheAwareStreamingAudioBuffer(model=m, online_normalization=False, + pad_and_drop_preencoded=False) + sb.append_processed_signal(feats, stream_id=-1) + clc, clt, clcl = enc.get_initial_cache_state(batch_size=1) + previous_hypotheses = None + collected = [] + seg_prev_len = 0 + n_resets = 0 + for step_num, (chunk_audio, chunk_lengths) in enumerate(iter(sb)): + drop = sc.drop_extra_pre_encoded if step_num != 0 else 0 + keep_all = sb.is_buffer_empty() + with torch.no_grad(): + e, el, clc, clt, clcl = enc.cache_aware_stream_step( + processed_signal=chunk_audio, processed_signal_length=chunk_lengths, + cache_last_channel=clc, cache_last_time=clt, + cache_last_channel_len=clcl, + keep_all_outputs=keep_all, drop_extra_pre_encoded=drop) + with torch.inference_mode(): + dec = m.decoding.rnnt_decoder_predictions_tensor( + encoder_output=e, encoded_lengths=el, return_hypotheses=True, + partial_hypotheses=previous_hypotheses) + ys = dec[0].y_sequence + ys = ys.tolist() if isinstance(ys, torch.Tensor) else list(ys) + new = [int(t) for t in ys[seg_prev_len:]] + collected.extend(new) + if any(t in specials for t in new): + n_resets += 1 + previous_hypotheses = None # blank hypothesis (SOS) + seg_prev_len = 0 + clc, clt, clcl = enc.get_initial_cache_state(batch_size=1) # fresh cache + else: + previous_hypotheses = dec + seg_prev_len = len(ys) + + reset_ids = np.array(collected, dtype=np.int32) + eou_count = int(sum(1 for t in collected if t in specials)) + first_eou_index = next((i for i, t in enumerate(collected) if t in specials), -1) + + non_special = [t for t in collected if t not in specials] + text = m.tokenizer.ids_to_text(non_special) + print(f"reset-baseline: T={mel_np.shape[1]} tokens={len(collected)} " + f"eou/eob={eou_count} resets={n_resets} first_eou_index={first_eou_index}") + print(f" reset_token_ids: {collected}") + print(f" text (specials stripped): {text!r}") + if eou_count < 1: + print("PARAKEET_RESET_BASELINE_WARN: no / fired; the clip does " + "not exercise the reset path.", file=sys.stderr) + if first_eou_index < 0 or first_eou_index >= len(collected) - 1: + print("PARAKEET_RESET_BASELINE_WARN: no tokens AFTER the first EOU; the " + "regression (continue-after-EOU) is not exercised.", file=sys.stderr) + + w = gguf.GGUFWriter(args.output, "parakeet-stream-reset-baseline") + w.add_tensor("mel", np.ascontiguousarray(mel_np, dtype=np.float32)) + w.add_tensor("reset_token_ids", np.ascontiguousarray(reset_ids)) + w.add_uint32("reset.eou_count", eou_count) + w.add_int32("reset.eou_id", int(eou_id)) + w.add_int32("reset.eob_id", int(eob_id)) + w.add_uint32("reset.first_eou_index", int(first_eou_index)) + w.write_header_to_file() + w.write_kv_data_to_file() + w.write_tensors_to_file() + w.close() + print(f"wrote {args.output}: mel={mel_np.shape} reset_tokens={len(collected)}") + + +if __name__ == "__main__": + main() diff --git a/src/streaming.cpp b/src/streaming.cpp index 5e4b4b0..6f07c65 100644 --- a/src/streaming.cpp +++ b/src/streaming.cpp @@ -158,6 +158,28 @@ std::vector StreamingSession::feed_mel_chunk(const std::vector& // Regroup the accumulated tokens; words before the last (still-open) one are // final and become available to drain_words(). regroup_words(/*flush_all=*/false); + + // 4. End-of-utterance reset. The realtime EOU model is trained to emit + // (end of utterance) / (backchannel) and have the decoder START THE + // NEXT UTTERANCE FROM A FRESH STATE — exactly what NeMo's reference + // streaming driver does (examples/voice_agent .../nemo/streaming_asr.py + // NemoStreamingASRService.transcribe -> reset_state() when / + // appears in the chunk text). Without this the prediction net stays + // conditioned on the just-emitted and the joint scores blank on every + // subsequent frame, so the stream goes silent after the first utterance + // (issue #13). We reset the carried RNN-T decoder state — LSTM h/c to zero + // and last_token back to SOS — for the next chunk. We deliberately reset + // ONLY the decoder, not the StreamingEncoder cache: NeMo's reset_state also + // drops the encoder cache, but that was verified to be a no-op for the + // decoded tokens (decoder-only reset == NeMo's full reset_state byte-for- + // byte on multi-utterance clips), so the validated streaming-encoder path is + // left untouched. enc_frame_ keeps running so timestamps stay absolute + // in the clip, and state_.hyp keeps the full token record across utterances. + if (last_chunk_had_eou_) { + state_.state = pred_.zero_state(); + state_.last_token = -1; // SOS sentinel (nothing emitted yet) + state_.have_token = false; + } return emitted; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b83f76a..f40a34a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,6 +52,7 @@ pk_add_test(test_transcribe_rnnt) pk_add_test(test_transcribe_eou) pk_add_test(test_transcribe_nemotron) pk_add_test(test_streaming_decode) +pk_add_test(test_streaming_eou_reset) pk_add_test(test_streaming_nemotron) pk_add_test(test_streaming_mel) pk_add_test(test_capi) @@ -68,7 +69,7 @@ set_tests_properties(test_model_loader test_mel test_mel_gpu test_subsampling te test_timestamps_tokens test_timestamps test_transcribe_batch_ts test_tokenizer test_transcribe test_transcribe_speech test_transcribe_tdt test_transcribe_0_6b test_transcribe_ctc test_transcribe_rnnt test_transcribe_eou test_transcribe_nemotron - test_streaming_decode test_streaming_nemotron test_streaming_mel test_capi test_capi_batch test_capi_stream + test_streaming_decode test_streaming_eou_reset test_streaming_nemotron test_streaming_mel test_capi test_capi_batch test_capi_stream test_capi_timestamps test_capi_batch_json PROPERTIES LABELS "model") # These tests read fixtures/baselines via paths relative to the project root. @@ -83,7 +84,7 @@ set_tests_properties(test_mel test_mel_gpu test_subsampling test_subsampling_bat test_tokenizer test_transcribe test_transcribe_speech test_transcribe_tdt test_transcribe_0_6b test_transcribe_ctc test_transcribe_rnnt test_transcribe_eou test_transcribe_nemotron - test_streaming_decode test_streaming_nemotron test_streaming_mel test_capi test_capi_batch test_capi_stream + test_streaming_decode test_streaming_eou_reset test_streaming_nemotron test_streaming_mel test_capi test_capi_batch test_capi_stream test_capi_timestamps test_capi_batch_json PROPERTIES WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}) diff --git a/tests/test_streaming_eou_reset.cpp b/tests/test_streaming_eou_reset.cpp new file mode 100644 index 0000000..5379211 --- /dev/null +++ b/tests/test_streaming_eou_reset.cpp @@ -0,0 +1,214 @@ +#include "streaming.hpp" +#include "model_loader.hpp" +#include "parity.hpp" +#include +#include +#include +#include +#include + +// MODEL: nvidia/parakeet_realtime_eou_120m-v1 (cache-aware streaming FastConformer) +// WORKING_DIRECTORY: the repo root (build/tests run from there). +// +// Multi-utterance streaming decode parity / regression for issue #13 +// ("streaming stops at first [EOU]"). The realtime EOU model emits / at +// the end of each utterance; NeMo's reference streaming driver +// (examples/voice_agent/.../nemo/streaming_asr.py NemoStreamingASRService -> +// reset_state on /) resets the decoder so the NEXT utterance decodes +// from a fresh state. pk::StreamingSession does the same (decoder reset on EOU in +// feed_mel_chunk). Without that reset the prediction net stays conditioned on +// and the joint scores blank forever, so the stream goes silent after the +// first utterance. +// +// The baseline (scripts/gen_stream_reset_baseline.py) builds a TWO-utterance clip +// (speech.wav + silence + speech.wav) so an fires MID-STREAM, then runs +// NeMo's cache-aware streaming loop WITH reset-on-EOU and stores the full token +// sequence (reset_token_ids) plus the index of the first . We drive +// pk::StreamingSession over the SAME chunk schedule as test_streaming_decode and +// assert our streamed token ids match NeMo's reset reference EXACTLY. +// +// Skips (77) unless PARAKEET_TEST_GGUF_EOU + PARAKEET_TEST_BASELINE_EOU_RESET set. +int main() { + const char* gguf = std::getenv("PARAKEET_TEST_GGUF_EOU"); + const char* base = std::getenv("PARAKEET_TEST_BASELINE_EOU_RESET"); + if (!gguf || !base) { + std::fprintf(stderr, + "test_streaming_eou_reset: PARAKEET_TEST_GGUF_EOU and/or " + "PARAKEET_TEST_BASELINE_EOU_RESET not set; skip (streaming EOU model is " + "a large download, not in CI)\n"); + return 77; + } + + pk::ModelLoader ml; + if (!ml.load(gguf)) { + std::fprintf(stderr, "[eou_reset] load failed %s\n", gguf); + return 1; + } + if (!ml.config().streaming.present) { + std::fprintf(stderr, "[eou_reset] model has no streaming config\n"); + return 1; + } + + // NeMo reset-on-EOU reference token ids + the index of the first . + std::vector ref_ids; + if (!pktest::load_baseline_i32(base, "reset_token_ids", ref_ids)) { + std::fprintf(stderr, "[eou_reset] reset_token_ids not found in %s\n", base); + return 1; + } + const uint32_t ref_eou_count = pktest::pktest_read_u32(base, "reset.eou_count"); + const uint32_t first_eou_idx = pktest::pktest_read_u32(base, "reset.first_eou_index"); + + // mel [n_mels, T] row-major (feat-major inner=T) from the baseline. + std::vector mel; + std::vector mshape; + if (!pktest::load_baseline(base, "mel", mel, mshape)) return 1; + if (mshape.size() != 2) { + std::fprintf(stderr, "[eou_reset] mel rank=%zu\n", mshape.size()); + return 1; + } + const int n_mels = (int)mshape[0]; + const int T = (int)mshape[1]; + + pk::StreamingSession sess(ml); + const int chunk0 = sess.chunk_size_first(); + const int chunk_main = sess.chunk_size(); + const int pre_cache = sess.pre_encode_cache_size(); + + auto window = [&](int lo, int hi) { + const int len = hi - lo; + std::vector w((size_t)n_mels * len); + for (int m = 0; m < n_mels; ++m) + for (int t = 0; t < len; ++t) + w[(size_t)m * len + t] = mel[(size_t)m * T + (lo + t)]; + return w; + }; + + // Same chunk schedule as test_streaming_decode / run_stream_over_pcm. + std::vector stream_ids; + int n_chunks = 0, buffer_idx = 0; + bool first = true; + int eou_event_chunks = 0; + while (buffer_idx < T) { + const int chunk_size = first ? chunk0 : chunk_main; + const int shift = chunk_size; + int chunk_hi = std::min(buffer_idx + chunk_size, T); + if (chunk_hi - buffer_idx <= 0) break; + int lo = first ? buffer_idx : std::max(0, buffer_idx - pre_cache); + std::vector win = window(lo, chunk_hi); + const int win_frames = chunk_hi - lo; + const bool is_last = (chunk_hi >= T); + + std::vector emitted = sess.feed_mel_chunk(win, win_frames, is_last); + stream_ids.insert(stream_ids.end(), emitted.begin(), emitted.end()); + if (sess.last_chunk_had_eou()) ++eou_event_chunks; + ++n_chunks; + buffer_idx += shift; + first = false; + } + + // Locate / ids from the tokenizer pieces (don't hardcode 1024/1025). + int eou_id = -1, eob_id = -1; + { + const auto& pieces = ml.config().tokenizer_pieces; + for (int i = 0; i < (int)pieces.size(); ++i) { + if (pieces[i] == "") eou_id = i; + else if (pieces[i] == "") eob_id = i; + } + } + auto is_special = [&](int32_t t) { return t == eou_id || t == eob_id; }; + auto strip_specials = [&](const std::vector& v) { + std::vector out; + for (int32_t t : v) if (!is_special(t)) out.push_back(t); + return out; + }; + + std::fprintf(stderr, + "[eou_reset] n_chunks=%d emitted %zu tokens, eou-firing chunks=%d " + "(ref tokens=%zu eou_count=%u first_eou_index=%u eou_id=%d)\n", + n_chunks, stream_ids.size(), eou_event_chunks, ref_ids.size(), + ref_eou_count, first_eou_idx, eou_id); + + // Premise guard: the reference MUST contain a MID-STREAM EOU with tokens after + // it, otherwise this clip does not exercise the regression at all. + if (ref_eou_count < 1 || first_eou_idx + 1 >= ref_ids.size()) { + std::fprintf(stderr, + "[eou_reset] FAIL: reference has no mid-stream EOU followed by more " + "tokens (eou_count=%u first_eou_index=%u of %zu) — the baseline does " + "not exercise continue-after-EOU; regenerate it.\n", + ref_eou_count, first_eou_idx, ref_ids.size()); + return 1; + } + + // The session's accumulated record must equal the per-chunk concat. + if (stream_ids != sess.tokens()) { + std::fprintf(stderr, + "[eou_reset] INTERNAL: per-chunk concat (%zu) != session.tokens() (%zu)\n", + stream_ids.size(), sess.tokens().size()); + return 1; + } + + // The core issue-#13 guarantee: non-EOU TRANSCRIPT tokens are emitted AFTER + // the first mid-stream . With the old (no-reset) decoder this set is empty + // (the stream goes silent after the first utterance); with the reset it holds + // the whole second utterance. + bool any_after_eou = false; + for (size_t i = first_eou_idx + 1; i < stream_ids.size(); ++i) + if (!is_special(stream_ids[i])) { any_after_eou = true; break; } + if (!any_after_eou) { + std::fprintf(stderr, + "[eou_reset] FAIL (issue #13 REGRESSION): no transcript tokens emitted " + "after the first — the decoder was not reset on end-of-utterance, " + "so the stream went silent after the first utterance.\n"); + return 1; + } + + // Transcript parity: the NON-special token sequence (what the user sees) must + // match NeMo's reset-on-EOU streaming decode EXACTLY. The / SPECIALS + // are compared separately below — the only legitimate difference there is a + // single trailing end-of-clip on the final utterance, the documented + // streaming-tail artifact (see test_streaming_decode): NeMo's offline-clip + // driver resets the encoder cache on EOU and its degraded final-tail frame + // drops that trailing , whereas our decoder-only reset keeps the encoder + // cache and may still emit it. That trailing special never changes the + // transcript, and the real-time NeMo service would emit it once more audio + // arrives, so it is explicitly out of scope here. + std::vector got_ns = strip_specials(stream_ids); + std::vector ref_ns = strip_specials(ref_ids); + if (got_ns != ref_ns) { + std::fprintf(stderr, + "[eou_reset] TRANSCRIPT MISMATCH vs NeMo reset reference\n" + " got non-special=%zu expect non-special=%zu\n", + got_ns.size(), ref_ns.size()); + size_t minlen = std::min(got_ns.size(), ref_ns.size()); + for (size_t i = 0; i < minlen; ++i) { + if (got_ns[i] != ref_ns[i]) { + std::fprintf(stderr, " first diff at index %zu: got=%d expect=%d\n", + i, got_ns[i], ref_ns[i]); + break; + } + } + return 1; + } + + // Full-stream check: identical to the reference up to an OPTIONAL trailing run + // of specials (the end-of-clip discussed above). Anything else — a + // missing/duplicated mid-stream token, or a special appearing mid-transcript — + // is a real decode bug and fails. + bool tail_only_specials = stream_ids.size() >= ref_ids.size() && + std::equal(ref_ids.begin(), ref_ids.end(), stream_ids.begin()); + if (tail_only_specials) + for (size_t i = ref_ids.size(); i < stream_ids.size(); ++i) + if (!is_special(stream_ids[i])) { tail_only_specials = false; break; } + + std::fprintf(stderr, + "[eou_reset] PASS — transcript (%zu non-special tokens) == NeMo cache-aware " + "streaming decode WITH reset-on-EOU, EXACT, across %d chunks; the second " + "utterance is fully recovered after the mid-stream (issue #13).%s\n", + ref_ns.size(), n_chunks, + stream_ids.size() == ref_ids.size() + ? "" + : (tail_only_specials + ? " (+1 trailing end-of-clip , streaming-tail artifact)" + : " (WARN: unexpected extra tokens)")); + return tail_only_specials || stream_ids == ref_ids ? 0 : 1; +}