Skip to content

Commit

Permalink
Map x-ray key/value pairs to baggage. (#3106)
Browse files Browse the repository at this point in the history
* Map x-ray key/value pairs to baggage.

* Ignore known keys in baggage

* 256 char limit
  • Loading branch information
Anuraag Agrawal authored Apr 7, 2021
1 parent 9a330d0 commit 768bde8
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.extension.aws;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.baggage.BaggageBuilder;
import io.opentelemetry.api.baggage.BaggageEntry;
import io.opentelemetry.api.internal.StringUtils;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
Expand All @@ -18,6 +21,7 @@
import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.Collection;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -107,19 +111,49 @@ public <C> void inject(Context context, @Nullable C carrier, TextMapSetter<C> se
char samplingFlag = spanContext.isSampled() ? IS_SAMPLED : NOT_SAMPLED;
// TODO: Add OT trace state to the X-Ray trace header

String traceHeader =
TRACE_ID_KEY
+ KV_DELIMITER
+ xrayTraceId
+ TRACE_HEADER_DELIMITER
+ PARENT_ID_KEY
+ KV_DELIMITER
+ parentId
+ TRACE_HEADER_DELIMITER
+ SAMPLED_FLAG_KEY
+ KV_DELIMITER
+ samplingFlag;
setter.set(carrier, TRACE_HEADER_KEY, traceHeader);
StringBuilder traceHeader = new StringBuilder();
traceHeader
.append(TRACE_ID_KEY)
.append(KV_DELIMITER)
.append(xrayTraceId)
.append(TRACE_HEADER_DELIMITER)
.append(PARENT_ID_KEY)
.append(KV_DELIMITER)
.append(parentId)
.append(TRACE_HEADER_DELIMITER)
.append(SAMPLED_FLAG_KEY)
.append(KV_DELIMITER)
.append(samplingFlag);

Baggage baggage = Baggage.fromContext(context);
// Truncate baggage to 256 chars per X-Ray spec.
baggage.forEach(
new BiConsumer<String, BaggageEntry>() {

private int baggageWrittenBytes;

@Override
public void accept(String key, BaggageEntry entry) {
if (key.equals(TRACE_ID_KEY)
|| key.equals(PARENT_ID_KEY)
|| key.equals(SAMPLED_FLAG_KEY)) {
return;
}
// Size is key/value pair, excludes delimiter.
int size = key.length() + entry.getValue().length() + 1;
if (baggageWrittenBytes + size > 256) {
return;
}
traceHeader
.append(TRACE_HEADER_DELIMITER)
.append(key)
.append(KV_DELIMITER)
.append(entry.getValue());
baggageWrittenBytes += size;
}
});

setter.set(carrier, TRACE_HEADER_KEY, traceHeader.toString());
}

@Override
Expand All @@ -131,25 +165,23 @@ public <C> Context extract(Context context, @Nullable C carrier, TextMapGetter<C
return context;
}

SpanContext spanContext = getSpanContextFromHeader(carrier, getter);
if (!spanContext.isValid()) {
return context;
}

return context.with(Span.wrap(spanContext));
return getContextFromHeader(context, carrier, getter);
}

