Skip to content

Commit e26027a

Browse files
rkannan82 and claude committed
Aggregate internal task queues under __temporal_sys__ instead of noop handler
Instead of suppressing all metrics for /temporal-sys/ queues with a noop handler, aggregate them under a synthetic "__temporal_sys__" task queue tag. This preserves operational observability while preventing cardinality explosion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 60a0149 commit e26027a

3 files changed

Lines changed: 47 additions & 18 deletions

File tree

common/metrics/task_queues.go

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -7,9 +7,13 @@ import (
77
)
88

99
const (
10-
omitted = "__omitted__"
11-
normal = "__normal__"
12-
sticky = "__sticky__"
10+
omitted = "__omitted__"
11+
normal = "__normal__"
12+
sticky = "__sticky__"
13+
temporalSys = "__temporal_sys__"
14+
15+
// NormalPartitionTagValue is the partition tag value for normal (non-sticky) partitions.
16+
NormalPartitionTagValue = normal
1317
)
1418

1519
// GetPerTaskQueueFamilyScope returns "namespace" and "taskqueue" tags. "taskqueue" will be "__omitted__" if
@@ -90,3 +94,9 @@ func GetPerTaskQueuePartitionTypeScope(
9094
return GetPerTaskQueueScope(handler, namespaceName, partition.TaskQueue(), taskQueueBreakdown,
9195
append(tags, PartitionTag(value))...)
9296
}
97+
98+
// TemporalSysTaskQueueTag returns a taskqueue tag with the "__temporal_sys__" synthetic value,
99+
// used to aggregate server-internal task queues under a single label.
100+
func TemporalSysTaskQueueTag() Tag {
101+
return UnsafeTaskQueueTag(temporalSys)
102+
}

service/matching/matching_engine.go

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -547,7 +547,16 @@ func (e *matchingEngineImpl) loggerAndMetricsForPartition(
547547
tag.WorkflowNamespace(nsName))
548548
var metricsHandler metrics.Handler
549549
if strings.HasPrefix(partition.TaskQueue().Name(), internalTaskQueuePrefix) {
550-
metricsHandler = metrics.NoopMetricsHandler
550+
// Aggregate all internal task queues under "__temporal_sys__" to avoid
551+
// cardinality explosion from per-worker queue names.
552+
metricsHandler = e.metricsHandler.WithTags(
553+
metrics.NamespaceTag(nsName),
554+
metrics.TemporalSysTaskQueueTag(),
555+
metrics.TaskQueueTypeTag(partition.TaskType()),
556+
metrics.PartitionTag(metrics.NormalPartitionTagValue),
557+
metrics.OperationTag(metrics.MatchingTaskQueuePartitionManagerScope),
558+
metrics.NamespaceStateTag(nsState),
559+
)
551560
} else {
552561
metricsHandler = metrics.GetPerTaskQueuePartitionIDScope(
553562
e.metricsHandler,

service/matching/matching_engine_test.go

Lines changed: 24 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -5230,35 +5230,45 @@ func TestLoggerAndMetricsForPartition_InternalTaskQueue(t *testing.T) {
52305230
ns, mockNamespaceCache := createMockNamespaceCache(controller, matchingTestNamespace)
52315231
config := defaultTestConfig()
52325232
e := createTestMatchingEngine(log.NewTestLogger(), controller, config, nil, mockNamespaceCache)
5233-
e.metricsHandler = metricstest.NewCaptureHandler()
5233+
captureHandler := metricstest.NewCaptureHandler()
5234+
e.metricsHandler = captureHandler
52345235

52355236
tests := []struct {
5236-
name string
5237-
tqName string
5238-
expectNoop bool
5237+
name string
5238+
tqName string
5239+
expectTQValue string
52395240
}{
52405241
{
5241-
name: "normal task queue gets real metrics handler",
5242-
tqName: "my-task-queue",
5243-
expectNoop: false,
5242+
name: "normal task queue uses actual queue name",
5243+
tqName: "my-task-queue",
5244+
expectTQValue: "my-task-queue",
52445245
},
52455246
{
5246-
name: "internal nexus task queue gets noop metrics handler",
5247-
tqName: "/temporal-sys/worker-commands/ns/key",
5248-
expectNoop: true,
5247+
name: "internal task queue gets __temporal_sys__ taskqueue tag",
5248+
tqName: "/temporal-sys/worker-commands/ns/key",
5249+
expectTQValue: "__temporal_sys__",
52495250
},
52505251
}
52515252

52525253
for _, tc := range tests {
52535254
t.Run(tc.name, func(t *testing.T) {
5255+
capture := captureHandler.StartCapture()
52545256
prtn := newRootPartition(ns.ID().String(), tc.tqName, enumspb.TASK_QUEUE_TYPE_NEXUS)
52555257
tqConfig := newTaskQueueConfig(prtn.TaskQueue(), config, matchingTestNamespace)
52565258
_, _, handler := e.loggerAndMetricsForPartition(ns, prtn, tqConfig)
5257-
if tc.expectNoop {
5258-
assert.Equal(t, metrics.NoopMetricsHandler, handler)
5259-
} else {
5260-
assert.NotEqual(t, metrics.NoopMetricsHandler, handler)
5259+
// Emit a test metric through the handler and verify the taskqueue tag value.
5260+
metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1)
5261+
snap := capture.Snapshot()
5262+
captureHandler.StopCapture(capture)
5263+
recordings := snap["poll_success"]
5264+
require.NotEmpty(t, recordings, "expected poll_success metric to be recorded")
5265+
found := false
5266+
for _, rec := range recordings {
5267+
if rec.Tags["taskqueue"] == tc.expectTQValue {
5268+
found = true
5269+
}
52615270
}
5271+
assert.True(t, found, "expected taskqueue tag value %q", tc.expectTQValue)
52625272
})
52635273
}
52645274
}

0 commit comments

Comments (0)