Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
# Runs only the checks relevant to what's staged:
# - Go files -> make lint + make test-coverage-check
# - core/http/react-ui -> make test-ui-coverage-check (Playwright e2e + gate)
# A commit touching neither is skipped entirely (docs/YAML/etc. can't change
# lint findings, Go coverage, or the UI).
# - realtime state machines / specs -> make test-realtime-conformance
# (respcoord/**, turncoord/**, or formal-verification/** -- a pure .fizz
# spec edit must still re-verify the design, detected separately from Go)
# A commit touching none of these is skipped entirely (other docs/YAML can't
# change lint findings, Go coverage, the UI, or the realtime conformance gate).
#
# To bypass for a single commit (e.g. a WIP checkpoint): git commit --no-verify
set -eu
Expand All @@ -20,11 +23,13 @@ staged="$(git diff --cached --name-only --diff-filter=ACMRD)"

go_changed=0
ui_changed=0
rt_changed=0
if echo "$staged" | grep -qE '\.go$'; then go_changed=1; fi
if echo "$staged" | grep -qE '^core/http/react-ui/'; then ui_changed=1; fi
if echo "$staged" | grep -qE '^(core/http/endpoints/openai/(coordinator|respcoord|turncoord|conncoord|compactcoord|ttscoord)/|formal-verification/)'; then rt_changed=1; fi

if [ "$go_changed" -eq 0 ] && [ "$ui_changed" -eq 0 ]; then
echo "pre-commit: no Go or React UI changes staged — skipping."
if [ "$go_changed" -eq 0 ] && [ "$ui_changed" -eq 0 ] && [ "$rt_changed" -eq 0 ]; then
echo "pre-commit: no Go, React UI, or realtime-spec changes staged — skipping."
exit 0
fi

Expand Down Expand Up @@ -57,4 +62,11 @@ if [ "$ui_changed" -eq 1 ]; then
make test-ui-coverage-check
fi

if [ "$rt_changed" -eq 1 ]; then
echo "pre-commit ▶ realtime state-machine conformance (make test-realtime-conformance) —"
echo " Go transition/rapid tests under -race + FizzBee model check of the"
echo " authoritative specs. Fail-closed: needs FizzBee (make install-fizzbee)."
make test-realtime-conformance
fi

echo "pre-commit ✓ all relevant checks passed"
69 changes: 69 additions & 0 deletions .github/workflows/realtime-conformance.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
name: 'realtime-conformance'

# Verifies the realtime state-machine implementations conform to their formal
# designs (docs/design/realtime-state-machines.md, formal-verification/). BOTH
# layers are enforced and the gate is fail-closed: the Go conformance layer
# (respcoord + turncoord transition/rapid tests under -race) AND the FizzBee model check of
# the authoritative specs. FizzBee is pinned + checksum-verified
# (formal-verification/fizzbee.sha256), so a failed install fails the job rather
# than silently skipping verification.

on:
pull_request:
paths:
- 'core/http/endpoints/openai/coordinator/**'
- 'core/http/endpoints/openai/respcoord/**'
- 'core/http/endpoints/openai/turncoord/**'
- 'core/http/endpoints/openai/conncoord/**'
- 'core/http/endpoints/openai/compactcoord/**'
- 'core/http/endpoints/openai/ttscoord/**'
- 'formal-verification/**'
- 'scripts/realtime-conformance.sh'
- 'scripts/install-fizzbee.sh'
- '.github/workflows/realtime-conformance.yml'
push:
branches:
- master
paths:
- 'core/http/endpoints/openai/coordinator/**'
- 'core/http/endpoints/openai/respcoord/**'
- 'core/http/endpoints/openai/turncoord/**'
- 'core/http/endpoints/openai/conncoord/**'
- 'core/http/endpoints/openai/compactcoord/**'
- 'core/http/endpoints/openai/ttscoord/**'
- 'formal-verification/**'
- 'scripts/realtime-conformance.sh'

