Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,82 @@
*/
package org.apache.camel.opentelemetry2;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import org.apache.camel.telemetry.Op;
import org.apache.camel.telemetry.TagConstants;

public class OpenTelemetrySpanAdapter implements org.apache.camel.telemetry.Span {

private static final String DEFAULT_EVENT_NAME = "log";
static final String BAGGAGE_CAMEL_FLAG = "camelScope";

private final Span otelSpan;
private Span otelSpan;
private final Baggage baggage;
private Scope scope;
private Scope baggageScope;

protected OpenTelemetrySpanAdapter(Span otelSpan, Baggage baggage) {
this.otelSpan = otelSpan;
// We store an important flag in the baggage in order to verify if the
// root span was generated internally or from a third party dependency.
this.baggage = baggage.toBuilder().put(BAGGAGE_CAMEL_FLAG, "true").build();
// For deferred span creation
private SpanBuilder spanBuilder;
private final Map<String, String> pendingTags = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these map/list be concurrent variants in case multi threads calls at the same time

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed the changes with @squakez and I'll open the new PR which does not need all this code for defferring span creation

private final List<Map<String, String>> pendingLogs = new ArrayList<>();
private boolean pendingError;
private String pendingComponent;

protected OpenTelemetrySpanAdapter(SpanBuilder spanBuilder, Baggage baggage) {
this.spanBuilder = spanBuilder;
this.baggage = baggage != null
? baggage.toBuilder().put(BAGGAGE_CAMEL_FLAG, "true").build()
: Baggage.current().toBuilder().put(BAGGAGE_CAMEL_FLAG, "true").build();
}

protected Span getSpan() {
return this.otelSpan;
}

protected void makeCurrent() {
// Start the span if it was deferred
if (spanBuilder != null) {
// Determine and apply span kind
try {
spanBuilder.setSpanKind(SpanKind.valueOf(determineKind()));
} catch (IllegalArgumentException e) {
// Invalid kind, use default
spanBuilder.setSpanKind(SpanKind.INTERNAL);
}
// Start the span
this.otelSpan = spanBuilder.startSpan();
this.spanBuilder = null;

// Apply pending operations
if (pendingComponent != null) {
this.otelSpan.setAttribute(TagConstants.COMPONENT, pendingComponent);
}
for (Map.Entry<String, String> entry : pendingTags.entrySet()) {
this.otelSpan.setAttribute(entry.getKey(), entry.getValue());
}
for (Map<String, String> logFields : pendingLogs) {
this.otelSpan.addEvent(getEventNameFromFields(logFields), convertToAttributes(logFields));
}
if (pendingError) {
this.otelSpan.setStatus(StatusCode.ERROR);
}

// Clear pending state
pendingTags.clear();
pendingLogs.clear();
}
this.scope = this.otelSpan.makeCurrent();
this.baggageScope = this.baggage.makeCurrent();
}
Expand All @@ -69,25 +113,58 @@ protected Baggage getBaggage() {
return this.baggage;
}

private String determineKind() {
var kind = pendingTags.get("kind");
if (kind != null) {
return kind;
}
var operation = pendingTags.get(TagConstants.OP);
boolean isMessaging = pendingTags.containsKey(TagConstants.MESSAGE_BUS_DESTINATION);
boolean isHttp = pendingTags.containsKey(TagConstants.HTTP_METHOD);
return switch (Op.valueOf(operation)) {
case EVENT_RECEIVED -> isMessaging ? "CONSUMER" : isHttp ? "SERVER" : "INTERNAL";
case EVENT_SENT -> isMessaging ? "PRODUCER" : isHttp ? "CLIENT" : "INTERNAL";
default -> "INTERNAL";
};

}

@Override
public void log(Map<String, String> fields) {
this.otelSpan.addEvent(getEventNameFromFields(fields), convertToAttributes(fields));
if (otelSpan != null) {
this.otelSpan.addEvent(getEventNameFromFields(fields), convertToAttributes(fields));
} else {
pendingLogs.add(new HashMap<>(fields));
}
}

@Override
public void setTag(String key, String value) {
this.otelSpan.setAttribute(key, value);
if (otelSpan != null) {
this.otelSpan.setAttribute(key, value);
} else {
pendingTags.put(key, value);
}
}

@Override
public void setComponent(String component) {
this.setTag(TagConstants.COMPONENT, component);
if (otelSpan != null) {
this.otelSpan.setAttribute(TagConstants.COMPONENT, component);
} else {
pendingComponent = component;
}
}

@Override
public void setError(boolean isError) {
this.setTag(TagConstants.ERROR, "" + isError);
this.otelSpan.setStatus(isError ? StatusCode.ERROR : StatusCode.OK);
if (otelSpan != null) {
this.otelSpan.setAttribute(TagConstants.ERROR, "" + isError);
this.otelSpan.setStatus(isError ? StatusCode.ERROR : StatusCode.OK);
} else {
pendingTags.put(TagConstants.ERROR, "" + isError);
pendingError = isError;
}
}

private String getEventNameFromFields(Map<String, ?> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public String get(SpanContextPropagationExtractor carrier, String key) {
baggage = Baggage.fromContext(ctx);
}

return new OpenTelemetrySpanAdapter(builder.startSpan(), baggage);
return new OpenTelemetrySpanAdapter(builder, baggage);
}

@Override
Expand Down
Loading
Loading