From 4598124f9902b081a005f72f9c332c5bdf231c52 Mon Sep 17 00:00:00 2001 From: Wu Sheng Date: Sun, 14 Jun 2026 17:19:55 +0800 Subject: [PATCH 1/3] Fix BanyanDB runtime-rule self-heal + v2 MAL CounterWindow collision & Elvis falsy semantics * BanyanDB schema-cache self-heal: persist DAOs re-derive a missing local schema (RPC-free) once before failing; the no-init defer loop retries a transient backend probe error (isRetryableNoInitProbeFailure, default false / BanyanDB opt-in) instead of crash-looping the pod. * v2 MAL CounterWindow key collision: rate()/increase()/irate() keyed each counter's sliding window on the rule's output metric name (shared by every input metric of a rule) instead of the counter's own name, so counters that reduce to the same labels after .sum() shared one window slot and rated against each other's values -- fabricating non-zero rates from frozen counters (BanyanDB liaison gRPC error rate). Now keyed by the counter's own metric name. * v2 MAL Elvis ?: honored only null (Optional.ofNullable().orElse()); now Groovy-falsy via MalRuntimeHelper.elvis/isTruthy, single-evaluated -- fixes BanyanDB liaison node_type="" stored instead of "n/a". * banyandb otel-rules: PT15S -> PT1M rate window. * Tests: BanyanDBErrorRateReproTest, MALElvisFalsyTest, MetadataRegistryTest, ModelInstallerNoInitTest. --- docs/en/changes/changes.md | 3 + .../v2/compiler/MALClosureCodegen.java | 8 +- .../v2/compiler/rt/MalRuntimeHelper.java | 28 +++- .../meter/analyzer/v2/dsl/SampleFamily.java | 6 +- .../v2/dsl/BanyanDBErrorRateReproTest.java | 125 ++++++++++++++++++ .../analyzer/v2/dsl/MALElvisFalsyTest.java | 98 ++++++++++++++ .../core/storage/model/ModelInstaller.java | 43 +++++- .../model/ModelInstallerNoInitTest.java | 83 +++++++++++- .../banyandb/banyandb-instance.yaml | 42 +++--- .../otel-rules/banyandb/banyandb-service.yaml | 6 +- .../banyandb/BanyanDBIndexInstaller.java | 48 ++++++- .../banyandb/BanyanDBNoneStreamDAO.java | 6 + .../plugin/banyandb/MetadataRegistry.java | 47 ++++++- .../banyandb/measure/BanyanDBMetricsDAO.java | 33 +++-- .../banyandb/stream/BanyanDBRecordDAO.java | 9 ++ .../plugin/banyandb/MetadataRegistryTest.java | 81 ++++++++++++ 16 files changed, 611 insertions(+), 55 deletions(-) create mode 100644 oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java create mode 100644 oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java create mode 100644 oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 51aa0dfa7956..59aa378a6b5a 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -242,6 +242,9 @@ admin-host only" entry above for the public REST retirement. #### OAP Server +* Fix BanyanDB peer nodes permanently flooding ` is not registered` when a node holds a live persist worker but its local `MetadataRegistry` schema cache was never populated for that model — e.g. a `withoutSchemaChange` peer apply or a runtime-rule bundled fall-over rebuilt the dispatch worker but skipped the populate, and nothing (the registry never evicts, the 30s reconcile only covers runtime-rule rows) ever re-derived it. The persist DAOs now self-heal a missing entry once with an RPC-free local re-derivation before failing, and the no-init defer poll loop retries a transient backend probe error instead of escaping and crash-looping the pod. +* Fix a v2 MAL `CounterWindow` key collision: `rate()` / `increase()` / `irate()` keyed each counter's sliding window on the rule's output metric name (the same for every input metric of a rule) instead of the counter's own name, so two or more counters that reduce to the same label set after `.sum(...)` shared one window and computed rates against each other's values — fabricating non-zero rates from unchanged counters (e.g. the BanyanDB liaison gRPC error rate read a steady non-zero off three frozen error counters). The window is now keyed by the counter's own metric name. +* Fix the v2 MAL Elvis operator `?:` to honor Groovy-falsy semantics. It compiled to `Optional.ofNullable(primary).orElse(fallback)`, applying the fallback only when the primary is `null`, so an empty-string primary kept `""` instead — e.g. a BanyanDB liaison `ServiceInstance` stored `node_type=""` rather than `n/a`, because `.sum([...,'node_type'])` fills an absent group-by label with `""`. The fallback now applies for falsy primaries such as null, false, numeric zero, and empty strings/containers. * SWIP-15: rebuild BanyanDB self-observability around the cluster / container / group model (requires BanyanDB 0.11+). A BanyanDB cluster is modeled as one `Service`, each container as a `ServiceInstance` (role/tier as attributes), and each storage group as an `Endpoint`. The `otel-rules/banyandb/` rules are category-separated by role (`node_*` / `liaison_*` / `data_*` / `lifecycle_*`) and by data type (`measure_*` / `stream_*` / `trace_*` / `property_*`), mirroring the upstream FODC-proxy Grafana boards, and include queue batch/message granularity (apache/skywalking-banyandb#1169). Adds a `SERVICE_INSTANCE_RELATION` MAL scope and `serviceInstanceRelation(...)` builder powering a new intra-cluster pod-to-pod deployment topology (`banyandb-instance-relation.yaml`). The stale single-node `host_name` model is removed. * Runtime MAL/LAL hot-update rules can declare `layerDefinitions:` to introduce new layers. Ordinals are operator-pinned in the `100_000+` tier; the layer is diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java index 8b46f68cf91d..bc2806e5e393 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java @@ -300,9 +300,13 @@ void generateClosureExpr(final StringBuilder sb, } else if (expr instanceof MALExpressionModel.ClosureElvisExpr) { final MALExpressionModel.ClosureElvisExpr elvis = (MALExpressionModel.ClosureElvisExpr) expr; - sb.append("java.util.Optional.ofNullable("); + // Groovy `?:` applies the fallback when the primary is falsy (null, + // empty string/container, numeric zero, false), not only when null. + // Keep the primary single-evaluated so expressions such as tags.remove(...) + // do not observe different values between the truth check and result. + sb.append(MALCodegenHelper.RUNTIME_HELPER_FQCN).append(".elvis("); generateClosureExpr(sb, elvis.getPrimary(), paramName, beanMode); - sb.append(").orElse("); + sb.append(", "); generateClosureExpr(sb, elvis.getFallback(), paramName, beanMode); sb.append(")"); } else if (expr instanceof MALExpressionModel.ClosureRegexMatchExpr) { diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java index 669be6ef7cec..7275e11db459 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java @@ -17,6 +17,9 @@ package org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt; +import java.lang.reflect.Array; +import java.util.Collection; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.skywalking.oap.meter.analyzer.v2.dsl.Sample; @@ -55,12 +58,9 @@ public static String[][] regexMatch(final String input, final String regex) { return new String[][] {row}; } - /** - * Reverse division: computes {@code numerator / v} for each sample value {@code v}. - * Used by generated code for {@code Number / SampleFamily} expressions. - */ /** * Groovy truth check: {@code null → false}, empty string → {@code false}, + * numeric zero → {@code false}, empty collection/map/array → {@code false}, * {@code Boolean.FALSE → false}, everything else → {@code true}. * Used by generated filter code for standalone expressions in boolean context * (e.g., {@code tags.ApiId || tags.ApiName}). @@ -75,9 +75,29 @@ public static boolean isTruthy(final Object value) { if (value instanceof CharSequence) { return ((CharSequence) value).length() > 0; } + if (value instanceof Number) { + return ((Number) value).doubleValue() != 0.0D; + } + if (value instanceof Collection) { + return !((Collection) value).isEmpty(); + } + if (value instanceof Map) { + return !((Map) value).isEmpty(); + } + if (value.getClass().isArray()) { + return Array.getLength(value) > 0; + } return true; } + public static T elvis(final T primary, final T fallback) { + return isTruthy(primary) ? primary : fallback; + } + + /** + * Reverse division: computes {@code numerator / v} for each sample value {@code v}. + * Used by generated code for {@code Number / SampleFamily} expressions. + */ public static SampleFamily divReverse(final double numerator, final SampleFamily sf) { if (sf == SampleFamily.EMPTY) { diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java index e3d1aec7b3a7..f07ccc8bf38c 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java @@ -431,7 +431,7 @@ public SampleFamily increase(String range) { Arrays.stream(samples) .map(sample -> sample.increase( range, - context.metricName, + sample.getName(), (lowerBoundValue, unused) -> sample.value - lowerBoundValue )) .toArray(Sample[]::new) @@ -448,7 +448,7 @@ public SampleFamily rate(String range) { Arrays.stream(samples) .map(sample -> sample.increase( range, - context.metricName, + sample.getName(), (lowerBoundValue, lowerBoundTime) -> { final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000; return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff; @@ -466,7 +466,7 @@ public SampleFamily irate() { this.context, Arrays.stream(samples) .map(sample -> sample.increase( - context.metricName, + sample.getName(), (lowerBoundValue, lowerBoundTime) -> { final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000; return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff; diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java new file mode 100644 index 000000000000..361a35b9fff0 --- /dev/null +++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter.CounterWindow; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Reproduces the BanyanDB liaison_grpc_error_rate fabrication using the EXACT rule expression and the + * real (frozen) counter values scraped from the live demo FODC proxy. Counters never change across the + * simulated scrapes, so every rate term — and the summed result — MUST be 0. Any non-zero output proves + * the CounterWindow key collision: the three distinct error counters reduce to identical labels after + * .sum([...]) and, because the rate keys on the (shared, rule-level) context.metricName instead of each + * counter's own name, they share one CounterWindow slot and rate against each other's values. + */ +public class BanyanDBErrorRateReproTest { + + private static final String GROUP_BY = "['cluster','pod_name','container_name','node_role','node_type']"; + + // Verbatim from otel-rules/banyandb/banyandb-instance.yaml : liaison_grpc_error_rate (value part). + private static final String EXPR = + "(banyandb_liaison_grpc_total_err.sum(" + GROUP_BY + ").rate('PT1M')" + + " + banyandb_liaison_grpc_total_registry_err.sum(" + GROUP_BY + ").rate('PT1M')" + + " + banyandb_liaison_grpc_total_stream_msg_received_err.sum(" + GROUP_BY + ").rate('PT1M')) * 60"; + + @BeforeEach + void resetWindow() { + CounterWindow.INSTANCE.reset(); + } + + private static Sample s(final String name, final double value, final long ts, final String... kv) { + final ImmutableMap.Builder b = ImmutableMap.builder(); + for (int i = 0; i < kv.length; i += 2) { + b.put(kv[i], kv[i + 1]); + } + return Sample.builder().name(name).labels(b.build()).value(value).timestamp(ts).build(); + } + + // The three liaison-1 families, with the real frozen values (total_err=5, registry_err=166, stream=5). + // node_type is intentionally ABSENT on liaison samples, exactly as the FODC proxy exposes them. + private Map scrape(final long ts) { + final String[] common = { + "cluster", "showcase-banyandb", + "pod_name", "demo-banyandb-liaison-1", + "container_name", "liaison", + "node_role", "ROLE_LIAISON", + }; + final List totalErr = new ArrayList<>(); + totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metadata"))); + totalErr.add(s("banyandb_liaison_grpc_total_err", 2, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsMinute"))); + totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsHour"))); + totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsDay"))); + + final List registryErr = new ArrayList<>(); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsHour"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsMinute"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsDay"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsDay"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsHour"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsMinute"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_trace"))); + registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_zipkinTrace"))); + + final List streamErr = new ArrayList<>(); + streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 1, ts, with(common, "service", "measure", "method", "write", "group", "sw_metadata"))); + streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "trace", "method", "write", "group", "sw_trace"))); + streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "stream", "method", "write", "group", "sw_recordsLog"))); + + final Map map = new HashMap<>(); + map.put("banyandb_liaison_grpc_total_err", SampleFamilyBuilder.newBuilder(totalErr.toArray(new Sample[0])).build()); + map.put("banyandb_liaison_grpc_total_registry_err", SampleFamilyBuilder.newBuilder(registryErr.toArray(new Sample[0])).build()); + map.put("banyandb_liaison_grpc_total_stream_msg_received_err", SampleFamilyBuilder.newBuilder(streamErr.toArray(new Sample[0])).build()); + return map; + } + + private static String[] with(final String[] common, final String... extra) { + final String[] out = new String[common.length + extra.length]; + System.arraycopy(common, 0, out, 0, common.length); + System.arraycopy(extra, 0, out, common.length, extra.length); + return out; + } + + @Test + void unchangedCounters_errorRate_mustBeZero() { + final Expression expr = DSL.parse("meter_banyandb_instance_liaison_grpc_error_rate", EXPR); + long ts = 1_700_000_000_000L; + final long step = 10_000L; // 10s scrape, matching the showcase collector + for (int scrape = 0; scrape < 6; scrape++, ts += step) { + final Result result = expr.run(scrape(ts)); + double maxAbs = 0.0; + if (result.isSuccess() && result.getData() != SampleFamily.EMPTY) { + for (final Sample out : result.getData().samples) { + maxAbs = Math.max(maxAbs, Math.abs(out.getValue())); + } + } + // Counters never changed -> error rate MUST be 0 on every scrape. + assertEquals(0.0, maxAbs, 1e-9, + "Unchanged counters must yield 0 error rate, but scrape " + scrape + " produced " + maxAbs); + } + } +} diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java new file mode 100644 index 000000000000..08f334551a19 --- /dev/null +++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.meter.analyzer.v2.dsl; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt.MalRuntimeHelper; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Groovy's Elvis `?:` applies the fallback when the primary is falsy — including the empty string. + * The v2 codegen previously emitted Optional.ofNullable(P).orElse(F), which only fires on null, so an + * empty-string primary (e.g. a label that .sum() filled with "" for an absent key) leaked "" instead + * of the fallback. This is the exact mechanism behind BanyanDB liaison instances storing node_type="" + * instead of "n/a". + */ +public class MALElvisFalsyTest { + + private static String tagAfterElvis(final String nodeTypeValue) { + final ImmutableMap labels = nodeTypeValue == null + ? ImmutableMap.of("svc", "s") + : ImmutableMap.of("svc", "s", "node_type", nodeTypeValue); + final SampleFamily sf = SampleFamilyBuilder.newBuilder( + Sample.builder().name("metric").labels(labels).value(1.0).timestamp(1L).build()).build(); + final Expression expr = DSL.parse("test_elvis", + "metric.tag({tags -> tags['nt'] = tags.node_type ?: 'n/a'})"); + final Result r = expr.run(Map.of("metric", sf)); + return r.getData().samples[0].getLabels().get("nt"); + } + + private static String tagAfterSideEffectingElvis(final String nodeTypeValue) { + final SampleFamily sf = SampleFamilyBuilder.newBuilder( + Sample.builder() + .name("metric") + .labels(ImmutableMap.of("svc", "s", "node_type", nodeTypeValue)) + .value(1.0) + .timestamp(1L) + .build()).build(); + final Expression expr = DSL.parse("test_elvis_remove", + "metric.tag({tags -> tags['nt'] = tags.remove('node_type') ?: 'n/a'})"); + final Result r = expr.run(Map.of("metric", sf)); + return r.getData().samples[0].getLabels().get("nt"); + } + + @Test + void emptyStringPrimary_usesFallback() { + assertEquals("n/a", tagAfterElvis(""), "empty-string primary must fall through to 'n/a' (Groovy-falsy)"); + } + + @Test + void absentPrimary_usesFallback() { + assertEquals("n/a", tagAfterElvis(null), "absent (null) primary must fall through to 'n/a'"); + } + + @Test + void nonEmptyPrimary_keptAsIs() { + assertEquals("hot", tagAfterElvis("hot"), "non-empty primary must be kept"); + } + + @Test + void sideEffectingPrimary_evaluatedOnce() { + assertEquals("hot", tagAfterSideEffectingElvis("hot"), + "Elvis must not evaluate the primary twice; tags.remove(...) returns a value only once"); + } + + @Test + void runtimeTruthiness_matchesGroovyFalsyValues() { + assertFalse(MalRuntimeHelper.isTruthy(0)); + assertFalse(MalRuntimeHelper.isTruthy(0.0D)); + assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyList())); + assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyMap())); + assertFalse(MalRuntimeHelper.isTruthy(new String[0])); + assertTrue(MalRuntimeHelper.isTruthy(-1)); + assertTrue(MalRuntimeHelper.isTruthy(List.of("value"))); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java index d65d7fb75bc1..f73bd824273d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java @@ -99,20 +99,39 @@ public void whenCreating(Model model, StorageManipulationOpt opt) throws Storage // resource that only this very apply would ever create. if (deferDDLToInitNode(opt)) { while (true) { - InstallInfo info = isExists(model, opt); - if (!info.isAllExist()) { - try { + boolean allExist; + try { + InstallInfo info = isExists(model, opt); + allExist = info.isAllExist(); + if (!allExist) { log.info( "install info: {}.table for model: [{}] not all required resources exist. OAP is running in 'no-init' mode, waiting create or update... retry 3s later.", info.buildInstallInfoMsg(), model.getName() ); - Thread.sleep(3000L); - } catch (InterruptedException e) { - log.error(e.getMessage()); } - } else { + } catch (final StorageException e) { + if (!isRetryableNoInitProbeFailure(e)) { + throw e; + } + // A transient backend error during the probe (e.g. a BanyanDB cluster data node + // still Init-ing, "client connection is closing") is NOT a reason to abort boot: + // the init OAP will create the resource and the next probe succeeds. Treat it like + // "not present yet" and retry in-loop, rather than letting it escape and crash-loop + // the pod — which would only re-enter this same loop after a full restart. + allExist = false; + log.warn("install info: existence probe for model: [{}] threw a transient backend " + + "error. OAP is running in 'no-init' mode, retry 3s later.", model.getName(), e); + } + if (allExist) { break; } + try { + Thread.sleep(3000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new StorageException( + "interrupted while waiting for no-init backend resources for model " + model.getName(), e); + } } return; } @@ -170,6 +189,16 @@ protected static boolean deferDDLToInitNode(final StorageManipulationOpt opt) { return RunningMode.isNoInitMode() && opt.getFlags().isDeferDDLToInitNode(); } + /** + * Whether a {@link StorageException} from the no-init defer-loop existence probe is + * known to be transient and should be retried in-loop. The base implementation is + * conservative so permanent model/config errors do not become an infinite boot wait; + * storage backends opt in only for transport-level probe failures they can classify. + */ + protected boolean isRetryableNoInitProbeFailure(final StorageException e) { + return false; + } + public void start() { } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java index d9cda58cd7ae..5ee2bbcaf374 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -102,25 +103,103 @@ void withSchemaChangeSkipsCreateWhenResourceAlreadyExists() throws StorageExcept "withSchemaChange must not re-create a resource that already exists"); } + @Test + void noInitDeferLoopRetriesTransientProbeErrorInsteadOfCrashing() { + RunningMode.setMode("no-init"); + // The first existence probe throws a transient StorageException (mimicking a BanyanDB + // cluster data node still Init-ing); the next probe reports the resource present. + final RecordingInstaller installer = new RecordingInstaller(true /* present after transient */, + 1 /* one transient probe failure */, true /* retryable probe failure */); + final Model model = mock(Model.class); + when(model.getName()).thenReturn("static_metric_transient"); + + // Must NOT propagate the transient (which would escape whenCreating and crash-loop the pod); + // must retry in-loop, then return on the defer path without creating. 10s covers the 3s sleep. + assertTimeoutPreemptively(Duration.ofSeconds(10), () -> + installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent())); + assertEquals(0, installer.createTableCalls, + "a transient probe error must be retried, then defer to the init node without creating"); + assertTrue(installer.probeCalls >= 2, + "the loop must probe again after the transient instead of escaping on the first throw"); + } + + @Test + void noInitDeferLoopPropagatesNonRetryableProbeError() { + RunningMode.setMode("no-init"); + final RecordingInstaller installer = new RecordingInstaller(true /* unused */, + 1 /* one probe failure */, false /* permanent/non-retryable */); + final Model model = mock(Model.class); + when(model.getName()).thenReturn("static_metric_bad_model"); + + assertThrows(StorageException.class, + () -> installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()), + "permanent model/config probe failures must not be converted into an infinite no-init wait"); + assertEquals(1, installer.probeCalls, + "a non-retryable failure must escape without sleeping and probing again"); + assertEquals(0, installer.createTableCalls); + } + + @Test + void noInitDeferLoopPropagatesInterruptedSleep() { + RunningMode.setMode("no-init"); + final RecordingInstaller installer = new RecordingInstaller(false /* resource absent */); + final Model model = mock(Model.class); + when(model.getName()).thenReturn("static_metric_wait_interrupted"); + + Thread.currentThread().interrupt(); + try { + assertThrows(StorageException.class, + () -> installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()), + "an interrupted no-init wait must fail fast so shutdown can proceed"); + assertTrue(Thread.currentThread().isInterrupted(), + "the interrupt flag must be restored for upstream shutdown handling"); + } finally { + Thread.interrupted(); + } + } + /** Minimal concrete {@link ModelInstaller} that records createTable calls and reports a * fixed existence result, so the base whenCreating branching can be exercised without a - * real storage backend. */ + * real storage backend. Optionally throws a transient {@link StorageException} on the first + * {@code transientProbeFailures} existence probes to exercise the no-init defer-loop retry. */ private static final class RecordingInstaller extends ModelInstaller { private final boolean resourcePresent; + private final int transientProbeFailures; + private final boolean retryableProbeFailure; + private int probeCalls; private int createTableCalls; private RecordingInstaller(final boolean resourcePresent) { + this(resourcePresent, 0, false); + } + + private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures) { + this(resourcePresent, transientProbeFailures, true); + } + + private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures, + final boolean retryableProbeFailure) { super(null, null); this.resourcePresent = resourcePresent; + this.transientProbeFailures = transientProbeFailures; + this.retryableProbeFailure = retryableProbeFailure; } @Override - public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) { + public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) throws StorageException { + if (probeCalls++ < transientProbeFailures) { + throw new StorageException("transient backend error"); + } final TestInstallInfo info = new TestInstallInfo(model); info.setAllExist(resourcePresent); return info; } + @Override + protected boolean isRetryableNoInitProbeFailure(final StorageException e) { + return retryableProbeFailure; + } + @Override public void createTable(final Model model) { createTableCalls++; diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml index def21fc3086d..add9cdc45522 100644 --- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml +++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml @@ -52,7 +52,7 @@ metricsRules: exp: banyandb_system_up_time # CPU usage (cores). process_* rides on every container including lifecycle. - name: cpu_usage - exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # resident memory (bytes). Raw gauge, present on all containers. - name: rss_memory exp: process_resident_memory_bytes @@ -75,45 +75,45 @@ metricsRules: exp: banyandb_system_disk.tagEqual('kind','used_percent').avg(['cluster','pod_name','container_name','node_role','node_type','path']) * 100 # network throughput (bytes/s) by interface name. - name: network_recv - exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S') + exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M') - name: network_sent - exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S') + exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M') # Go runtime. - name: goroutines exp: go_goroutines # average GC pause (s) = rate(Σpause) / rate(Σcount). go_gc_duration_seconds is a summary (no buckets), # so this ratio of _sum/_count is the only valid average — do not apply histogram_percentile to it. - name: gc_pause_avg - exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) - name: heap_inuse exp: go_memstats_heap_inuse_bytes - name: heap_next_gc exp: go_memstats_next_gc_bytes - name: alloc_rate - exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # ---- Liaison only (front door; the UI gates these on container_name == liaison) ---- # query rate (req/s) by data-model service (measure/stream/trace/property). method literal is "query". - name: liaison_query_rate - exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT15S') + exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT1M') # gRPC errors/min. Three liaison-side error families (mirrors the Grafana "gRPC Error Rate" panel, # which sums total_err + registry_err + stream_msg_received_err). All lazily registered -> empty on a # healthy cluster; each pre-aggregated to the same label set before '+'. - name: liaison_grpc_error_rate - exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) * 60 + exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) * 60 # registry operation rate (req/s): schema registry ops on the liaison front door. total_started is # query-only on the wire, so the former tagNotEqual('method','query') term was empty and is dropped; # registry_started carries the non-query op count. - name: liaison_registry_op_rate - exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # write rate (writes/s) seen at the liaison front door. group label dropped (instance-level total). - name: liaison_write_rate - exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # tier-2 publish pipeline (liaison -> data): throughput by operation, bytes/s, and p99 send latency. - name: liaison_publish_throughput - exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: liaison_publish_bytes - exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: liaison_publish_latency_p99 exp: banyandb_queue_pub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # tier-2 publish, batch granularity (BanyanDB #1169): batches published/s by operation and the batch @@ -121,7 +121,7 @@ metricsRules: # BUILD-GATED: _batch_finished/_batch_latency are absent on current builds -> emit nothing until the # shipped BanyanDB build registers them. - name: liaison_publish_batch_throughput - exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: liaison_publish_batch_latency_p99 exp: banyandb_queue_pub_total_batch_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # liaison write-queue (wqueue) depth: pending records buffered at the front door before publish. @@ -144,34 +144,34 @@ metricsRules: exp: banyandb_measure_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # merge-loop iterations/s. - name: data_merge_file_rate - exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # avg parts merged per merge loop on the file path (matches Grafana = rate(merged_parts{type=file}) / # rate(merge_loop_started)). type='file' is data-only on the wire (liaison emits only type='mem'). - name: data_merge_file_partitions - exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_trace_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_trace_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) # avg file-merge latency (ms) per merge loop. - name: data_merge_file_latency - exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_trace_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S'))) * 1000 + exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_trace_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M'))) * 1000 # inverted-index (series) write rate / term-search rate / total docs. *_inverted_index_total_* are # # TYPE=gauge but cumulative, so rate() yields a per-window delta. Stream's series index is the # storage scope (stream_storage_*); the tst scope is reported separately below. Trace's series index # (trace_storage_*) is included so trace series writes/docs are not silently dropped. - name: data_series_write_rate - exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_series_term_search_rate - exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_total_series exp: banyandb_measure_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # stream time-series-table (tst) index, distinct from the stream series (storage) index above. - name: data_stream_tst_write_rate - exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_stream_tst_term_search_rate - exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_stream_tst_total_docs exp: banyandb_stream_tst_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # subscribe-side queue (data receives from liaison): throughput by operation + p99 latency. - name: data_queue_sub_throughput - exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: data_queue_sub_latency_p99 exp: banyandb_queue_sub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # subscribe-side per-message throughput (BanyanDB #1169). A data node ingests writes via the @@ -181,7 +181,7 @@ metricsRules: # intentionally not modeled here. Batch-level granularity lives on the liaison's publish side # (liaison_publish_batch_throughput / liaison_publish_batch_latency_p99 above). - name: data_queue_sub_message_throughput - exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') # retention disk-usage % per data-model scope (0-100 gauge). Kept per scope rather than summed (a sum # of three percentages is meaningless). Not in the upstream Grafana boards; a SkyWalking addition. - name: data_retention_measure_disk_usage_percent diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml index 97c6cac8f6c8..45d505d8f43c 100644 --- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml +++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml @@ -29,11 +29,11 @@ metricsRules: # cluster writes/s across the three data-model scopes (measure, stream, trace). Each scope's # write counter is collapsed to one per-cluster series before `+`. - name: cluster_write_rate - exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT15S')) + exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT1M')) # cluster queries/s. `service` on this family is BanyanDB's data-model facet # (measure/stream/trace/property), not a SkyWalking service; method literal is "query". - name: cluster_query_rate - exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT15S') + exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT1M') # cluster errors/min. The seven liaison-side error families mirror the upstream Grafana # "Error Rate" stat (grafana-fodc-workload.json). Each is pre-aggregated to ['cluster'] # BEFORE `+` because their wire label sets differ (stream_msg_received_err carries @@ -42,7 +42,7 @@ metricsRules: # registered and emit no series; MAL treats an empty operand as the additive identity, so the # sum emits from whatever has fired and renders absent-as-0 when nothing has. - name: cluster_error_rate - exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT15S') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT15S') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster']).rate('PT15S')) * 60 + exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT1M') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT1M') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster']).rate('PT1M')) * 60 # live container count by role. count(['cluster','container_name','pod_name']) groups by all # three then re-groups excluding the last key (pod_name), yielding one sample per # (cluster, container_name) whose value = distinct pod_name count -> data=N, liaison=M. diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index cabfb75276cd..34c00bfabffd 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -19,9 +19,11 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import io.grpc.Status; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -40,7 +42,6 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding; import org.apache.skywalking.banyandb.schema.v1.BanyandbSchema.SchemaKey; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation; -import java.time.Duration; import org.apache.skywalking.library.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.library.banyandb.v1.client.SchemaWatcher; import org.apache.skywalking.library.banyandb.v1.client.grpc.exception.BanyanDBException; @@ -103,6 +104,51 @@ public class BanyanDBIndexInstaller extends ModelInstaller { public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) { super(client, moduleManager); this.config = config; + // Let read/persist paths self-heal a missing local schema entry (MetadataRegistry.repopulateLocally): + // re-derive the model's Schema locally with zero server RPC via the same primitive the peer + // boot path uses. This closes the " is not registered" flood that arises when a + // withoutSchemaChange peer apply or a runtime-rule bundled fall-over rebuilds the dispatch + // worker but skips the populate. DownSamplingConfigService is resolved lazily per call — a + // self-heal only fires post-boot, when CoreModule is long up. + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(model -> { + final DownSamplingConfigService downSamplingConfigService = moduleManager.find(CoreModule.NAME) + .provider() + .getService(DownSamplingConfigService.class); + registerLocallyByKind(model, downSamplingConfigService); + }); + } + + @Override + protected boolean isRetryableNoInitProbeFailure(final StorageException e) { + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof BanyanDBException) { + return isTransientBanyanDBProbeFailure((BanyanDBException) cause); + } + cause = cause.getCause(); + } + return false; + } + + private static boolean isTransientBanyanDBProbeFailure(final BanyanDBException e) { + final Status.Code code = e.getStatus(); + if (Status.Code.UNAVAILABLE.equals(code) + || Status.Code.DEADLINE_EXCEEDED.equals(code) + || Status.Code.CANCELLED.equals(code) + || Status.Code.RESOURCE_EXHAUSTED.equals(code) + || Status.Code.ABORTED.equals(code)) { + return true; + } + if (!Status.Code.UNKNOWN.equals(code)) { + return false; + } + final String message = String.valueOf(e.getMessage()).toLowerCase(Locale.ROOT); + return message.contains("client connection is closing") + || message.contains("connection is closing") + || message.contains("transport is closing") + || message.contains("connection refused") + || message.contains("connection reset") + || message.contains("broken pipe"); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java index 09bd256e14a4..98763aa3e115 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java @@ -40,7 +40,13 @@ public BanyanDBNoneStreamDAO(BanyanDBStorageClient client, StorageBuilder registry = new HashMap<>(); + // ConcurrentHashMap (not HashMap): boot populates single-threaded, but the self-heal path + // (repopulateLocally) writes from persistence/query threads concurrently with reads. + private final Map registry = new ConcurrentHashMap<>(); + + /** + * Re-derive and locally register a model's BanyanDB {@link Schema} with NO server RPC. + * Registered once by the active {@code BanyanDBIndexInstaller} at boot and invoked by + * {@link #repopulateLocally(Model)} when a read path finds the cache empty for a model whose + * dispatch worker is already live — e.g. a {@code withoutSchemaChange} peer apply or a + * runtime-rule bundled fall-over rebuilt the worker but skipped the local populate. The + * {@code Model} is always known locally and its schema is a pure local derivation, so such a + * miss is always re-derivable without touching the backend. + */ + @FunctionalInterface + public interface LocalSchemaPopulator { + void populateLocally(Model model); + } + + private volatile LocalSchemaPopulator localSchemaPopulator; + + /** Register the boot-time, RPC-free local schema populator. Called once by the active installer. */ + public void registerLocalSchemaPopulator(final LocalSchemaPopulator populator) { + this.localSchemaPopulator = populator; + } + + /** + * Best-effort, RPC-free re-derivation of a model's local {@link Schema} so a read/persist path + * can self-heal a missing cache entry instead of throwing {@code " is not registered"} + * forever (the registry never evicts, so an entry that was never populated on this node stays + * absent otherwise). No-op when no populator is registered (e.g. non-BanyanDB unit tests). + * Swallows derivation exceptions so a self-heal attempt is never worse than the pre-existing + * throw — the caller re-reads and surfaces its own not-registered error if still absent. + */ + public void repopulateLocally(final Model model) { + final LocalSchemaPopulator populator = this.localSchemaPopulator; + if (populator == null) { + return; + } + try { + populator.populateLocally(model); + } catch (final Exception e) { + log.debug("local schema self-heal re-derivation failed for model [{}]; " + + "caller will surface the not-registered error", model.getName(), e); + } + } public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config) { final SchemaMetadata schemaMetadata = parseMetadata(model, config, null); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 4f1ff1928e88..d52c70ec5ffd 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -63,12 +63,29 @@ public BanyanDBMetricsDAO(BanyanDBStorageClient client, StorageBuilder this.storageBuilder = storageBuilder; } - @Override - public List multiGet(Model model, List metrics) throws IOException { + /** + * Resolve the model's BanyanDB schema, self-healing a missing local entry once before failing. + * A null here means this node has a live persist worker for the model but its schema cache was + * never populated (or lost) — typically a {@code withoutSchemaChange} peer apply or a + * runtime-rule bundled fall-over that rebuilt the worker without the populate. Re-derive the + * schema locally with no server RPC and re-read; throw only if the entry is still absent, so + * a genuinely unknown model still fails fast instead of flooding forever. + */ + private MetadataRegistry.Schema resolveSchema(Model model) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); if (schema == null) { - throw new IOException(model.getName() + " is not registered"); + MetadataRegistry.INSTANCE.repopulateLocally(model); + schema = MetadataRegistry.INSTANCE.findMetadata(model); + if (schema == null) { + throw new IOException(model.getName() + " is not registered"); + } } + return schema; + } + + @Override + public List multiGet(Model model, List metrics) throws IOException { + MetadataRegistry.Schema schema = resolveSchema(model); final Map> seriesIDColumns = new HashMap<>(); if (model.getBanyanDBModelExtension().isIndexMode()) { seriesIDColumns.put(ID, new ArrayList<>()); @@ -144,10 +161,7 @@ protected void apply(MeasureQuery query) { @Override public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - if (schema == null) { - throw new IOException(model.getName() + " is not registered"); - } + MetadataRegistry.Schema schema = resolveSchema(model); MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp @@ -161,10 +175,7 @@ public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCac @Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - if (schema == null) { - throw new IOException(model.getName() + " is not registered"); - } + MetadataRegistry.Schema schema = resolveSchema(model); MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index 8bb0d28f8009..4632982b7631 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -50,7 +50,13 @@ public BanyanDBRecordDAO(BanyanDBStorageClient client, StorageBuilder st @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { + // Self-heal a missing local schema entry once (RPC-free re-derivation) before failing — + // see MetadataRegistry.repopulateLocally. Throw only if the entry is still absent. MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); + if (schema == null) { + MetadataRegistry.INSTANCE.repopulateLocally(model); + schema = MetadataRegistry.INSTANCE.findMetadata(model); + } if (schema == null) { throw new IOException(model.getName() + " is not registered"); } @@ -60,6 +66,9 @@ public InsertRequest prepareBatchInsert(Model model, Record record) throws IOExc if (record instanceof BanyanDBTrace.MergeTable) { BanyanDBTrace.MergeTable mergeTable = (BanyanDBTrace.MergeTable) record; MetadataRegistry.Schema mergeTableSchema = MetadataRegistry.INSTANCE.findRecordMetadata(mergeTable.getMergeTableName()); + if (mergeTableSchema == null) { + throw new IOException(mergeTable.getMergeTableName() + " is not registered"); + } traceWrite = getClient().createTraceWrite( schema.getMetadata().getGroup(), diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java new file mode 100644 index 000000000000..479644ca94c6 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit coverage for the local schema-cache self-heal on {@link MetadataRegistry}. A read/persist + * path that finds the cache empty for a model whose dispatch worker is already live (e.g. a + * {@code withoutSchemaChange} peer apply or a runtime-rule bundled fall-over that rebuilt the + * worker but skipped the populate) must be able to re-derive the schema locally with no server + * RPC, instead of throwing {@code " is not registered"} forever. + */ +class MetadataRegistryTest { + + @AfterEach + void clearPopulator() { + // MetadataRegistry is an enum singleton; clear the populator so global state set by a test + // does not leak into others. + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null); + } + + @Test + void repopulateLocallyInvokesRegisteredPopulator() { + final Model model = mock(Model.class); + when(model.getName()).thenReturn("meter_test_metric"); + final AtomicInteger calls = new AtomicInteger(); + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> calls.incrementAndGet()); + + MetadataRegistry.INSTANCE.repopulateLocally(model); + + assertEquals(1, calls.get(), "a registered populator must be invoked on a self-heal attempt"); + } + + @Test + void repopulateLocallyIsNoOpWhenNoPopulatorRegistered() { + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null); + final Model model = mock(Model.class); + assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model), + "self-heal with no populator (e.g. a non-BanyanDB context) must be a no-op"); + } + + @Test + void repopulateLocallySwallowsPopulatorError() { + final Model model = mock(Model.class); + when(model.getName()).thenReturn("meter_test_metric"); + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> { + throw new RuntimeException("derivation boom"); + }); + + // A failed re-derivation must never be worse than the pre-existing throw: the caller + // re-reads and surfaces its own not-registered error, so repopulateLocally itself must + // not propagate. + assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model), + "a throwing populator must be swallowed so self-heal never worsens the failure"); + } +} From a684d6135a04082e8403f1bf1644a74d4b4b42ee Mon Sep 17 00:00:00 2001 From: Wu Sheng Date: Sun, 14 Jun 2026 22:19:02 +0800 Subject: [PATCH 2/3] CI: bump GHA third-party actions to ASF-approved v4 SHAs ASF infrastructure-actions approved_patterns.yml dropped the v3 SHAs for these actions, so the stale pins were rejected and the CI workflow failed with startup_failure. Updated to the newest approved v4 SHA each: * docker/login-action v3.7.0 -> v4.2.0 (650006c6) * docker/setup-buildx-action v3.12.0 -> v4.1.0 (d7f5e7f5) * docker/setup-qemu-action v3.6.0 -> v4.1.0 (06116385) * dorny/paths-filter v3.0.2 -> v4.0.1 (fbd0ab8f) --- .github/workflows/publish-docker-e2e-service.yaml | 6 +++--- .github/workflows/publish-docker.yaml | 6 +++--- .github/workflows/skywalking.yaml | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/publish-docker-e2e-service.yaml b/.github/workflows/publish-docker-e2e-service.yaml index e96ab745eb6f..0887df009d12 100644 --- a/.github/workflows/publish-docker-e2e-service.yaml +++ b/.github/workflows/publish-docker-e2e-service.yaml @@ -51,14 +51,14 @@ jobs: restore-keys: | ${{ runner.os }}-maven- - name: Log in to the Container registry - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ${{ env.HUB }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Set up QEMU - uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 + uses: docker/setup-qemu-action@06116385d9baf250c9f4dcb4858b16962ea869c3 # v4.1.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f + uses: docker/setup-buildx-action@d7f5e7f509e45cec5c76c4d5afdd7de93d0b3df5 # v4.1.0 - name: Build and push images run: make -C test build.e2e-service docker.push-e2e-service diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml index 765c8e2340dd..41bd15b3fa5d 100644 --- a/.github/workflows/publish-docker.yaml +++ b/.github/workflows/publish-docker.yaml @@ -65,15 +65,15 @@ jobs: echo "TAG=${{ github.sha }}" >> $GITHUB_ENV fi - name: Log in to the Container registry - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ${{ env.DOCKER_REGISTRY }} username: ${{ env.DOCKER_USERNAME }} password: ${{ env.DOCKER_PASSWORD }} - name: Set up QEMU - uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 + uses: docker/setup-qemu-action@06116385d9baf250c9f4dcb4858b16962ea869c3 # v4.1.0 - name: Set up Docker Buildx - uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f + uses: docker/setup-buildx-action@d7f5e7f509e45cec5c76c4d5afdd7de93d0b3df5 # v4.1.0 - name: Build and push docker images based on Java 11 env: SW_OAP_BASE_IMAGE: eclipse-temurin:11-jre diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml index 05adfffa9e65..29102a7fc6e0 100644 --- a/.github/workflows/skywalking.yaml +++ b/.github/workflows/skywalking.yaml @@ -122,7 +122,7 @@ jobs: persist-credentials: false - name: Filter id: filter - uses: dorny/paths-filter@de90cc6fb38fc0963ad72b210f1f284cd68cea36 + uses: dorny/paths-filter@fbd0ab8f3e69293af611ebaee6363fc25e6d187d # v4.0.1 with: list-files: 'shell' predicate-quantifier: 'every' @@ -765,7 +765,7 @@ jobs: name: dist path: dist - name: Login to ghcr - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ghcr.io username: ${{ github.repository_owner }} @@ -854,7 +854,7 @@ jobs: find docker-images -name "*.tar" -exec docker load -i {} \; find docker-images -name "*.tar" -exec rm {} \; - name: Login to ghcr - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ghcr.io username: ${{ github.repository_owner }} @@ -926,7 +926,7 @@ jobs: find docker-images -name "*.tar" -exec docker load -i {} \; find docker-images -name "*.tar" -exec rm {} \; - name: Login to ghcr - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ghcr.io username: ${{ github.repository_owner }} @@ -1042,7 +1042,7 @@ jobs: name: dist path: dist - name: Login to ghcr - uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 + uses: docker/login-action@650006c6eb7dba73a995cc03b0b2d7f5ca915bee # v4.2.0 with: registry: ghcr.io username: ${{ github.repository_owner }} From 0032d96495e1241c9478f11f0df784a798027894 Mon Sep 17 00:00:00 2001 From: Wu Sheng Date: Sun, 14 Jun 2026 23:29:22 +0800 Subject: [PATCH 3/3] Fix MALExpressionExecutionTest isolation after CounterWindow key change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The v2 MAL CounterWindow collision fix re-keyed rate()/increase() windows on each counter's own sample name instead of the rule-level context.metricName. MALExpressionExecutionTest relied on context.metricName (set to a unique sourceFile/metricName) to keep each rule's prime/real pair isolated in the process-wide CounterWindow.INSTANCE singleton — the new keying ignores that field, so leftover samples from one rule leaked into the next across the ~1350 sequential dynamic tests, producing wrong/negative deltas (e.g. 8.333 = 50/6, a lower bound pulled from an earlier rule). Reset CounterWindow.INSTANCE per rule (the pattern BanyanDBErrorRateReproTest already uses via @BeforeEach) and drop the now-dead setMetricName scaffolding (context.metricName has no readers after the keying change). No production code or expected values changed; 1350/1350 tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../mal/MALExpressionExecutionTest.java | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/java/org/apache/skywalking/oap/server/dsl/tester/mal/MALExpressionExecutionTest.java b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/java/org/apache/skywalking/oap/server/dsl/tester/mal/MALExpressionExecutionTest.java index 1d0f22a1ec18..296be78fb5b3 100644 --- a/oap-server/analyzer/meter-analyzer-scripts-test/src/test/java/org/apache/skywalking/oap/server/dsl/tester/mal/MALExpressionExecutionTest.java +++ b/oap-server/analyzer/meter-analyzer-scripts-test/src/test/java/org/apache/skywalking/oap/server/dsl/tester/mal/MALExpressionExecutionTest.java @@ -198,33 +198,27 @@ private void executeWithInput( final Map inputSection, final Map expectedSection) { final String metricName = rule.getName(); - // Unique per file+rule to isolate CounterWindow entries across files - final String cwMetricName = rule.getSourceFile().getName() + "/" + metricName; final String expression = rule.getFullExpression(); final boolean hasIncrease = expression.contains(".increase(") || expression.contains(".rate("); - // v2 prime + v2 real (also consecutive, same delta) + // rate()/increase() resolve their lower bound from the process-wide + // CounterWindow.INSTANCE, keyed by each counter's own (name, labels) — not + // the rule-level metric name. Input counter names recur across rules and + // files, so without a reset one rule's prime/real pair would rate against + // another rule's leftover window samples. Clear it so each rule is isolated + // to its own prime (t0) + real (t0+2s) pair. + org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter.CounterWindow.INSTANCE.reset(); + + // v2 prime + v2 real (consecutive scrapes 2 s apart, same delta) final Map v2Data; if (hasIncrease) { try { - final Map primeData = - buildV2MockDataFromInput(inputSection, 0.5); - for (final org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily s : primeData.values()) { - if (s != org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily.EMPTY) { - s.context.setMetricName(cwMetricName); - } - } - v2MalExpr.run(primeData); + v2MalExpr.run(buildV2MockDataFromInput(inputSection, 0.5)); } catch (Exception ignored) { } } v2Data = buildV2MockDataFromInput(inputSection, 1.0); - for (final org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily s : v2Data.values()) { - if (s != org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily.EMPTY) { - s.context.setMetricName(cwMetricName); - } - } // V2 run org.apache.skywalking.oap.meter.analyzer.v2.dsl.SampleFamily v2Sf;