Add graceful shutdown drain to ServiceBusProcessorClient #48192

EldertGrootenboer wants to merge 33 commits into Azure:main
Conversation
When `ServiceBusProcessorClient.close()` is called while message handlers are still executing, the receiver was disposed immediately, causing in-flight handlers to fail with `IllegalStateException`.

Add drain-before-dispose logic, using an `AtomicInteger` handler counter and `Object` monitor wait/notify, to all processor shutdown paths:

- `MessagePump` (V2 non-session)
- `ServiceBusProcessor.RollingMessagePump` (V2 non-session lifecycle)
- `SessionsMessagePump.RollingSessionReceiver` (V2 session)
- `ServiceBusProcessorClient` V1 close path

The drain executes before subscription cancellation/disposal, with a configurable timeout (default 30s) to prevent indefinite blocking. Includes 3 regression tests in `ServiceBusProcessorGracefulShutdownTest`.
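As a rough illustration of the mechanism this commit describes, here is a minimal, self-contained sketch of the counter-plus-wait/notify drain pattern; the class and field names are illustrative, not the SDK's actual members:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

final class HandlerDrain {
    private final AtomicInteger activeHandlers = new AtomicInteger();
    private final Object drainLock = new Object();

    // Wraps each message-handler invocation so in-flight work is counted.
    void runHandler(Runnable handler) {
        activeHandlers.incrementAndGet();
        try {
            handler.run();
        } finally {
            if (activeHandlers.decrementAndGet() == 0) {
                synchronized (drainLock) {
                    drainLock.notifyAll(); // wake a close() waiting to drain
                }
            }
        }
    }

    // Blocks until all in-flight handlers finish or the timeout expires;
    // on timeout the caller proceeds with disposal anyway.
    void drain(Duration timeout) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (drainLock) {
            while (activeHandlers.get() > 0) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
                if (remainingMillis <= 0) {
                    return; // timed out: do not block shutdown indefinitely
                }
                drainLock.wait(remainingMillis);
            }
        }
    }
}
```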
Pull request overview
This PR adds “drain before dispose” behavior to Service Bus processor shutdown paths to avoid failing in-flight message handlers (and their settlement calls) with IllegalStateException when the underlying receiver is disposed during close().
Changes:
- Track in-flight handler execution and block shutdown briefly to allow handlers to complete (with a 30s timeout).
- Apply draining to V2 non-session (`MessagePump`/`RollingMessagePump`), V2 session (`RollingSessionReceiver.terminate()`), and V1 (`ServiceBusProcessorClient.close()`).
- Add regression tests covering V2 non-session and V1 shutdown draining plus a drain-timeout test.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java | Adds handler counting + drainHandlers(Duration) used to block shutdown until in-flight handlers finish or timeout. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java | Wires draining into RollingMessagePump.dispose() before disposing the subscription. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java | Adds per-session handler counting + drain during termination before disposing the worker scheduler. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java | Adds V1 handler counting + drainV1Handlers(Duration) invoked during close() before subscription cancellation. |
| sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java | Adds new unit tests validating drain behavior for V2 non-session, V1, and drain timeout. |
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:458
`SessionsMessagePump.RollingSessionReceiver` now includes new drain-before-dispose behavior, but the added logic isn't covered by the new regression tests. There are already isolated unit tests for `SessionsMessagePump` behavior; it should be possible to extend them to verify that termination waits for an in-flight handler (or respects the timeout) for the session path as well.
Please add a unit test that exercises session termination while a handler is blocked, to prevent regressions in this new shutdown behavior.
```java
// Drain in-flight message handlers BEFORE disposing the worker scheduler.
// Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()).
// Draining first ensures handlers can complete message settlement before threads are interrupted.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainHandlers(DRAIN_TIMEOUT);
workerScheduler.dispose();
```
- Add `ThreadLocal<Boolean>` flag to detect when `drainHandlers` is called from within a message handler (e.g., user calls `close()` inside `processMessage` callback)
- Guard all three drain paths: `MessagePump.drainHandlers()`, `ServiceBusProcessorClient.drainV1Handlers()`, and `SessionsMessagePump.RollingSessionReceiver.drainHandlers()`
- When re-entrant call detected, skip drain with warning log and return immediately to avoid self-deadlock
- Add `v2DrainFromWithinHandlerShouldNotDeadlock` test verifying the guard prevents deadlock when drain is called from handler thread
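A hedged sketch of how such a re-entrancy guard can sit in front of the drain; the names and the `drainAction` indirection are assumptions for illustration, not the SDK's actual code:

```java
final class ReentrancyGuard {
    // True only on threads currently executing a message handler.
    private final ThreadLocal<Boolean> inHandler = ThreadLocal.withInitial(() -> Boolean.FALSE);

    void markHandlerEntry() {
        inHandler.set(Boolean.TRUE);
    }

    void markHandlerExit() {
        inHandler.remove(); // fully clear the pooled thread's entry
    }

    // Returns false when the drain was skipped because the calling thread IS a
    // handler: its own in-flight count cannot drop until it returns, so waiting
    // here (e.g. user calls close() inside processMessage) would self-deadlock.
    boolean drainUnlessReentrant(Runnable drainAction) {
        if (inHandler.get()) {
            return false; // the real path logs a warning and returns immediately
        }
        drainAction.run(); // e.g. the counter wait from the sketch above
        return true;
    }
}
```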
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:161
- This test uses a fixed `Thread.sleep(200)` to "give dispose a moment to start". Fixed sleeps are prone to flakiness on slow/loaded CI agents (either too short or unnecessarily long). Prefer a synchronization point that directly observes the expected state (e.g., `assertFalse(disposeDone.await(...))`, a latch signaled right before/after entering drain, or Mockito's timed verification APIs) so the test doesn't depend on timing heuristics.
```java
// Give dispose a moment to start; it should be blocked in drainHandlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose).
verify(client, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
```
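One possible way to replace the fixed sleep, sketched here with Mockito's timed verification; `client` and `handlerCompleted` refer to the mocks in the quoted test and are assumptions about its setup:

```java
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

// Instead of Thread.sleep(200) followed by an instantaneous check, let Mockito
// watch the whole window and fail if close() fires at any point within it:
verify(client, after(200).never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
```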
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:256
- This test uses a fixed `Thread.sleep(200)` to "give close a moment to start". As written, it can be flaky under variable scheduling/CPU contention. Use an explicit synchronization condition (e.g., `assertFalse(closeDone.await(...))` or a latch that confirms the close thread is blocked in the drain) instead of a fixed sleep.
```java
// Give close a moment to start; it should be blocked in drainV1Handlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking close).
verify(asyncClient, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
```
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:54
- The class-level "Coverage Matrix" JavaDoc is now out of sync with the actual tests in this class: it lists only the V2/V1/timeout scenarios, but the class also includes a re-entrant drain regression test (`v2DrainFromWithinHandlerShouldNotDeadlock`). Please update the matrix to reflect all covered scenarios so readers don't miss this important case.
```java
 * <h3>Coverage Matrix</h3>
 * <ul>
 * <li><b>V2 Non-Session</b> — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
 * Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}</li>
 * <li><b>V1 Non-Session</b> — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
 * Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}</li>
 * <li><b>Drain Timeout</b> — {@link #v2DrainShouldRespectTimeout()}:
 * Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>
```
After `drainHandlers()` returns but before the Flux subscription is disposed, `flatMap` can dispatch a new handler that attempts settlement on a closing client. Add a `volatile boolean closing` flag to `MessagePump` and `SessionsMessagePump.RollingSessionReceiver`, set at the start of `drainHandlers()` and checked at the top of `handleMessage()`. Handlers that see the flag skip processing and return immediately. V1 path is unaffected (`isRunning` already gates `subscription.request`).

New test: `v2ClosingFlagPreventsNewHandlersAfterDrainStarts`.
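A minimal sketch of that gate, with assumed names; combined with the counter from the earlier drain sketch, it closes the dispatch window the commit describes:

```java
final class ClosingGate {
    // volatile: written by the closing thread, read by Reactor handler threads.
    private volatile boolean closing;

    // Checked at the top of handleMessage(): true means "skip this delivery",
    // so a late flatMap dispatch never settles against a closing client.
    boolean shouldSkipNewHandlers() {
        return closing;
    }

    // Called at the start of drainHandlers(), BEFORE waiting on the counter.
    void markClosing() {
        closing = true;
    }
}
```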
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java:176
`isHandlerThread` is an instance `ThreadLocal`. Setting it back to `FALSE` leaves an entry in the thread's `ThreadLocalMap` for each `MessagePump` instance that ever ran on that pooled thread. Since pumps can be recreated (rolling/retry) and worker threads are long-lived, this can accumulate stale entries and increase memory usage over time. Prefer calling `isHandlerThread.remove()` in the `finally` block instead of `set(Boolean.FALSE)` to ensure the entry is cleared.
```java
} finally {
    isHandlerThread.set(Boolean.FALSE);
    if (activeHandlerCount.decrementAndGet() == 0) {
        synchronized (drainLock) {
            drainLock.notifyAll();
```
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java:503
`isV1HandlerThread` is an instance `ThreadLocal`. Resetting it with `set(Boolean.FALSE)` keeps a per-processor entry in the thread's `ThreadLocalMap`, which can accumulate on pooled threads over the lifetime of the application. Prefer `isV1HandlerThread.remove()` in the `finally` block to fully clear the entry after each callback.
```java
} finally {
    isV1HandlerThread.set(Boolean.FALSE);
    if (activeV1HandlerCount.decrementAndGet() == 0) {
        synchronized (v1DrainLock) {
            v1DrainLock.notifyAll();
```
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:571
`isHandlerThread` is an instance `ThreadLocal`. Setting it to `FALSE` leaves an entry behind in the thread's `ThreadLocalMap`, and since session pumps/receivers can be recreated while using long-lived pooled threads, this can accumulate stale entries over time. Prefer `isHandlerThread.remove()` in the `finally` block to clear the `ThreadLocal` after each handler execution.
```java
} finally {
    isHandlerThread.set(Boolean.FALSE);
    if (activeHandlerCount.decrementAndGet() == 0) {
        synchronized (drainLock) {
            drainLock.notifyAll();
        }
```
On pooled threads (Reactor `boundedElastic`), `set(FALSE)` leaves a stale entry in the `ThreadLocalMap` after the pump is GC'd. `remove()` clears the entry immediately, following Java best practice for `ThreadLocal` cleanup on long-lived worker threads.
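A short sketch of the difference (names illustrative): the `finally` block uses `remove()` so the pooled thread keeps no per-pump entry once the handler returns:

```java
final class HandlerFlagCleanup {
    private final ThreadLocal<Boolean> isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE);

    void runHandler(Runnable handler) {
        isHandlerThread.set(Boolean.TRUE);
        try {
            handler.run();
        } finally {
            // remove() deletes the entry from the thread's ThreadLocalMap;
            // set(Boolean.FALSE) would leave a stale mapping behind for every
            // pump instance that ever ran on this long-lived worker thread.
            isHandlerThread.remove();
        }
    }
}
```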
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (4)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:157
- These `Thread.sleep(...)` calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito's time-based verification (`verify(mock, after(ms).never())...`, `timeout(ms)` with zero invocations) so the test waits deterministically without assuming scheduler timing.
```java
Thread.sleep(200);
```
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:251
- These `Thread.sleep(...)` calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito's time-based verification (`verify(mock, after(ms).never())...`, `timeout(ms)` with zero invocations) so the test waits deterministically without assuming scheduler timing.
```java
Thread.sleep(200);
```
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:481
- These `Thread.sleep(...)` calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito's time-based verification (`verify(mock, after(ms).never())...`, `timeout(ms)` with zero invocations) so the test waits deterministically without assuming scheduler timing.
```java
Thread.sleep(500);
```
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:483
- The drain implementation (counter + lock + re-entrancy guard + timeout loop) is duplicated across `MessagePump`, `SessionsMessagePump`, and V1 (`ServiceBusProcessorClient`). To reduce divergence risk and make future fixes (e.g., re-entrancy behavior) consistent, consider extracting a small shared helper (package-private) that encapsulates the counter/lock/wait-notify pattern and exposes a clear "drain result".
```java
private void drainHandlers(Duration timeout) {
    closing = true;
    if (isHandlerThread.get()) {
        // Re-entrant call from within a session message handler (e.g., user called close() inside processMessage).
        // Waiting here would self-deadlock because this thread's handler incremented the counter and
        // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will
        // complete naturally after this handler returns.
        logger.atWarning()
            .log("drainHandlers called from within a session message handler (re-entrant). "
                + "Skipping drain to avoid self-deadlock.");
        return;
    }
```
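A hypothetical shape for such a helper, sketched under the assumption that the three call sites share the counter/lock/guard/timeout semantics quoted above; the class name and API are illustrative only:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

final class HandlerDrainGuard {
    enum DrainResult { DRAINED, TIMED_OUT, SKIPPED_REENTRANT }

    private final ThreadLocal<Boolean> inHandler = ThreadLocal.withInitial(() -> Boolean.FALSE);
    private final AtomicInteger active = new AtomicInteger();
    private final Object lock = new Object();

    void onHandlerEnter() {
        inHandler.set(Boolean.TRUE);
        active.incrementAndGet();
    }

    void onHandlerExit() {
        inHandler.remove();
        if (active.decrementAndGet() == 0) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    DrainResult drain(Duration timeout) throws InterruptedException {
        if (inHandler.get()) {
            return DrainResult.SKIPPED_REENTRANT; // avoid self-deadlock
        }
        long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            while (active.get() > 0) {
                long ms = (deadline - System.nanoTime()) / 1_000_000;
                if (ms <= 0) {
                    return DrainResult.TIMED_OUT;
                }
                lock.wait(ms);
            }
        }
        return DrainResult.DRAINED;
    }
}
```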
When a handler calls `close()` re-entrantly, the drain now waits for OTHER concurrent handlers to complete (threshold = 1) before cancelling subscriptions and closing the client. Previously the drain returned immediately, which could interrupt concurrent handlers mid-settlement. Applied consistently across V1 (`ServiceBusProcessorClient`), V2 (`MessagePump`), and sessions (`SessionsMessagePump`). Notification threshold updated from `== 0` to `<= 1` so the re-entrant waiter gets notified.
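A sketch of the threshold change, under the same illustrative names as the drain sketches above; the `<= 1` wake-up condition is the key difference:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

final class ThresholdDrain {
    private final ThreadLocal<Boolean> inHandler = ThreadLocal.withInitial(() -> Boolean.FALSE);
    private final AtomicInteger active = new AtomicInteger();
    private final Object lock = new Object();

    void onHandlerEnter() {
        inHandler.set(Boolean.TRUE);
        active.incrementAndGet();
    }

    void onHandlerExit() {
        inHandler.remove();
        // <= 1 rather than == 0: when a re-entrant closer waits with threshold 1,
        // the second-to-last handler to finish must wake it up.
        if (active.decrementAndGet() <= 1) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    void drain(Duration timeout) throws InterruptedException {
        // A re-entrant caller's own handler pins the count at >= 1 until it
        // returns, so it waits only for the OTHER handlers: threshold 1, not 0.
        final int threshold = inHandler.get() ? 1 : 0;
        long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            while (active.get() > threshold) {
                long ms = (deadline - System.nanoTime()) / 1_000_000;
                if (ms <= 0) {
                    return;
                }
                lock.wait(ms);
            }
        }
    }
}
```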
- Document `drainTimeout` under spring-cloud-azure-autoconfigure / New Features
- Document `drainTimeout` under spring-messaging-azure-servicebus / New Features

Addresses reviewer feedback on PR Azure#48192.
@rujche Added CHANGELOG entries to
…cessorClient.close()

Holding the `ServiceBusProcessorClient` instance monitor for the entire drain wait stalled shutdown for the full drain timeout whenever an in-flight `processMessage` callback called any synchronized accessor on the same client (e.g. `isRunning()`, `getIdentifier()`): the handler blocked on the monitor while `close()` was waiting for that handler's count to reach zero.

`close()` now snapshots state under the monitor (sets `isRunning=false`, marks `v1Closing`, captures `processorV2` / `drainTimeout`), releases the monitor, performs the blocking drain (V1 `drainV1Handlers` or V2 `processorV2.close()`), then re-acquires the monitor for the non-blocking cleanup. Concurrent `close()` calls remain safe: every cleanup step is guarded by a null check or operates on already-cleared collections.

Adds regression test `v1CloseShouldNotHoldClientMonitorDuringDrain` that asserts `close()` completes within 5s when an in-flight handler calls the synchronized `isRunning()` accessor (a regression that re-introduces the monitor hold would force `close()` to wait the full 30s drain timeout).

Addresses Copilot feedback on PR Azure#48192.
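A simplified sketch of that `close()` shape; the fields and the `HandlerDrain` helper are the illustrative ones from the first sketch above, not the SDK's actual members:

```java
import java.time.Duration;

final class DrainOutsideMonitor {
    private final HandlerDrain drain = new HandlerDrain(); // from the first sketch
    private boolean running = true;                        // guarded by 'this'
    private Duration drainTimeout = Duration.ofSeconds(30);

    public synchronized boolean isRunning() {
        return running; // handlers may call this mid-drain without stalling close()
    }

    public void close() throws InterruptedException {
        final Duration timeout;
        synchronized (this) {
            if (!running) {
                return;
            }
            running = false;        // flip state while holding the monitor...
            timeout = drainTimeout; // ...and snapshot what the drain needs
        }
        // The blocking wait runs WITHOUT the monitor, so an in-flight handler
        // calling a synchronized accessor cannot stall shutdown for the timeout.
        drain.drain(timeout);
        synchronized (this) {
            // Re-acquire only for non-blocking cleanup: cancel the subscription,
            // dispose the client, clear collections.
        }
    }
}
```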
…in + relocate spring CHANGELOG entries

Two fixes addressing Copilot review feedback on PR Azure#48192:

1. `ServiceBusProcessorClient`: `close()` now releases the instance monitor across the drain wait, which opens a window where `start()`/`stop()`/`restartMessageReceiver()` could acquire the monitor, reset `v1Closing`, create a new async client, and start the connection monitor, only for `close()` to dispose those new resources during cleanup. Add a `v1CloseInProgress` `AtomicBoolean` that `close()` sets at the start of the V1 path and clears in `finally`. `start()`, `stop()`, and `restartMessageReceiver()` check the flag and return early with a log message when `close()` is in progress. Caller can retry `start()` once `close()` has returned.

2. sdk/spring/CHANGELOG.md: the previous entries landed inside the already-released 7.2.0 (2026-04-17) section after the recent merge from main brought 7.3.0-beta.1 (Unreleased) above it. Move the `drainTimeout` entries up to the unreleased section so the 7.2.0 release notes stay accurate.

Adds regression test `v1ConcurrentStartDuringCloseDrainIsIgnored` verifying that a concurrent `start()` during `close()`'s drain window does not invoke the receiver builder a second time and does not leave the processor in a misleading running state.
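A sketch of the early-return guard on the lifecycle methods, using assumed names:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseGuardedLifecycle {
    private final AtomicBoolean v1CloseInProgress = new AtomicBoolean();

    public void start() {
        if (v1CloseInProgress.get()) {
            // close() has released the monitor for its drain window; starting
            // now would create resources that close()'s cleanup then disposes.
            // Log and return; the caller can retry once close() has returned.
            return;
        }
        // ... create the async client and begin receiving ...
    }

    // stop() and restartMessageReceiver() perform the same check.
}
```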
…ceiver leak during/after close

After `close()` releases the instance monitor across the drain wait, a concurrent `getIdentifier()` call could see `asyncClient=null` (just nulled by `close()`'s cleanup, or never set) and fall through to the lazy `createNewReceiver()` path, leaving a fresh receiver behind that `close()` is no longer responsible for disposing.

`getIdentifier()` now caches the identifier in a volatile field whenever it observes a live `asyncClient`, and `close()` captures it once more before nulling the client. When `asyncClient` is null but `cachedV1Identifier` is set, `getIdentifier()` returns the cached value without lazy-creating a receiver. The original lazy-init behavior is preserved only for the first-ever call before any `start()`.

Adds regression test `v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver` verifying the receiver builder is invoked exactly once across `start()` + `getIdentifier()` during drain + `getIdentifier()` after close.

Addresses Copilot feedback on PR Azure#48192.
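A sketch of the caching shape; the `Object` client and the two helper methods are hypothetical stand-ins for illustration only:

```java
final class IdentifierCache {
    private volatile Object asyncClient;      // stand-in for the receiver client
    private volatile String cachedIdentifier; // e.g. the cachedV1Identifier field

    String getIdentifier() {
        Object client = asyncClient;
        if (client != null) {
            cachedIdentifier = identifierOf(client); // cache while the client is live
            return cachedIdentifier;
        }
        if (cachedIdentifier != null) {
            // close() nulled the client: return the cached value instead of
            // lazily creating a fresh receiver that nobody would dispose.
            return cachedIdentifier;
        }
        return lazyCreateReceiverIdentifier(); // first-ever call before any start()
    }

    private String identifierOf(Object client) {
        return client.toString(); // placeholder for the real identifier lookup
    }

    private String lazyCreateReceiverIdentifier() {
        return "new-receiver-identifier"; // placeholder for createNewReceiver()
    }
}
```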
… null instead of empty string

Two fixes addressing Copilot review feedback on PR Azure#48192:

1. Concurrent `close()` ownership: the previous version used `v1CloseInProgress.set(true)` with a `finally` clear, which let two concurrent `close()` calls both proceed through drain + cleanup. The first to finish would clear the flag and let a concurrent `start()` create new resources, which the still-running second `close()` could then dispose. `close()` now uses `compareAndSet(false, true)` so only the first call wins ownership; subsequent concurrent `close()` calls return immediately. The processor still gets fully closed: the owner finishes the work.

2. `getIdentifier()` consistency: previously returned an empty string when `v1CloseInProgress` was true with no cached identifier, a sentinel value callers don't expect. Now returns null, matching the V2 path's behavior when no identifier is available.

Adds regression test `v1ConcurrentCloseCallsDoNotRace` verifying two concurrent `close()` calls: the second returns within 1 second (instead of waiting the 30s drain timeout) and the `asyncClient` is closed exactly once.
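A sketch of the ownership change: `compareAndSet` lets exactly one `close()` win, where a plain `set()` with a clear-in-`finally` let two concurrent calls interleave (names assumed):

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleOwnerClose {
    private final AtomicBoolean v1CloseInProgress = new AtomicBoolean();

    public void close() {
        if (!v1CloseInProgress.compareAndSet(false, true)) {
            return; // another close() already owns shutdown; return immediately
        }
        // Only the owning call reaches this point: it performs the drain and
        // every cleanup step exactly once, so a concurrent start() cannot slot
        // new resources in between two racing cleanups.
        // ... drain, cancel subscription, dispose client ...
    }
}
```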
Update `getIdentifier()` Javadoc to explicitly allow null and describe the two cases when null can be returned: V2 path before first `start()`, and V1 path when `close()` is in progress on a brand-new processor that never started. Addresses Copilot feedback on PR Azure#48192.
Fixes #45716
When `ServiceBusProcessorClient.close()` is called while message handlers are still executing, the receiver is disposed immediately, causing in-flight handlers to fail with `IllegalStateException: Cannot perform operation on a disposed receiver`.

What this PR does

Adds drain-before-dispose logic to all processor shutdown paths. An `AtomicInteger` handler counter with `Object` monitor wait/notify blocks `close()` until all in-flight message handlers complete (or a 30-second timeout expires) before subscription cancellation/disposal:

- `MessagePump` (V2 non-session): `drainHandlers()` added
- `ServiceBusProcessor.RollingMessagePump` (V2 lifecycle): calls `pump.drainHandlers()` before `disposable.dispose()`
- `SessionsMessagePump.RollingSessionReceiver` (V2 session): per-session drain in `terminate()` before `workerScheduler.dispose()`
- `ServiceBusProcessorClient` (V1): `drainV1Handlers()` before `receiverSubscriptions.cancel()`

This mirrors the .NET SDK's `StopProcessingAsync` behavior, which awaits `Task.WhenAll` on in-flight handlers before disposing.

V1 lifecycle hardening (added during review)

`ServiceBusProcessorClient.close()` originally held the instance monitor across the entire drain wait, which let any in-flight handler calling a synchronized accessor on the same client (`isRunning()`, `getIdentifier()`) stall shutdown for the full drain timeout. Releasing the monitor across the drain opened additional races with `start()`/`stop()`/`restartMessageReceiver()` and concurrent `close()` calls. The PR now:

- Uses a `v1CloseInProgress` `AtomicBoolean` (claimed via `compareAndSet`) so `start()`, `stop()`, and `restartMessageReceiver()` return early during shutdown and only one concurrent `close()` performs cleanup.
- Ensures `getIdentifier()` returns a stable value during/after `close()` without lazy-creating a fresh receiver.
- Allows `getIdentifier()` to return `null` on the V1 path when `close()` is in progress on a brand-new processor that never started (Javadoc updated).

Tests

12 tests in `ServiceBusProcessorGracefulShutdownTest` cover the full drain + lifecycle surface:

- `v2CloseShouldWaitForInFlightHandlerBeforeClosingClient`, `v1CloseShouldWaitForInFlightHandlerBeforeClosingClient`
- `v2DrainShouldRespectTimeout`, `v2DrainFromWithinHandlerShouldNotDeadlock`, `v1ReentrantCloseWaitsForOtherConcurrentHandlers`
- `v2ClosingFlagPreventsNewHandlersAfterDrainStarts`, `v1ClosingFlagPreventsNewHandlersAfterDrainStarts`, `v1StartAfterCloseResetsClosingFlag`
- `v1CloseShouldNotHoldClientMonitorDuringDrain`, `v1ConcurrentStartDuringCloseDrainIsIgnored`, `v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver`, `v1ConcurrentCloseCallsDoNotRace`

Full module test suite: 956 tests pass, 0 failures, 0 errors.