Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions src/handlers/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,20 @@ import type { AssistantMessage, EventMessageUpdated, EventMessagePartUpdated, To
import { errorSummary, setBoundedMap, accumulateSessionTotals, isMetricEnabled } from "../util.ts"
import type { HandlerContext } from "../types.ts"

type SubtaskPart = {
type: "subtask"
sessionID: string
messageID: string
prompt: string
description: string
agent: string
}

/**
* Handles a completed assistant message: increments token and cost counters and emits
* either an `api_request` or `api_error` log event depending on whether the message errored.
* The `agent` attribute is sourced from the session totals, which are populated by the
* `chat.message` hook when the user prompt is received.
*/
export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext) {
const msg = e.properties.info
Expand All @@ -15,45 +26,47 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext

const { sessionID, modelID, providerID } = assistant
const duration = assistant.time.completed - assistant.time.created
const agent = ctx.sessionTotals.get(sessionID)?.agent ?? "unknown"

const totalTokens = assistant.tokens.input + assistant.tokens.output + assistant.tokens.reasoning
+ assistant.tokens.cache.read + assistant.tokens.cache.write

if (isMetricEnabled("token.usage", ctx)) {
const { tokenCounter } = ctx.instruments
tokenCounter.add(assistant.tokens.input, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "input" })
tokenCounter.add(assistant.tokens.output, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "output" })
tokenCounter.add(assistant.tokens.reasoning, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "reasoning" })
tokenCounter.add(assistant.tokens.cache.read, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "cacheRead" })
tokenCounter.add(assistant.tokens.cache.write, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "cacheCreation" })
tokenCounter.add(assistant.tokens.input, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "input" })
tokenCounter.add(assistant.tokens.output, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "output" })
tokenCounter.add(assistant.tokens.reasoning, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "reasoning" })
tokenCounter.add(assistant.tokens.cache.read, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "cacheRead" })
tokenCounter.add(assistant.tokens.cache.write, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "cacheCreation" })
}

if (isMetricEnabled("cost.usage", ctx)) {
ctx.instruments.costCounter.add(assistant.cost, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID })
ctx.instruments.costCounter.add(assistant.cost, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent })
}

if (isMetricEnabled("cache.count", ctx)) {
if (assistant.tokens.cache.read > 0) {
ctx.instruments.cacheCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "cacheRead" })
ctx.instruments.cacheCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "cacheRead" })
}
if (assistant.tokens.cache.write > 0) {
ctx.instruments.cacheCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, type: "cacheCreation" })
ctx.instruments.cacheCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent, type: "cacheCreation" })
}
}

if (isMetricEnabled("message.count", ctx)) {
ctx.instruments.messageCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID })
ctx.instruments.messageCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, agent })
}

if (isMetricEnabled("model.usage", ctx)) {
ctx.instruments.modelUsageCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, provider: providerID })
ctx.instruments.modelUsageCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, model: modelID, provider: providerID, agent })
}

accumulateSessionTotals(sessionID, totalTokens, assistant.cost, ctx)

