Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.telemetry.Span;
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
import org.apache.camel.telemetry.SpanContextPropagationInjector;
import org.apache.camel.telemetry.SpanLifecycleManager;
import org.apache.camel.telemetry.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,7 +123,9 @@ private MicrometerObservabilitySpanLifecycleManager() {
}

@Override
public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) {
public Span create(
String spanName, SpanKind kind, Span parent,
SpanContextPropagationExtractor extractor) {
io.micrometer.tracing.Span span;
if (parent != null) {
MicrometerObservabilitySpanAdapter microObsParentSpan = (MicrometerObservabilitySpanAdapter) parent;
Expand All @@ -139,6 +138,9 @@ public Span create(String spanName, Span parent, SpanContextPropagationExtractor
span = builder.start();
}
span.name(spanName);
// Note: Micrometer determines span kind through Observation context types,
// not through direct span.kind() calls. The kind parameter is accepted
// for API compatibility but not used in this implementation.

return new MicrometerObservabilitySpanAdapter(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.telemetry.Span;
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
import org.apache.camel.telemetry.SpanContextPropagationInjector;
import org.apache.camel.telemetry.SpanLifecycleManager;
import org.apache.camel.telemetry.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,8 +93,11 @@ private OpentelemetrySpanLifecycleManager(Tracer tracer, ContextPropagators cont
}

@Override
public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) {
public Span create(
String spanName, SpanKind kind, Span parent,
SpanContextPropagationExtractor extractor) {
SpanBuilder builder = tracer.spanBuilder(spanName);
builder = builder.setSpanKind(mapToSpanKind(kind));
Baggage baggage = null;

if (parent != null) {
Expand Down Expand Up @@ -171,6 +171,16 @@ public void inject(Span span, SpanContextPropagationInjector injector, boolean i
}
}

private io.opentelemetry.api.trace.SpanKind mapToSpanKind(org.apache.camel.telemetry.SpanKind kind) {
return switch (kind) {
case CLIENT -> io.opentelemetry.api.trace.SpanKind.CLIENT;
case SERVER -> io.opentelemetry.api.trace.SpanKind.SERVER;
case PRODUCER -> io.opentelemetry.api.trace.SpanKind.PRODUCER;
case CONSUMER -> io.opentelemetry.api.trace.SpanKind.CONSUMER;
case INTERNAL -> io.opentelemetry.api.trace.SpanKind.INTERNAL;
};
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.camel.telemetry.Span;
import org.apache.camel.telemetry.SpanContextPropagationExtractor;
import org.apache.camel.telemetry.SpanContextPropagationInjector;
import org.apache.camel.telemetry.SpanKind;
import org.apache.camel.telemetry.SpanLifecycleManager;
import org.apache.camel.telemetry.Tracer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,7 +88,9 @@ private DevSpanLifecycleManager(String traceFormat) {
}

@Override
public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) {
public Span create(
String spanName, SpanKind kind, Span parent,
SpanContextPropagationExtractor extractor) {
Span span = DevSpanAdapter.buildSpan(spanName);
String traceId = UUID.randomUUID().toString().replaceAll("-", "");
if (parent != null) {
Expand All @@ -101,6 +104,7 @@ public Span create(String spanName, Span parent, SpanContextPropagationExtractor
LOG.error("TRACE ERROR: wrong format, could not split traceparent {}", upstreamTraceParent);
span.setTag("traceid", traceId);
span.setTag("spanid", UUID.randomUUID().toString().replaceAll("-", ""));
span.setTag("kind", kind.toString());
return span;
}
traceId = split[0];
Expand All @@ -110,6 +114,7 @@ public Span create(String spanName, Span parent, SpanContextPropagationExtractor
}
span.setTag("traceid", traceId);
span.setTag("spanid", UUID.randomUUID().toString().replaceAll("-", ""));
span.setTag("kind", kind.toString());
return span;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.telemetrydev;

import java.io.IOException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -47,9 +46,9 @@ protected CamelContext createCamelContext() throws Exception {
}

@Test
void testProcessorsTraceRequest() throws IOException {
void testProcessorsTraceRequest() {
template.sendBody("direct:start", "my-body");
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(1);
assertEquals(1, traces.size());
checkTrace(traces.values().iterator().next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.telemetrydev;

import java.io.IOException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -46,9 +45,9 @@ protected CamelContext createCamelContext() throws Exception {
}

@Test
void testProcessorsTraceRequest() throws IOException {
void testProcessorsTraceRequest() {
template.sendBody("direct:start", "my-body");
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(1);
assertEquals(1, traces.size());
checkTrace(traces.values().iterator().next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.telemetrydev;

import java.io.IOException;
import java.util.Map;

import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -44,11 +43,11 @@ protected CamelContext createCamelContext() throws Exception {
}

@Test
void testProcessorsTraceRequest() throws InterruptedException, IOException {
void testProcessorsTraceRequest() throws InterruptedException {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
template.sendBody("direct:start", "my-body");
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(1);
assertEquals(1, traces.size());
mock.assertIsSatisfied();
Map<String, Object> headers = mock.getExchanges().get(0).getIn().getHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.telemetrydev;

import java.io.IOException;
import java.util.List;
import java.util.Map;

Expand All @@ -41,10 +40,10 @@ protected CamelContext createCamelContext() throws Exception {
}

@Test
void testPropagateUpstreamTraceRequest() throws IOException {
void testPropagateUpstreamTraceRequest() {
template.requestBodyAndHeader("direct:start", "sample body",
"traceparent", "123456789-123456");
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(1);
assertEquals(1, traces.size());
checkTrace(traces.values().iterator().next());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.camel.telemetrydev;

import java.io.IOException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -45,21 +44,21 @@ protected CamelContext createCamelContext() throws Exception {
}

@Test
void testRouteSingleRequest() throws IOException {
void testRouteSingleRequest() {
Exchange result = template.request("direct:start", null);
// Make sure the trace is propagated downstream
assertNotNull(result.getIn().getHeader("traceparent"));
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(1);
assertEquals(1, traces.size());
checkTrace(traces.values().iterator().next(), null);
}

@Test
void testRouteMultipleRequests() throws IOException {
void testRouteMultipleRequests() {
for (int i = 1; i <= 10; i++) {
context.createProducerTemplate().sendBody("direct:start", "Hello!");
}
Map<String, DevTrace> traces = tracesFromLog();
Map<String, DevTrace> traces = awaitTracesFromLog(10);
// Each trace should have a unique trace id. It is enough to assert that
// the number of elements in the map is the same of the requests to prove
// all traces have been generated uniquely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.telemetry.Op;
Expand All @@ -32,14 +33,32 @@
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import static org.awaitility.Awaitility.await;

public class TelemetryDevTracerTestSupport extends ExchangeTestSupport {

private final ObjectMapper mapper = new ObjectMapper();

/*
* Clear the log traces before each test to prevent flaky tests from leftover data
*/
@BeforeEach
public synchronized void clearLogTracesBeforeTest() throws IOException {
final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
RollingFileAppender appender = (RollingFileAppender) ctx.getConfiguration().getAppenders().get("file2");
if (appender != null) {
appender.getManager().rollover();
}
}

protected Map<String, DevTrace> tracesFromLog() throws IOException {
Map<String, DevTrace> answer = new HashMap<>();
Path path = Paths.get("target/telemetry-traces.log");
if (!Files.exists(path)) {
return answer;
}
List<String> allTraces = Files.readAllLines(path);
for (String trace : allTraces) {
DevTrace st = mapper.readValue(trace, DevTrace.class);
Expand All @@ -57,6 +76,28 @@ protected Map<String, DevTrace> tracesFromLog() throws IOException {
return answer;
}

/**
* Wait for the expected number of traces to be written to the log file. Uses Awaitility to avoid flaky tests from
* timing issues.
*/
protected Map<String, DevTrace> awaitTracesFromLog(int expectedCount) {
await().atMost(10, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> {
try {
Map<String, DevTrace> traces = tracesFromLog();
return traces.size() >= expectedCount;
} catch (IOException e) {
return false;
}
});
try {
return tracesFromLog();
} catch (IOException e) {
throw new RuntimeException("Failed to read traces from log", e);
}
}

/*
* This one is required to rollover the log traces database file and make sure each test has its own
* set of fresh data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@ public interface SpanDecorator {

SpanContextPropagationInjector getInjector(Exchange exchange);

/**
* This method returns the SpanKind for a given operation.
*
* @param op The operation type
* @return The span kind to use for this operation
*/
SpanKind getSpanKind(Op op);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.telemetry;

/**
* Span kind constants for telemetry tracing.
* <p>
* These values describe the relationship between the span and its parent:
* <ul>
* <li>CLIENT - The span covers a client-side call to a remote service</li>
* <li>SERVER - The span covers server-side handling of a remote request</li>
* <li>PRODUCER - The span covers the production of a message to a remote system (e.g., message broker, queue, HTTP
* endpoint)</li>
* <li>CONSUMER - The span covers the consumption of a message from a remote system (e.g., message broker, queue, HTTP
* endpoint)</li>
* <li>INTERNAL - The span represents internal operations with no remote interaction</li>
* </ul>
*/
public enum SpanKind {
CLIENT,
SERVER,
PRODUCER,
CONSUMER,
INTERNAL
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public interface SpanLifecycleManager {

Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor);
Span create(String spanName, SpanKind kind, Span parent, SpanContextPropagationExtractor extractor);

void activate(Span span);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ protected void beginEventSpan(Exchange exchange, Endpoint endpoint, Op op) throw
SpanDecorator spanDecorator = spanDecoratorManager.get(endpoint);
Span parentSpan = spanStorageManager.peek(exchange);
String spanName = spanDecorator.getOperationName(exchange, endpoint);
Span span = spanLifecycleManager.create(spanName, parentSpan, spanDecorator.getExtractor(exchange));
SpanKind spanKind = spanDecorator.getSpanKind(op);
Span span = spanLifecycleManager.create(spanName, spanKind, parentSpan, spanDecorator.getExtractor(exchange));
span.setTag(TagConstants.OP, op.toString());
spanDecorator.beforeTracingEvent(span, exchange, endpoint);
spanLifecycleManager.activate(span);
Expand All @@ -286,7 +287,8 @@ protected void beginProcessorSpan(Exchange exchange, String processorName) throw
// there is some inconsistency
LOG.warn("Processor tracing parent should not be null!");
}
Span span = spanLifecycleManager.create(processorName, parentSpan, spanDecorator.getExtractor(exchange));
SpanKind spanKind = spanDecorator.getSpanKind(Op.EVENT_PROCESS);
Span span = spanLifecycleManager.create(processorName, spanKind, parentSpan, spanDecorator.getExtractor(exchange));
span.setTag(TagConstants.OP, Op.EVENT_PROCESS.toString());
spanDecorator.beforeTracingEvent(span, exchange, null);
spanLifecycleManager.activate(span);
Expand Down
Loading
Loading