private static <C> SpanContext getSpanContextFromHeader(
@Nullable C carrier, TextMapGetter<C> getter) {
private static <C> Context getContextFromHeader(
Context context, @Nullable C carrier, TextMapGetter<C> getter) {
String traceHeader = getter.get(carrier, TRACE_HEADER_KEY);
if (traceHeader == null || traceHeader.isEmpty()) {
return SpanContext.getInvalid();
return context;
}

String traceId = TraceId.getInvalid();
String spanId = SpanId.getInvalid();
Boolean isSampled = false;

BaggageBuilder baggage = null;
int baggageReadBytes = 0;

int pos = 0;
while (pos < traceHeader.length()) {
int delimiterIndex = traceHeader.indexOf(TRACE_HEADER_DELIMITER, pos);
Expand All @@ -165,11 +197,8 @@ private static <C> SpanContext getSpanContextFromHeader(
String trimmedPart = part.trim();
int equalsIndex = trimmedPart.indexOf(KV_DELIMITER);
if (equalsIndex < 0) {
logger.fine(
"Error parsing X-Ray trace header. Invalid key value pair: "
+ part
+ " Returning INVALID span context.");
return SpanContext.getInvalid();
logger.fine("Error parsing X-Ray trace header. Invalid key value pair: " + part);
return context;
}

String value = trimmedPart.substring(equalsIndex + 1);
Expand All @@ -180,24 +209,36 @@ private static <C> SpanContext getSpanContextFromHeader(
spanId = parseSpanId(value);
} else if (trimmedPart.startsWith(SAMPLED_FLAG_KEY)) {
isSampled = parseTraceFlag(value);
} else if (baggageReadBytes + trimmedPart.length() <= 256) {
if (baggage == null) {
baggage = Baggage.builder();
}
baggage.put(trimmedPart.substring(0, equalsIndex), value);
baggageReadBytes += trimmedPart.length();
}
// TODO: Put the arbitrary TraceHeader keys in OT trace state
}
if (isSampled == null) {
logger.fine(
"Invalid Sampling flag in X-Ray trace header: '"
+ TRACE_HEADER_KEY
+ "' with value "
+ traceHeader
+ "'. Returning INVALID span context.");
return SpanContext.getInvalid();
+ "'.");
return context;
}

return SpanContext.createFromRemoteParent(
StringUtils.padLeft(traceId, TraceId.getLength()),
spanId,
isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(),
TraceState.getDefault());
SpanContext spanContext =
SpanContext.createFromRemoteParent(
StringUtils.padLeft(traceId, TraceId.getLength()),
spanId,
isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(),
TraceState.getDefault());

context = context.with(Span.wrap(spanContext));
if (baggage != null) {
context = context.with(baggage.build());
}
return context;
}

private static String parseTraceId(String xrayTraceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.extension.aws.AwsXrayPropagator.TRACE_HEADER_KEY;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
Expand All @@ -18,6 +19,8 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -74,6 +77,63 @@ void inject_NotSampledContext() {
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0");
}

@Test
void inject_WithBaggage() {
Map<String, String> carrier = new LinkedHashMap<>();
xrayPropagator.inject(
withSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()),
Context.current())
.with(
Baggage.builder()
.put("cat", "meow")
.put("dog", "bark")
.put("Root", "ignored")
.put("Parent", "ignored")
.put("Sampled", "ignored")
.build()),
carrier,
setter);

assertThat(carrier)
.containsEntry(
TRACE_HEADER_KEY,
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0;"
+ "cat=meow;dog=bark");
}

@Test
void inject_WithBaggage_LimitTruncates() {
Map<String, String> carrier = new LinkedHashMap<>();
// Limit is 256 characters for all baggage. We add a 254-character key/value pair and a
// 3 character key value pair.
String key1 = Stream.generate(() -> "a").limit(252).collect(Collectors.joining());
String value1 = "a"; // 252 + 1 (=) + 1 = 254

String key2 = "b";
String value2 = "b"; // 1 + 1 (=) + 1 = 3

Baggage baggage = Baggage.builder().put(key1, value1).put(key2, value2).build();

xrayPropagator.inject(
withSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()),
Context.current())
.with(baggage),
carrier,
setter);

assertThat(carrier)
.containsEntry(
TRACE_HEADER_KEY,
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0;"
+ key1
+ '='
+ value1);
}

@Test
void inject_WithTraceState() {
Map<String, String> carrier = new LinkedHashMap<>();
Expand All @@ -88,7 +148,8 @@ void inject_WithTraceState() {
carrier,
setter);

// TODO: assert trace state when the propagator supports it
// TODO: assert trace state when the propagator supports it, for general key/value pairs we are
// mapping with baggage.
assertThat(carrier)
.containsEntry(
TRACE_HEADER_KEY,
Expand Down Expand Up @@ -168,11 +229,43 @@ void extract_AdditionalFields() {
TRACE_HEADER_KEY,
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=1;Foo=Bar");

// TODO: assert additional fields when the propagator supports it
assertThat(getSpanContext(xrayPropagator.extract(Context.current(), carrier, getter)))
Context context = xrayPropagator.extract(Context.current(), carrier, getter);
assertThat(getSpanContext(context))
.isEqualTo(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()));
assertThat(Baggage.fromContext(context).getEntryValue("Foo")).isEqualTo("Bar");
}

@Test
void extract_Baggage_LimitTruncates() {
// Limit is 256 characters for all baggage. We add a 254-character key/value pair and a
// 3 character key value pair.
String key1 = Stream.generate(() -> "a").limit(252).collect(Collectors.joining());
String value1 = "a"; // 252 + 1 (=) + 1 = 254

String key2 = "b";
String value2 = "b"; // 1 + 1 (=) + 1 = 3

Map<String, String> carrier = new LinkedHashMap<>();
carrier.put(
TRACE_HEADER_KEY,
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=1;"
+ key1
+ '='
+ value1
+ ';'
+ key2
+ '='
+ value2);

Context context = xrayPropagator.extract(Context.current(), carrier, getter);
assertThat(getSpanContext(context))
.isEqualTo(
SpanContext.createFromRemoteParent(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()));
assertThat(Baggage.fromContext(context).getEntryValue(key1)).isEqualTo(value1);
assertThat(Baggage.fromContext(context).getEntryValue(key2)).isNull();
}

@Test
Expand Down

0 comments on commit 768bde8

Please sign in to comment.