concurrency:
group: realtime-conformance-${{ github.event.pull_request.number || github.sha }}-${{ github.repository }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
conformance:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ['1.26.x']
steps:
- name: Clone
uses: actions/checkout@v7
- name: Setup Go ${{ matrix.go-version }}
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
cache: false
- name: Cache FizzBee
uses: actions/cache@v4
with:
path: .tools/fizzbee
key: fizzbee-v0.5.2-${{ runner.os }}-${{ hashFiles('formal-verification/fizzbee.sha256') }}
- name: Install FizzBee (pinned, checksum-verified)
# No `|| true`: a failed/forged download must fail the job, not silently
# drop the design verification. install-fizzbee.sh is a no-op if the
# cached binary is already present and valid.
run: ./scripts/install-fizzbee.sh
- name: Run conformance gate (fail-closed)
# No skip env: both the Go conformance and the FizzBee model check are
# required. The gate auto-detects .tools/fizzbee/fizz.
run: make test-realtime-conformance
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,12 @@ core/http/react-ui/test-results/

# Local Apple signing material (never commit)
.certs/

# Pinned dev tools (e.g. FizzBee for the realtime-conformance gate)
.tools/

# FizzBee model-check artifacts: the parser emits <spec>.json next to each
# .fizz and the checker writes run dirs under out/. Both are regenerated by
# the realtime-conformance gate; only the .fizz sources are authoritative.
formal-verification/*.json
formal-verification/out/
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,18 @@ test-realtime: build-mock-backend
@echo 'Running realtime e2e tests (mock backend)'
$(GOCMD) run github.com/onsi/ginkgo/v2/ginkgo --label-filter="Realtime && !real-models" --flake-attempts $(TEST_FLAKES) -v -r ./tests/e2e

# Verify the realtime state-machine implementations conform to their formal
# designs (Go transition/rapid tests under -race + FizzBee model check of the
# authoritative specs). See docs/design/realtime-state-machines.md (Part 6) and
# docs/design/specs/README.md.
test-realtime-conformance:
GOCMD=$(GOCMD) ./scripts/realtime-conformance.sh

# Install the pinned, checksum-verified FizzBee model checker (into .tools/,
# gitignored) used by test-realtime-conformance. Idempotent; no-op if present.
install-fizzbee:
./scripts/install-fizzbee.sh

# Container-based real-model realtime testing. Build env vars / pipeline
# definition kept here so test-realtime-models-docker can drive a fully wired
# pipeline (VAD + STT + LLM + TTS) from inside a containerised runner.
Expand Down Expand Up @@ -1027,7 +1039,7 @@ test-extra-backend-whisper-transcription: docker-build-whisper
## is reachable.
test-extra-backend-parakeet-cpp-transcription: docker-build-parakeet-cpp
BACKEND_IMAGE=local-ai-backend:parakeet-cpp \
BACKEND_TEST_MODEL_URL=https://huggingface.co/mudler/parakeet-cpp-gguf/resolve/main/tdt_ctc-110m-f16.gguf \
BACKEND_TEST_MODEL_URL=https://huggingface.co/mudler/parakeet-cpp-gguf/resolve/main/realtime_eou_120m-v1-f16.gguf \
BACKEND_TEST_AUDIO_URL=https://github.com/ggml-org/whisper.cpp/raw/master/samples/jfk.wav \
BACKEND_TEST_CAPS=health,load,transcription \
$(MAKE) test-extra-backend
Expand Down
44 changes: 44 additions & 0 deletions backend/backend.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ service Backend {
rpc GenerateVideo(GenerateVideoRequest) returns (Result) {}
rpc AudioTranscription(TranscriptRequest) returns (TranscriptResult) {}
rpc AudioTranscriptionStream(TranscriptRequest) returns (stream TranscriptStreamResponse) {}
// AudioTranscriptionLive is the bidirectional live-microphone ASR RPC. The
// first message MUST carry a Config; subsequent messages carry Audio frames
// (mono float PCM at config.sample_rate, 16 kHz default). After a
// successful open the backend replies with a single ready ack
// (TranscriptLiveResponse{ready:true}); backends or models without
// cache-aware streaming support return UNIMPLEMENTED instead. Newly
// finalized text streams back as deltas; eou=true marks the model's
// end-of-utterance token. One stream spans many utterances (the decoder
// resets itself after each EOU). Closing the send side finalizes: the
// backend flushes the decoder tail and emits a terminal message carrying
// final_result. A second Config mid-stream resets the decode session.
rpc AudioTranscriptionLive(stream TranscriptLiveRequest) returns (stream TranscriptLiveResponse) {}

@mudler mudler Jun 24, 2026

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if doing tihs uplift, I think would make sense to then deprecate rpc AudioTranscriptionStream(TranscriptRequest) returns (stream TranscriptStreamResponse) {} above, and since at it re-wire the backends to use AudioTranscriptionLive directly. Mainly to avoid code/logic dup split across where it could be unified in a single place

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most transcription backends can't really stream input. The Stream RPC only really means streaming the output and on live both are streamed. So to only use the live RPC you have to buffer input on each backend that doesn't support input streaming. Which would also mean that backends that don't really support bidirectional streaming would implement the RPC instead of returning unsupported which IMO is bad for UX.

I was very confused by what was going on in this code though so have made some changes. Including adding explicit state machines that are formally verified...

rpc TTS(TTSRequest) returns (Result) {}
rpc TTSStream(TTSRequest) returns (stream Reply) {}
rpc SoundGeneration(SoundGenerationRequest) returns (Result) {}
Expand Down Expand Up @@ -479,13 +491,45 @@ message TranscriptResult {
string text = 2;
string language = 3;
float duration = 4;
// True when the decode ended on the model's end-of-utterance special token
// (<EOU>/<EOB>, emitted by cache-aware streaming models such as
// parakeet_realtime_eou_120m-v1). The marker itself is stripped from text.
bool eou = 5;
}

message TranscriptStreamResponse {
string delta = 1;
TranscriptResult final_result = 2;
}

// === AudioTranscriptionLive messages =====================================

message TranscriptLiveRequest {
oneof payload {
TranscriptLiveConfig config = 1;
TranscriptLiveAudio audio = 2;
}
}

message TranscriptLiveConfig {
string language = 1; // "" => model default
int32 sample_rate = 2; // 0 => 16000; backends may reject others
map<string, string> params = 3; // backend-specific tuning
}

message TranscriptLiveAudio {
repeated float pcm = 1; // mono PCM in [-1,1] at config.sample_rate
}

message TranscriptLiveResponse {
bool ready = 1; // open ack: sent once, before any delta
string delta = 2; // newly-finalized text since previous response
bool eou = 3; // <EOU> fired during this feed (the user yielded the turn)
repeated TranscriptWord words = 4; // words finalized by this feed (stream-relative ns)
TranscriptResult final_result = 5; // terminal message only, after the send side closes
bool eob = 6; // <EOB> fired: a backchannel ("uh-huh") ended — NOT a turn boundary
}

message TranscriptWord {
int64 start = 1;
int64 end = 2;
Expand Down
81 changes: 81 additions & 0 deletions backend/go/parakeet-cpp/boundary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

// utteranceBoundary is the single definition of a small state machine that was
// previously open-coded three times — as a bare `finalEou` bool with an ad-hoc
// toggle — in the live feed (live.go), the file-stream text path, and the
// file-stream JSON path (goparakeetcpp.go).
//
// It answers one running question: does the decode currently rest on an
// end-of-utterance boundary? That is the value a closing FinalResult reports as
// .Eou and the realtime turn detector treats as a commit point.
//
// parakeet auto-resets its decoder after every <EOU>/<EOB>, so one streaming
// session is a sequence of utterances and this is a LATCH, not a monotonic
// flag: it closes on an <EOU> and reopens as soon as the next utterance starts.
// (Contrast the realtime API's per-turn `eouSeen`, which only ever goes
// false->true because each turn gets a fresh stream. Here the stream outlives
// the turn, so the boundary status must be able to reopen.)
//
// The only transitions, over the events one streamFeedResult carries — an
// <EOU>, an <EOB> (backchannel), or plain speech output (text and/or words):
//
// <EOU>
// open ───────────► closed
// ▲ ▲ │ │ │
// │ └─┘ <EOB>|speech │ │ <EOU>
// │ (stay open) │ └─┘ (stay closed)
// └──────────────────┘
// <EOB>|speech
//
// open = NOT on an utterance boundary: mid-utterance, the last boundary was
// a backchannel <EOB>, or the stream just began (the initial state).
// closed = the last meaningful event was an <EOU> with no later speech: a real
// turn boundary.
//
// A feed that carries nothing (no eou/eob/text/words — e.g. a finalize flush
// that produced no tail) is a no-op and leaves the state unchanged, matching
// the legacy "leave finalEou as it was" behaviour.
//
// The state carries no data, so it is modelled as a two-valued type (a named
// bool) rather than an int enum: every inhabitant is legal, so illegal states
// are unrepresentable — the payload-free analog of the sealed sum types the
// realtime machines use (those need interfaces because their states carry data,
// e.g. Active{ID}, where "Active with no ID" is the illegal combination a scalar
// cannot even express).
type utteranceBoundary bool

const (
// boundaryOpen is the zero value (false), so a fresh decode starts open —
// exactly the legacy `var finalEou bool` (false) initial condition.
boundaryOpen utteranceBoundary = false
boundaryClosed utteranceBoundary = true
)

// observe folds one decode increment into the latch and returns the new state.
//
// <EOU> takes priority when a single feed carries both an <EOU> and speech
// (e.g. {"text":"hello","eou":1}): the utterance both produced that text AND
// ended, so the decode rests on the boundary. This matches the legacy
// eou-checked-first ordering at every call site.
func (b utteranceBoundary) observe(r streamFeedResult) utteranceBoundary {
switch {
case r.Eou:
return boundaryClosed
case r.Eob || r.Delta != "" || len(r.Words) > 0:
return boundaryOpen
default:
return b
}
}

// ended reports whether the decode currently rests on an end-of-utterance
// boundary (a real <EOU>, not a backchannel <EOB>). This is what a closing
// FinalResult carries as .Eou.
func (b utteranceBoundary) ended() bool { return b == boundaryClosed }

func (b utteranceBoundary) String() string {
if b == boundaryClosed {
return "closed"
}
return "open"
}
92 changes: 92 additions & 0 deletions backend/go/parakeet-cpp/boundary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"math/rand/v2"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("utteranceBoundary (decode end-of-utterance latch)", func() {
It("starts open: a fresh decode is not on a boundary", func() {
var b utteranceBoundary
Expect(b).To(Equal(boundaryOpen))
Expect(b.ended()).To(BeFalse())
})

DescribeTable("single feed transition from the open state",
func(r streamFeedResult, wantEnded bool) {
Expect(boundaryOpen.observe(r).ended()).To(Equal(wantEnded))
},
Entry("<EOU> closes it", streamFeedResult{Eou: true}, true),
Entry("<EOU> with text closes it (eou wins)", streamFeedResult{Delta: "hi", Eou: true}, true),
Entry("<EOB> stays open (backchannel is not a turn boundary)", streamFeedResult{Eob: true}, false),
Entry("plain text stays open", streamFeedResult{Delta: "hello"}, false),
Entry("words-only stays open", streamFeedResult{Words: []transcriptWord{{W: "x"}}}, false),
Entry("empty feed is a no-op (stays open)", streamFeedResult{}, false),
)

DescribeTable("single feed transition from the closed state",
func(r streamFeedResult, wantEnded bool) {
Expect(boundaryClosed.observe(r).ended()).To(Equal(wantEnded))
},
Entry("another <EOU> stays closed", streamFeedResult{Eou: true}, true),
Entry("trailing speech reopens it", streamFeedResult{Delta: "and more"}, false),
Entry("words reopen it", streamFeedResult{Words: []transcriptWord{{W: "x"}}}, false),
Entry("a backchannel <EOB> reopens it", streamFeedResult{Eob: true}, false),
Entry("empty feed is a no-op (stays closed)", streamFeedResult{}, true),
)

It("is a latch: <EOU> then trailing speech reopens, then <EOU> closes again", func() {
b := boundaryOpen
b = b.observe(streamFeedResult{Delta: "turn one", Eou: true})
Expect(b.ended()).To(BeTrue())
b = b.observe(streamFeedResult{Delta: " and more"})
Expect(b.ended()).To(BeFalse(), "trailing speech without an EOU is an open utterance")
b = b.observe(streamFeedResult{Eou: true})
Expect(b.ended()).To(BeTrue())
})

It("treats a backchannel before a real EOU correctly", func() {
b := boundaryOpen
b = b.observe(streamFeedResult{Delta: "uh huh", Eob: true})
Expect(b.ended()).To(BeFalse(), "a backchannel must not masquerade as a turn boundary")
b = b.observe(streamFeedResult{Delta: "done", Eou: true})
Expect(b.ended()).To(BeTrue())
})

It("matches the reference fold over seeded random feed sequences", func() {
// The invariant: after any sequence of feeds, ended() is true iff the
// last feed that carried ANY event was an <EOU>. <EOU> takes priority
// when a feed carries both an EOU and speech; empty feeds are ignored.
for seed := uint64(1); seed <= 200; seed++ {
rng := rand.New(rand.NewPCG(seed, seed*2654435761))
b := boundaryOpen
lastWasEou := false // reference: did the last meaningful feed end on EOU?
steps := rng.IntN(30)
for i := 0; i < steps; i++ {
var r streamFeedResult
switch rng.IntN(5) {
case 0:
r = streamFeedResult{Eou: true}
case 1:
r = streamFeedResult{Eob: true}
case 2:
r = streamFeedResult{Delta: "w"}
case 3:
r = streamFeedResult{Delta: "w", Eou: true} // eou + speech, eou wins
case 4:
r = streamFeedResult{} // empty: no-op
}
b = b.observe(r)
if r.Eou {
lastWasEou = true
} else if r.Eob || r.Delta != "" || len(r.Words) > 0 {
lastWasEou = false
}
}
Expect(b.ended()).To(Equal(lastWasEou),
"seed %d: latch disagreed with the reference fold", seed)
}
})
})
Loading
Loading