Skip to content

Commit 8c8b6a2

Browse files
authored
fix: Queue messages during conversation compaction (#1546)
## Problem Messages sent during compaction bypass the queue and are lost because isCompacting was only a derived UI property, invisible to the service layer. Closes #1272 ## Changes 1. Add isCompacting state to AgentSession and gate sendPrompt on it 2. Track compaction lifecycle via STATUS and COMPACT_BOUNDARY notifications in service 3. Drain queued messages when compaction completes 4. Replace all _posthog/ string literals with POSTHOG_NOTIFICATIONS constants 5. Add isNotification() utility in agent package to handle potential double-prefix 6. Remove duplicate isPosthogMethod helpers from renderer files ## How did you test this? Maually
1 parent 4f63a70 commit 8c8b6a2

File tree

12 files changed

+115
-76
lines changed

12 files changed

+115
-76
lines changed

apps/code/src/main/services/agent/service.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ import {
1212
type SessionConfigOption,
1313
type SessionNotification,
1414
} from "@agentclientprotocol/sdk";
15-
import { isMcpToolReadOnly } from "@posthog/agent";
15+
import {
16+
isMcpToolReadOnly,
17+
isNotification,
18+
POSTHOG_NOTIFICATIONS,
19+
} from "@posthog/agent";
1620
import { hydrateSessionJsonl } from "@posthog/agent/adapters/claude/session/jsonl-hydration";
1721
import { getEffortOptions } from "@posthog/agent/adapters/claude/session/models";
1822
import { Agent } from "@posthog/agent/agent";
@@ -1350,7 +1354,7 @@ For git operations while detached:
13501354
method: string,
13511355
params: Record<string, unknown>,
13521356
): Promise<void> => {
1353-
if (method === "_posthog/sdk_session") {
1357+
if (isNotification(method, POSTHOG_NOTIFICATIONS.SDK_SESSION)) {
13541358
const {
13551359
taskRunId: notifTaskRunId,
13561360
sessionId,

apps/code/src/renderer/features/sessions/components/buildConversationItems.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
} from "@agentclientprotocol/sdk";
55
import type { QueuedMessage } from "@features/sessions/stores/sessionStore";
66
import type { SessionUpdate, ToolCall } from "@features/sessions/types";
7+
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
78
import {
89
type AcpMessage,
910
isJsonRpcNotification,
@@ -282,12 +283,6 @@ function handlePromptResponse(
282283
b.pendingPrompts.delete(msg.id);
283284
}
284285

285-
/** Check if a method matches a PostHog notification name, accounting for
286-
* the SDK sometimes double-prefixing (`__posthog/` instead of `_posthog/`). */
287-
function isPosthogMethod(method: string, name: string): boolean {
288-
return method === `_posthog/${name}` || method === `__posthog/${name}`;
289-
}
290-
291286
function handleNotification(
292287
b: ItemBuilder,
293288
msg: { method: string; params?: unknown },
@@ -323,7 +318,7 @@ function handleNotification(
323318
return;
324319
}
325320

326-
if (isPosthogMethod(msg.method, "console")) {
321+
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.CONSOLE)) {
327322
if (!b.currentTurn) {
328323
ensureImplicitTurn(b, ts);
329324
}
@@ -339,7 +334,7 @@ function handleNotification(
339334
return;
340335
}
341336

342-
if (isPosthogMethod(msg.method, "compact_boundary")) {
337+
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY)) {
343338
if (!b.currentTurn) ensureImplicitTurn(b, ts);
344339
const params = msg.params as {
345340
trigger: "manual" | "auto";
@@ -356,7 +351,7 @@ function handleNotification(
356351
return;
357352
}
358353

359-
if (isPosthogMethod(msg.method, "status")) {
354+
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS)) {
360355
if (!b.currentTurn) ensureImplicitTurn(b, ts);
361356
const params = msg.params as { status: string; isComplete?: boolean };
362357
if (params.status === "compacting" && !params.isComplete) {

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ const createMockSession = (
235235
startedAt: Date.now(),
236236
status: "connected",
237237
isPromptPending: false,
238+
isCompacting: false,
238239
promptStartedAt: null,
239240
pendingPermissions: new Map(),
240241
pausedDurationMs: 0,
@@ -551,6 +552,21 @@ describe("SessionService", () => {
551552
);
552553
});
553554

555+
it("queues message when compaction is in progress", async () => {
556+
const service = getSessionService();
557+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
558+
createMockSession({ isCompacting: true }),
559+
);
560+
561+
const result = await service.sendPrompt("task-123", "Hello");
562+
563+
expect(result.stopReason).toBe("queued");
564+
expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith(
565+
"task-123",
566+
"Hello",
567+
);
568+
});
569+
554570
it("sends prompt via tRPC when session is ready", async () => {
555571
const service = getSessionService();
556572
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
} from "@features/sessions/stores/sessionStore";
2727
import { useSettingsStore } from "@features/settings/stores/settingsStore";
2828
import { taskViewedApi } from "@features/sidebar/hooks/useTaskViewed";
29+
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
2930
import { DEFAULT_GATEWAY_MODEL } from "@posthog/agent/gateway-models";
3031
import { getIsOnline } from "@renderer/stores/connectivityStore";
3132
import { trpcClient } from "@renderer/trpc/client";
@@ -495,6 +496,7 @@ export class SessionService {
495496
errorMessage:
496497
"Session disconnected due to inactivity. Click Retry to reconnect.",
497498
isPromptPending: false,
499+
isCompacting: false,
498500
promptStartedAt: null,
499501
});
500502
}
@@ -765,27 +767,14 @@ export class SessionService {
765767
"stopReason" in msg.result
766768
) {
767769
const stopReason = (msg.result as { stopReason?: string }).stopReason;
768-
const hasQueuedMessages =
769-
session.messageQueue.length > 0 && session.status === "connected";
770+
const hasQueuedMessages = this.drainQueuedMessages(taskRunId, session);
770771

771772
// Only notify when queue is empty - queued messages will start a new turn
772773
if (stopReason && !hasQueuedMessages) {
773774
notifyPromptComplete(session.taskTitle, stopReason, session.taskId);
774775
}
775776

776777
taskViewedApi.markActivity(session.taskId);
777-
778-
// Process queued messages after turn completes - send all as one prompt
779-
if (hasQueuedMessages) {
780-
setTimeout(() => {
781-
this.sendQueuedMessages(session.taskId).catch((err) => {
782-
log.error("Failed to send queued messages", {
783-
taskId: session.taskId,
784-
error: err,
785-
});
786-
});
787-
}, 0);
788-
}
789778
}
790779

791780
if ("method" in msg && msg.method === "session/update" && "params" in msg) {
@@ -828,10 +817,10 @@ export class SessionService {
828817
}
829818
}
830819

831-
// Handle _posthog/sdk_session notifications for adapter info
820+
// Handle SDK_SESSION notifications for adapter info
832821
if (
833822
"method" in msg &&
834-
msg.method === "_posthog/sdk_session" &&
823+
isNotification(msg.method, POSTHOG_NOTIFICATIONS.SDK_SESSION) &&
835824
"params" in msg
836825
) {
837826
const params = msg.params as {
@@ -848,6 +837,54 @@ export class SessionService {
848837
});
849838
}
850839
}
840+
841+
if (
842+
"method" in msg &&
843+
"params" in msg &&
844+
isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS)
845+
) {
846+
const params = msg.params as { status?: string; isComplete?: boolean };
847+
if (params?.status === "compacting") {
848+
sessionStoreSetters.updateSession(taskRunId, {
849+
isCompacting: !params.isComplete,
850+
});
851+
}
852+
}
853+
854+
if (
855+
"method" in msg &&
856+
isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY)
857+
) {
858+
sessionStoreSetters.updateSession(taskRunId, {
859+
isCompacting: false,
860+
});
861+
862+
this.drainQueuedMessages(taskRunId, session);
863+
}
864+
}
865+
866+
private drainQueuedMessages(
867+
taskRunId: string,
868+
session: AgentSession,
869+
): boolean {
870+
const freshSession = sessionStoreSetters.getSessions()[taskRunId];
871+
const hasQueuedMessages =
872+
freshSession &&
873+
freshSession.messageQueue.length > 0 &&
874+
freshSession.status === "connected";
875+
876+
if (hasQueuedMessages) {
877+
setTimeout(() => {
878+
this.sendQueuedMessages(session.taskId).catch((err) => {
879+
log.error("Failed to send queued messages", {
880+
taskId: session.taskId,
881+
error: err,
882+
});
883+
});
884+
}, 0);
885+
}
886+
887+
return hasQueuedMessages;
851888
}
852889

853890
private handlePermissionRequest(
@@ -921,12 +958,13 @@ export class SessionService {
921958
throw new Error(`Session is not ready (status: ${session.status})`);
922959
}
923960

924-
if (session.isPromptPending) {
961+
if (session.isPromptPending || session.isCompacting) {
925962
const promptText = extractPromptText(prompt);
926963
sessionStoreSetters.enqueueMessage(taskId, promptText);
927964
log.info("Message queued", {
928965
taskId,
929966
queueLength: session.messageQueue.length + 1,
967+
reason: session.isCompacting ? "compacting" : "prompt_pending",
930968
});
931969
return { stopReason: "queued" };
932970
}
@@ -1053,11 +1091,13 @@ export class SessionService {
10531091
errorDetails ||
10541092
"Session connection lost. Please retry or start a new session.",
10551093
isPromptPending: false,
1094+
isCompacting: false,
10561095
promptStartedAt: null,
10571096
});
10581097
} else {
10591098
sessionStoreSetters.updateSession(session.taskRunId, {
10601099
isPromptPending: false,
1100+
isCompacting: false,
10611101
promptStartedAt: null,
10621102
});
10631103
}
@@ -2150,6 +2190,7 @@ export class SessionService {
21502190
startedAt: Date.now(),
21512191
status: "connecting",
21522192
isPromptPending: false,
2193+
isCompacting: false,
21532194
promptStartedAt: null,
21542195
pendingPermissions: new Map(),
21552196
pausedDurationMs: 0,

apps/code/src/renderer/features/sessions/stores/sessionStore.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export interface AgentSession {
4747
errorTitle?: string;
4848
errorMessage?: string;
4949
isPromptPending: boolean;
50+
isCompacting: boolean;
5051
promptStartedAt: number | null;
5152
logUrl?: string;
5253
processedLineCount?: number;

apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type {
22
ToolCallContent,
33
ToolCallLocation,
44
} from "@features/sessions/types";
5+
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
56
import type { ChangedFile, GitFileStatus } from "@shared/types";
67
import {
78
type AcpMessage,
@@ -169,7 +170,9 @@ export function buildCloudEventSummary(
169170

170171
const merged = mergeToolCall(toolCalls.get(toolCallId), patch);
171172
toolCalls.set(toolCallId, merged);
172-
} else if (isPosthogMethod(message.method, "tree_snapshot")) {
173+
} else if (
174+
isNotification(message.method, POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT)
175+
) {
173176
const params = message.params as
174177
| {
175178
changes?: Array<{ path: string; status: "A" | "M" | "D" }>;
@@ -229,10 +232,6 @@ export function extractCloudFileDiff(
229232
};
230233
}
231234

232-
function isPosthogMethod(method: string, name: string): boolean {
233-
return method === `_posthog/${name}` || method === `__posthog/${name}`;
234-
}
235-
236235
export function extractCloudToolChangedFiles(
237236
toolCalls: Map<string, ParsedToolCall>,
238237
): ChangedFile[] {

packages/agent/src/acp-extensions.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55
* - Custom notification methods are prefixed with `_posthog/`
66
* - Custom data can be attached via `_meta` fields
77
*
8-
* Note: When using `extNotification()` from the ACP SDK, it automatically
9-
* adds an extra underscore prefix (e.g., `_posthog/tree_snapshot` becomes
10-
* `__posthog/tree_snapshot` in the log). Code that reads logs should handle both.
11-
*
128
* See: https://agentclientprotocol.com/docs/extensibility
139
*/
1410

@@ -68,3 +64,18 @@ export const POSTHOG_NOTIFICATIONS = {
6864
/** Token usage update for a session turn */
6965
USAGE_UPDATE: "_posthog/usage_update",
7066
} as const;
67+
68+
type NotificationMethod =
69+
(typeof POSTHOG_NOTIFICATIONS)[keyof typeof POSTHOG_NOTIFICATIONS];
70+
71+
/**
72+
* Check if an ACP method matches a PostHog notification, handling the
73+
* possible `__posthog/` double-prefix from extNotification().
74+
*/
75+
export function isNotification(
76+
method: string | undefined,
77+
notification: NotificationMethod,
78+
): boolean {
79+
if (!method) return false;
80+
return method === notification || method === `_${notification}`;
81+
}

packages/agent/src/adapters/claude/claude-agent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
931931
),
932932
...(meta?.taskRunId
933933
? [
934-
this.client.extNotification("_posthog/sdk_session", {
934+
this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, {
935935
taskRunId: meta.taskRunId,
936936
sessionId,
937937
adapter: "claude",

packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
BetaContentBlock,
1818
BetaRawContentBlockDelta,
1919
} from "@anthropic-ai/sdk/resources/beta.mjs";
20+
import { POSTHOG_NOTIFICATIONS } from "@/acp-extensions";
2021
import { image, text } from "../../../utils/acp-content";
2122
import { unreachable } from "../../../utils/common";
2223
import type { Logger } from "../../../utils/logger";
@@ -550,7 +551,7 @@ export async function handleSystemMessage(
550551
case "init":
551552
break;
552553
case "compact_boundary":
553-
await client.extNotification("_posthog/compact_boundary", {
554+
await client.extNotification(POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY, {
554555
sessionId,
555556
trigger: message.compact_metadata.trigger,
556557
preTokens: message.compact_metadata.pre_tokens,
@@ -566,7 +567,7 @@ export async function handleSystemMessage(
566567
case "status":
567568
if (message.status === "compacting") {
568569
logger.info("Session compacting started", { sessionId });
569-
await client.extNotification("_posthog/status", {
570+
await client.extNotification(POSTHOG_NOTIFICATIONS.STATUS, {
570571
sessionId,
571572
status: "compacting",
572573
});
@@ -579,7 +580,7 @@ export async function handleSystemMessage(
579580
status: message.status,
580581
summary: message.summary,
581582
});
582-
await client.extNotification("_posthog/task_notification", {
583+
await client.extNotification(POSTHOG_NOTIFICATIONS.TASK_NOTIFICATION, {
583584
sessionId,
584585
taskId: message.task_id,
585586
status: message.status,

packages/agent/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
export { isNotification, POSTHOG_NOTIFICATIONS } from "./acp-extensions";
12
export {
23
getMcpToolMetadata,
34
isMcpToolReadOnly,

0 commit comments

Comments
 (0)