ctx.log("debug", "otel: token+cost counters incremented", {
sessionID,
model: modelID,
agent,
input: assistant.tokens.input,
output: assistant.tokens.output,
reasoning: assistant.tokens.reasoning,
Expand All @@ -74,6 +87,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
"session.id": sessionID,
model: modelID,
provider: providerID,
agent,
error: errorSummary(assistant.error),
duration_ms: duration,
...ctx.commonAttrs,
Expand All @@ -82,6 +96,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
return ctx.log("error", "otel: api_error", {
sessionID,
model: modelID,
agent,
error: errorSummary(assistant.error),
duration_ms: duration,
})
Expand All @@ -98,6 +113,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
"session.id": sessionID,
model: modelID,
provider: providerID,
agent,
cost_usd: assistant.cost,
duration_ms: duration,
input_tokens: assistant.tokens.input,
Expand All @@ -111,6 +127,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
return ctx.log("info", "otel: api_request", {
sessionID,
model: modelID,
agent,
cost_usd: assistant.cost,
duration_ms: duration,
input_tokens: assistant.tokens.input,
Expand All @@ -121,9 +138,43 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
/**
* Tracks tool execution time between `running` and `completed`/`error` part updates,
* records a `tool.duration` histogram measurement, and emits a `tool_result` log event.
* Also handles `subtask` parts, incrementing the sub-agent invocation counter and emitting
* a `subtask_invoked` log event.
*/
export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: HandlerContext) {
const part = e.properties.part

if (part.type === "subtask") {
const subtask = part as unknown as SubtaskPart
if (isMetricEnabled("subtask.count", ctx)) {
ctx.instruments.subtaskCounter.add(1, {
...ctx.commonAttrs,
"session.id": subtask.sessionID,
agent: subtask.agent,
})
}
ctx.logger.emit({
severityNumber: SeverityNumber.INFO,
severityText: "INFO",
timestamp: Date.now(),
observedTimestamp: Date.now(),
body: "subtask_invoked",
attributes: {
"event.name": "subtask_invoked",
"session.id": subtask.sessionID,
agent: subtask.agent,
description: subtask.description,
prompt_length: subtask.prompt.length,
...ctx.commonAttrs,
},
})
return ctx.log("info", "otel: subtask_invoked", {
sessionID: subtask.sessionID,
agent: subtask.agent,
description: subtask.description,
})
}

if (part.type !== "tool") return

const toolPart = part as ToolPart
Expand Down
13 changes: 7 additions & 6 deletions src/handlers/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ import type { HandlerContext } from "../types.ts"

/** Increments the session counter, records start time, and emits a `session.created` log event. */
export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext) {
const sessionID = e.properties.info.id
const createdAt = e.properties.info.time.created
const { id: sessionID, time, parentID } = e.properties.info
const createdAt = time.created
const isSubagent = !!parentID
if (isMetricEnabled("session.count", ctx)) {
ctx.instruments.sessionCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID })
ctx.instruments.sessionCounter.add(1, { ...ctx.commonAttrs, "session.id": sessionID, is_subagent: isSubagent })
}
setBoundedMap(ctx.sessionTotals, sessionID, { startMs: createdAt, tokens: 0, cost: 0, messages: 0 })
setBoundedMap(ctx.sessionTotals, sessionID, { startMs: createdAt, tokens: 0, cost: 0, messages: 0, agent: "unknown" })
ctx.logger.emit({
severityNumber: SeverityNumber.INFO,
severityText: "INFO",
timestamp: createdAt,
observedTimestamp: Date.now(),
body: "session.created",
attributes: { "event.name": "session.created", "session.id": sessionID, ...ctx.commonAttrs },
attributes: { "event.name": "session.created", "session.id": sessionID, is_subagent: isSubagent, ...ctx.commonAttrs },
})
return ctx.log("info", "otel: session.created", { sessionID, createdAt })
return ctx.log("info", "otel: session.created", { sessionID, createdAt, isSubagent })
}

function sweepSession(sessionID: string, ctx: HandlerContext) {
Expand Down
5 changes: 4 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ export const OtelPlugin: Plugin = async ({ project, client }) => {
},

"chat.message": safe("chat.message", async (input, output) => {
const agent = input.agent ?? "unknown"
const totals = sessionTotals.get(input.sessionID)
if (totals) totals.agent = agent
const promptLength = output.parts.reduce(
(acc, p) => (p.type === "text" ? acc + p.text.length : acc),
0,
Expand All @@ -148,7 +151,7 @@ export const OtelPlugin: Plugin = async ({ project, client }) => {
attributes: {
"event.name": "user_prompt",
"session.id": input.sessionID,
agent: input.agent ?? "unknown",
agent,
prompt_length: promptLength,
model: input.model
? `${input.model.providerID}/${input.model.modelID}`
Expand Down
4 changes: 4 additions & 0 deletions src/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,9 @@ export function createInstruments(prefix: string): Instruments {
unit: "{retry}",
description: "Number of API retries observed via session.status events",
}),
subtaskCounter: meter.createCounter(`${prefix}subtask.count`, {
unit: "{subtask}",
description: "Number of sub-agent invocations observed via subtask message parts",
}),
}
}
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export type Instruments = {
sessionCostGauge: Histogram
modelUsageCounter: Counter
retryCounter: Counter
subtaskCounter: Counter
}

/** Accumulated per-session totals used for gauge snapshots on session.idle. */
Expand All @@ -57,6 +58,7 @@ export type SessionTotals = {
tokens: number
cost: number
messages: number
agent: string
}

/** Shared context threaded through every event handler. */
Expand Down
1 change: 1 addition & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ export function accumulateSessionTotals(
tokens: existing.tokens + tokens,
cost: existing.cost + cost,
messages: existing.messages + 1,
agent: existing.agent,
})
}
37 changes: 36 additions & 1 deletion tests/handlers/disabled-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,48 @@ describe("OPENCODE_DISABLE_METRICS", () => {
})
})

describe("subtask.count disabled", () => {
test("does not increment subtask counter", async () => {
const { ctx, counters } = makeCtx("proj_test", ["subtask.count"])
const e = {
type: "message.part.updated",
properties: {
part: { type: "subtask", sessionID: "ses_1", messageID: "msg_1", agent: "build", description: "desc", prompt: "prompt" },
},
} as unknown as EventMessagePartUpdated
await handleMessagePartUpdated(e, ctx)
expect(counters.subtask.calls).toHaveLength(0)
})

test("still emits subtask_invoked log record", async () => {
const { ctx, logger } = makeCtx("proj_test", ["subtask.count"])
const e = {
type: "message.part.updated",
properties: {
part: { type: "subtask", sessionID: "ses_1", messageID: "msg_1", agent: "build", description: "desc", prompt: "prompt" },
},
} as unknown as EventMessagePartUpdated
await handleMessagePartUpdated(e, ctx)
expect(logger.records.at(0)!.body).toBe("subtask_invoked")
})
})

describe("multiple disabled at once", () => {
test("disabling all metrics stops all counter/histogram calls", async () => {
const all = [
"session.count", "token.usage", "cost.usage", "lines_of_code.count",
"commit.count", "tool.duration", "cache.count", "session.duration",
"message.count", "session.token.total", "session.cost.total",
"model.usage", "retry.count",
"model.usage", "retry.count", "subtask.count",
]
const { ctx, counters, histograms, gauges } = makeCtx("proj_test", all)
const subtaskEvent = {
type: "message.part.updated",
properties: {
part: { type: "subtask", sessionID: "ses_1", messageID: "msg_1", agent: "build", description: "desc", prompt: "prompt" },
},
} as unknown as EventMessagePartUpdated

await handleSessionCreated(makeSessionCreated("ses_1"), ctx)
await handleMessageUpdated(makeAssistantMessage(), ctx)
handleSessionIdle(makeSessionIdle("ses_1"), ctx)
Expand All @@ -217,6 +250,7 @@ describe("OPENCODE_DISABLE_METRICS", () => {
handleCommandExecuted(makeCommandExecuted("git commit -m 'test'"), ctx)
await handleMessagePartUpdated(makeToolPart("running"), ctx)
await handleMessagePartUpdated(makeToolPart("completed"), ctx)
await handleMessagePartUpdated(subtaskEvent, ctx)

expect(counters.session.calls).toHaveLength(0)
expect(counters.token.calls).toHaveLength(0)
Expand All @@ -227,6 +261,7 @@ describe("OPENCODE_DISABLE_METRICS", () => {
expect(counters.retry.calls).toHaveLength(0)
expect(counters.lines.calls).toHaveLength(0)
expect(counters.commit.calls).toHaveLength(0)
expect(counters.subtask.calls).toHaveLength(0)
expect(histograms.tool.calls).toHaveLength(0)
expect(histograms.sessionDuration.calls).toHaveLength(0)
expect(gauges.sessionToken.calls).toHaveLength(0)
Expand Down
Loading
Loading