Skip to content

Commit feaeedd

Browse files
jbiskurclaude
andcommitted
feat: add DataPathwayMetricsFetchCommand and throughput metrics support
Add GET /api/v1/pathways/:id/metrics command for fetching per-pathway delivery throughput data. Also extend heartbeat input to carry optional throughput snapshot (events/sec, success rate, per-endpoint/flowType). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 88ad468 commit feaeedd

4 files changed

Lines changed: 136 additions & 0 deletions

File tree

src/commands/data-pathways/assignment.heartbeat.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,25 @@ export interface DataPathwayAssignmentHeartbeatInput {
1010
deliveryErrorsTotal?: number
1111
bufferDepth?: number
1212
lagSeconds?: number
13+
throughput?: {
14+
global: { eventsPerSecond: number; totalRecorded: number; windowSeconds: number }
15+
endpoints: Record<
16+
string,
17+
{
18+
eventsPerSecond: number
19+
successRate: number
20+
flowTypes: Record<
21+
string,
22+
{
23+
eventsPerSecond: number
24+
successRate: number
25+
avgDurationMs: number
26+
recentResults: Array<{ status: number; durationMs: number; success: boolean; ageMs: number }>
27+
}
28+
>
29+
}
30+
>
31+
}
1332
}
1433
}
1534

src/commands/data-pathways/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export * from "./pathway.fetch.ts"
44
export * from "./pathway.list.ts"
55
export * from "./pathway.disable.ts"
66
export * from "./pathway.delete.ts"
7+
export * from "./pathway.metrics.fetch.ts"
78

89
// Slots
910
export * from "./slot.register.ts"
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Command } from "../../common/command.ts"
2+
import { type DataPathwayMetrics, DataPathwayMetricsSchema } from "../../contracts/data-pathways.ts"
3+
import type { ClientError } from "../../exceptions/client-error.ts"
4+
import { NotFoundException } from "../../exceptions/not-found.ts"
5+
import { parseResponseHelper } from "../../utils/parse-response-helper.ts"
6+
7+
export interface DataPathwayMetricsFetchInput {
8+
id: string
9+
}
10+
11+
export class DataPathwayMetricsFetchCommand extends Command<DataPathwayMetricsFetchInput, DataPathwayMetrics> {
12+
protected override allowedModes: ("apiKey" | "bearer")[] = ["bearer"]
13+
14+
protected override getMethod(): string {
15+
return "GET"
16+
}
17+
18+
protected override getBaseUrl(): string {
19+
return "https://data-pathways.api.flowcore.io"
20+
}
21+
22+
protected override getPath(): string {
23+
return `/api/v1/pathways/${this.input.id}/metrics`
24+
}
25+
26+
protected override parseResponse(rawResponse: unknown): DataPathwayMetrics {
27+
return parseResponseHelper(DataPathwayMetricsSchema, rawResponse)
28+
}
29+
30+
protected override handleClientError(error: ClientError): void {
31+
if (error.status === 404) {
32+
throw new NotFoundException("DataPathway", { id: this.input.id })
33+
}
34+
throw error
35+
}
36+
}

src/contracts/data-pathways.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,86 @@ export const DataPathwayDeliveryErrorListSchema: TObject<{
492492
})
493493
export type DataPathwayDeliveryErrorList = Static<typeof DataPathwayDeliveryErrorListSchema>
494494

495+
// ── Pathway Metrics ──
496+
497+
type TThroughputRecentResult = TObject<{
498+
status: TNumber
499+
durationMs: TNumber
500+
success: TBoolean
501+
ageMs: TNumber
502+
}>
503+
const ThroughputRecentResultSchema: TThroughputRecentResult = Type.Object({
504+
status: Type.Number(),
505+
durationMs: Type.Number(),
506+
success: Type.Boolean(),
507+
ageMs: Type.Number(),
508+
})
509+
510+
type TThroughputFlowType = TObject<{
511+
eventsPerSecond: TNumber
512+
successRate: TNumber
513+
avgDurationMs: TNumber
514+
recentResults: TArray<TThroughputRecentResult>
515+
}>
516+
const ThroughputFlowTypeSchema: TThroughputFlowType = Type.Object({
517+
eventsPerSecond: Type.Number(),
518+
successRate: Type.Number(),
519+
avgDurationMs: Type.Number(),
520+
recentResults: Type.Array(ThroughputRecentResultSchema),
521+
})
522+
523+
type TThroughputEndpoint = TObject<{
524+
eventsPerSecond: TNumber
525+
successRate: TNumber
526+
flowTypes: TRecord<TString, TThroughputFlowType>
527+
}>
528+
const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({
529+
eventsPerSecond: Type.Number(),
530+
successRate: Type.Number(),
531+
flowTypes: Type.Record(Type.String(), ThroughputFlowTypeSchema),
532+
})
533+
534+
type TThroughputGlobal = TObject<{
535+
eventsPerSecond: TNumber
536+
totalRecorded: TNumber
537+
windowSeconds: TNumber
538+
}>
539+
540+
type TThroughputSnapshot = TObject<{
541+
global: TThroughputGlobal
542+
endpoints: TRecord<TString, TThroughputEndpoint>
543+
}>
544+
const ThroughputSnapshotSchema: TThroughputSnapshot = Type.Object({
545+
global: Type.Object({
546+
eventsPerSecond: Type.Number(),
547+
totalRecorded: Type.Number(),
548+
windowSeconds: Type.Number(),
549+
}),
550+
endpoints: Type.Record(Type.String(), ThroughputEndpointSchema),
551+
})
552+
553+
type TMetricsAssignmentEntry = TObject<{
554+
assignmentId: TString
555+
status: TString
556+
throughput: TUnion<[TThroughputSnapshot, TNull]>
557+
updatedAt: TString
558+
}>
559+
const MetricsAssignmentEntrySchema: TMetricsAssignmentEntry = Type.Object({
560+
assignmentId: Type.String(),
561+
status: Type.String(),
562+
throughput: Type.Union([ThroughputSnapshotSchema, Type.Null()]),
563+
updatedAt: Type.String(),
564+
})
565+
566+
export const DataPathwayMetricsSchema: TObject<{
567+
pathwayId: TString
568+
assignments: TArray<TMetricsAssignmentEntry>
569+
}> = Type.Object({
570+
pathwayId: Type.String(),
571+
assignments: Type.Array(MetricsAssignmentEntrySchema),
572+
})
573+
export type DataPathwayMetrics = Static<typeof DataPathwayMetricsSchema>
574+
495575
// ── Health ──
496576

497577
export const DataPathwayHealthSchema: TObject<{

0 commit comments

Comments
 (0)