Skip to content

fix(RoutingLLMService): 修复 commit 后流式事件乱序问题#9

Open
ASGPIPO wants to merge 1 commit intonageoffer:mainfrom
ASGPIPO:fix/routing-llm-service-drain
Open

fix(RoutingLLMService): 修复 commit 后流式事件乱序问题#9
ASGPIPO wants to merge 1 commit intonageoffer:mainfrom
ASGPIPO:fix/routing-llm-service-drain

Conversation

@ASGPIPO
Copy link
Copy Markdown

@ASGPIPO ASGPIPO commented Mar 22, 2026

  • 引入串行 drain 机制统一分发缓存事件
  • commit 后不再使用快照回放
  • commit 后到达的事件继续入队,保证按顺序分发
  • 避免缓存回放阶段与实时回调交错,导致下游收到乱序事件

@magestacks
Copy link
Copy Markdown
Collaborator

请提供个复现问题的测试用例,不然不好评估修改是否合理

@ASGPIPO
Copy link
Copy Markdown
Author

ASGPIPO commented Mar 30, 2026

class RoutingLLMServiceTests {

    @Test
    void bufferedEventsShouldDrainBeforeEventsArrivingDuringCommit() throws Exception {
        List<String> forwarded = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstBufferedEventDispatched = new CountDownLatch(1);
        CountDownLatch allowDrainToContinue = new CountDownLatch(1);
        AtomicReference<Throwable> callbackFailure = new AtomicReference<>();
        AtomicReference<Throwable> commitFailure = new AtomicReference<>();

        StreamCallback downstream = new StreamCallback() {
            @Override
            public void onContent(String content) {
                forwarded.add(content);
                if ("A".equals(content)) {
                    firstBufferedEventDispatched.countDown();
                    try {
                        allowDrainToContinue.await(1, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        callbackFailure.set(e);
                    }
                }
            }

            @Override
            public void onComplete() {
            }

            @Override
            public void onError(Throwable error) {
                callbackFailure.set(error);
            }
        };

        FirstPacketAwaiter awaiter = new FirstPacketAwaiter();
        StreamCallback probeCallback = newProbeBufferingCallback(downstream, awaiter);
        Method commitMethod = probeCallback.getClass().getDeclaredMethod("commit");
        commitMethod.setAccessible(true);

        probeCallback.onContent("A");
        probeCallback.onContent("B");

        Thread commitThread = new Thread(() -> {
            try {
                commitMethod.invoke(probeCallback);
            } catch (Throwable t) {
                commitFailure.set(t);
            }
        }, "commit-thread");
        commitThread.start();

        assertTrue(firstBufferedEventDispatched.await(1, TimeUnit.SECONDS));

        probeCallback.onContent("C");
        allowDrainToContinue.countDown();

        commitThread.join(1000);

        assertFalse(commitThread.isAlive());
        assertNull(callbackFailure.get());
        assertNull(commitFailure.get());
        assertEquals(List.of("A", "B", "C"), forwarded,
                "buffered events should be fully replayed before new events are dispatched");
    }

    private static StreamCallback newProbeBufferingCallback(StreamCallback downstream,
                                                            FirstPacketAwaiter awaiter) throws Exception {
        Class<?> callbackClass = Class.forName(
                "com.nageoffer.ai.ragent.infra.chat.RoutingLLMService$ProbeBufferingCallback");
        Constructor<?> constructor = callbackClass.getDeclaredConstructor(StreamCallback.class, FirstPacketAwaiter.class);
        constructor.setAccessible(true);
        return (StreamCallback) constructor.newInstance(downstream, awaiter);
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants