[VoiceLive] Harden async samples for GA: fix threading races and reactor stream issues#49036

Open
xitzhang wants to merge 2 commits into main from xitzhang/fixsamples

Conversation

@xitzhang (Member) commented May 4, 2026

Description

Addresses reviewer feedback before GA:

"The async samples have some issues that should be addressed before GA. Customers using these samples in production can run into various threading issues and can also lead to certain events not getting processed."

All SDK Contribution checklist:

  • The pull request does not introduce breaking changes.
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which has an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

…ssues

- Mark cross-thread reference fields volatile to fix JMM visibility races between reactor event-handler thread and JVM shutdown-hook thread (session, audioProcessor, microphone/speaker/inputLine/outputLine, capture/playback threads)

- Surface dropped audio packets when bounded playback queue is full (LinkedBlockingQueue.offer returns false)

- Replace detached receiveEvents().subscribe() with chained thenMany() so a single reactor stream processes events (avoids events being silently dropped by competing subscribers)

- Replace poll()+Thread.sleep busy-wait with blocking take() in audio playback loops

- Convert fire-and-forget sendInputAudio/sendEvent .subscribe() to subscribe(onNext, onError) so errors surface

- Replace doOnError(...).subscribe() with subscribe(onNext, onError) (doOnError is a side-effect, not a terminal handler)

- Bound previously-unbounded LinkedBlockingQueue<>() to capacity 1000 to prevent OOM with slow consumers

- GlobalTracingSample: move countDown() out of doFinally into subscribe(onComplete, onError)

- AudioPlaybackSample: fix broken thenMany chain using take(Duration.ofSeconds(10))

- Switch all production samples to DefaultAzureCredential (KeyCredential shown as alternative comment)

- MCPSample: fix runMCPSample signature mismatch left over from credential migration
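
The bounded-queue change above (capacity 1000, with `offer` returning `false` when the queue is full) can be illustrated with a minimal sketch. The class name, method names, and the drop counter are hypothetical and not taken from the samples; only `LinkedBlockingQueue` and its `offer` behavior come from the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: names here are illustrative, not the samples' actual code. */
public class BoundedPlaybackQueueSketch {
    private final BlockingQueue<byte[]> playbackQueue;
    private final AtomicLong droppedPackets = new AtomicLong();

    public BoundedPlaybackQueueSketch(int capacity) {
        // A bounded queue caps memory use when the consumer is slower than the producer.
        this.playbackQueue = new LinkedBlockingQueue<>(capacity);
    }

    /** Returns true if the packet was queued, false if it was dropped because the queue is full. */
    public boolean enqueue(byte[] audioData) {
        // offer() never blocks; it returns false instead of waiting for space.
        boolean accepted = playbackQueue.offer(audioData);
        if (!accepted) {
            long dropped = droppedPackets.incrementAndGet();
            System.err.println("Playback queue full; dropped audio packet #" + dropped);
        }
        return accepted;
    }

    public long droppedCount() {
        return droppedPackets.get();
    }

    public int queuedCount() {
        return playbackQueue.size();
    }

    public static void main(String[] args) {
        BoundedPlaybackQueueSketch sketch = new BoundedPlaybackQueueSketch(2);
        for (int i = 0; i < 3; i++) {
            sketch.enqueue(new byte[320]);
        }
        // prints "queued=2 dropped=1"
        System.out.println("queued=" + sketch.queuedCount() + " dropped=" + sketch.droppedCount());
    }
}
```

Using `offer` rather than `put` keeps the event-handling thread from blocking when playback falls behind; the trade-off is that audio is dropped, which is why the drop is logged rather than ignored.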
Copilot AI review requested due to automatic review settings May 4, 2026 16:16
@xitzhang xitzhang requested a review from srnagar May 4, 2026 16:17
Copilot AI (Contributor) left a comment

Pull request overview

This PR hardens the VoiceLive Java async samples ahead of GA by addressing common race conditions in audio processing and improving reactive subscriptions/error handling, while also shifting sample authentication guidance toward DefaultAzureCredential (with API key as an option).

Changes:

  • Update samples to use DefaultAzureCredentialBuilder by default (API key becomes optional / flag-driven where applicable).
  • Reduce concurrency hazards in audio capture/playback by adding volatile fields and bounding playback queues with drop warnings.
  • Improve reactive subscribe() usage by providing explicit error consumers and restructuring some event pipelines.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java | Switches auth default to DefaultAzureCredential; adds volatile audio lines and bounded playback queue; updates CLI/env var guidance (needs doc fixes). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java | Uses DefaultAzureCredential; bounds playback queue; adds subscribe error handlers (but introduces a blocking playback-thread shutdown issue). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java | Uses DefaultAzureCredential; bounds playback queue; adds subscribe error handlers (but introduces a blocking playback-thread shutdown issue). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java | Uses DefaultAzureCredential; bounds audio queue; changes receiveEvents timing and adds a receiveSink-related comment (needs adjustment to avoid relying on internals). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java | Makes session/audio processor references volatile; improves subscribe error handling; bounds playback queue and adds drop warnings. |

Comments suppressed due to low confidence (2)

sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java:554

  • startPlayback() now blocks on playbackQueue.take(), but cleanup() only sets isPlaying=false and closes the speaker. If the queue is empty, the playback thread can block indefinitely and won’t observe isPlaying=false, so playback may not stop cleanly. Consider sending a shutdown marker into the queue and/or keeping a thread reference and interrupting it (and/or mark the thread daemon).
                // Start playback thread
                new Thread(() -> {
                    while (isPlaying.get()) {
                        try {
                            byte[] audioData = playbackQueue.take();
                            speaker.write(audioData, 0, audioData.length);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }, "AudioPlayback").start();

sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java:539

  • startPlayback() now blocks on playbackQueue.take(), but cleanup() only flips isPlaying=false and clears the queue. If the queue is empty at shutdown, the playback thread can block indefinitely and won’t exit. Consider enqueuing a sentinel/shutdown item during cleanup and/or keeping a thread reference and interrupting it (and/or mark the thread daemon).
                // Start playback thread
                new Thread(() -> {
                    while (isPlaying.get()) {
                        try {
                            byte[] audioData = playbackQueue.take();
                            speaker.write(audioData, 0, audioData.length);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }, "AudioPlayback").start();
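
The remedies suggested in the two comments above (a shutdown marker in the queue, a retained thread reference that can be interrupted, and a daemon thread) could be combined roughly as follows. This is a sketch under assumptions, not the samples' code: the class name, the `SHUTDOWN` sentinel, the `bytesWritten` counter standing in for `speaker.write`, and the `cleanup(timeoutMillis)` signature are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a playback loop that can be stopped while blocked in take(). */
public class PlaybackShutdownSketch {
    // Sentinel compared by identity, so a real empty packet is never mistaken for it.
    private static final byte[] SHUTDOWN = new byte[0];

    private final BlockingQueue<byte[]> playbackQueue = new LinkedBlockingQueue<>(1000);
    private final AtomicLong bytesWritten = new AtomicLong();
    private volatile Thread playbackThread;

    public void startPlayback() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    byte[] audioData = playbackQueue.take(); // blocks until data or sentinel
                    if (audioData == SHUTDOWN) {
                        break;
                    }
                    // Stand-in for speaker.write(audioData, 0, audioData.length).
                    bytesWritten.addAndGet(audioData.length);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and exit
            }
        }, "AudioPlayback");
        t.setDaemon(true); // a stuck playback thread cannot keep the JVM alive
        playbackThread = t;
        t.start();
    }

    public boolean enqueue(byte[] audioData) {
        return playbackQueue.offer(audioData);
    }

    public long bytesWritten() {
        return bytesWritten.get();
    }

    /** Stops the playback thread; returns true if it exited within the timeout (use a value > 0). */
    public boolean cleanup(long timeoutMillis) {
        Thread t = playbackThread;
        if (t == null) {
            return true;
        }
        playbackQueue.clear(); // discard pending audio
        if (!playbackQueue.offer(SHUTDOWN)) {
            t.interrupt(); // queue refilled concurrently; fall back to interrupt
        }
        try {
            t.join(timeoutMillis);
            if (t.isAlive()) {
                t.interrupt();
                t.join(timeoutMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        PlaybackShutdownSketch sketch = new PlaybackShutdownSketch();
        sketch.startPlayback();
        sketch.enqueue(new byte[640]);
        System.out.println("stopped=" + sketch.cleanup(2000));
    }
}
```

The sentinel wakes a thread that is parked in `take()` on an empty queue, which a flag such as `isPlaying` alone cannot do; the interrupt is a fallback for the case where the sentinel cannot be enqueued.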

- Add Sinks.One eventSubscribed receive-first barrier so sendEvent(sessionConfig) waits for the hot multicast receiveEvents() subscription, preventing dropped events.
- Compose receive + send into a single Flux.merge(...).then() lifecycle (notably AgentV2Sample) instead of detached subscribe() calls.
- Mark audio capture/playback worker threads as daemons; replace busy poll() with blocking take()/read(); interrupt threads during cleanup so JVM shutdown completes.
- Use volatile / AtomicReference for cross-thread audio line and thread handles to fix JMM visibility races.
- Replace unbounded queues with bounded LinkedBlockingQueue(1000); offer() overflow now logs a warning instead of silently dropping audio.
- Replace doOnError().subscribe() and bare subscribe() patterns with subscribe(onNext, onError) so errors are not swallowed.
- Fix AuthenticationMethodsSample unreachable completion message and MCPSample.runMCPSample signature.
- Enhance Javadoc on all 9 runnable samples (when to use / what happens at runtime).
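
The receive-first barrier in the first bullet above can be illustrated without Reactor. The sketch below deliberately swaps `Sinks.One` for a JDK `CompletableFuture` and uses a hypothetical `HotEventSource` class in place of the hot multicast `receiveEvents()` stream; the event name `"session.updated"` is also only an illustrative assumption. It shows the ordering guarantee, not the samples' actual pipeline.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch of a receive-first barrier using only JDK types. */
public class ReceiveFirstBarrierSketch {

    /** Stand-in for a hot source: events published before anyone subscribes are lost. */
    static final class HotEventSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String event) {
            subscribers.forEach(s -> s.accept(event));
        }
    }

    /** Returns the events the receiver observed, with or without the barrier. */
    public static List<String> run(boolean waitForSubscription) {
        HotEventSource source = new HotEventSource();
        List<String> received = new CopyOnWriteArrayList<>();
        // Plays the role of the eventSubscribed signal described above.
        CompletableFuture<Void> eventSubscribed = new CompletableFuture<>();

        Runnable subscribe = () -> {
            source.subscribe(received::add);
            eventSubscribed.complete(null); // signal only after the subscription exists
        };
        // Stand-in for the server's reply to sendEvent(sessionConfig).
        Runnable send = () -> source.publish("session.updated");

        if (waitForSubscription) {
            // The send is chained on the barrier, so it cannot run before subscribe completes.
            CompletableFuture<Void> sent = eventSubscribed.thenRun(send);
            subscribe.run();
            sent.join();
        } else {
            // Without the barrier the send can win the race and the event is dropped.
            send.run();
            subscribe.run();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("with barrier:    " + run(true));  // prints [session.updated]
        System.out.println("without barrier: " + run(false)); // prints []
    }
}
```

The unguarded branch forces the losing order to make the failure reproducible; in a real reactive pipeline the same loss would occur only intermittently, which is what makes it hard to diagnose.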