Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing support for Service Bus processor #17684

Merged
merged 4 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,10 @@ public final class ServiceBusSessionProcessorClientBuilder {

private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions();
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
sessionReceiverClientBuilder.maxConcurrentSessions(1);
processorClientOptions.setMaxConcurrentCalls(1);
}

/**
Expand Down Expand Up @@ -1100,7 +1101,9 @@ public final class ServiceBusProcessorClientBuilder {

private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,38 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;

/**
* The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient
* ServiceBusProcessorClients} provides a push-based mechanism that invokes the message processing callback when a
Expand Down Expand Up @@ -44,6 +62,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final AtomicReference<Subscription> receiverSubscription = new AtomicReference<>();
private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
private final AtomicBoolean isRunning = new AtomicBoolean();
private final TracerProvider tracerProvider;
private ScheduledExecutorService scheduledExecutor;

/**
Expand All @@ -65,6 +84,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
this.receiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
}

/**
Expand All @@ -84,6 +104,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
this.asyncClient.set(receiverBuilder.buildAsyncClient());
this.sessionReceiverBuilder = null;
this.tracerProvider = processorOptions.getTracerProvider();
}

/**
Expand Down Expand Up @@ -164,12 +185,22 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
Context processSpanContext = null;
try {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext =
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

processSpanContext =
startProcessTracingSpan(serviceBusMessageContext.getMessage(),
receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) {
serviceBusMessageContext.getMessage().addContext(SPAN_CONTEXT_KEY, processSpanContext);
}
processMessage.accept(serviceBusReceivedMessageContext);
endProcessTracingSpan(processSpanContext, Signal.complete());
} catch (Exception ex) {
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
endProcessTracingSpan(processSpanContext, Signal.error(ex));
if (!processorOptions.isDisableAutoComplete()) {
logger.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
Expand Down Expand Up @@ -201,6 +232,54 @@ public void onComplete() {
});
}

private void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal) {
if (processSpanContext == null) {
return;
}

Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
// Disposes of the scope when the trace span closes.
if (!spanScope.isPresent() || !tracerProvider.isEnabled()) {
return;
}
if (spanScope.get() instanceof Closeable) {
Closeable close = (Closeable) processSpanContext.getData(SCOPE_KEY).get();
try {
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error("endTracingSpan().close() failed with an error %s", ioException);
}

} else {
logger.warning(String.format(Locale.US,
"Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span",
spanScope.get() != null ? spanScope.getClass() : "null"));
}
}

private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath,
srnagar marked this conversation as resolved.
Show resolved Hide resolved
String fullyQualifiedNamespace) {

Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE);

spanContext = spanContext
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, fullyQualifiedNamespace)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
srnagar marked this conversation as resolved.
Show resolved Hide resolved
spanContext = receivedMessage.getEnqueuedTime() == null
? spanContext
: spanContext.addData(MESSAGE_ENQUEUED_TIME,
receivedMessage.getEnqueuedTime().toInstant().getEpochSecond());

return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, ProcessKind.PROCESS);
}

private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,
ServiceBusReceiverAsyncClient receiverClient) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.amqp.models.AmqpMessageBodyType;
import com.azure.core.amqp.models.AmqpMessageId;
import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

Expand Down Expand Up @@ -40,6 +41,7 @@ public final class ServiceBusReceivedMessage {
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
private UUID lockToken;
private boolean isSettled = false;
private Context context;

ServiceBusReceivedMessage(BinaryData body) {
Objects.requireNonNull(body, "'body' cannot be null.");
Expand Down Expand Up @@ -438,6 +440,22 @@ public String getTo() {
return to;
}

/**
* Adds a new key value pair to the existing context on Message.
*
* @param key The key for this context object
* @param value The value for this context object.
*
* @return The updated {@link ServiceBusMessage}.
* @throws NullPointerException if {@code key} or {@code value} is null.
*/
ServiceBusReceivedMessage addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
return this;
}

/**
* Gets whether the message has been settled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.servicebus.implementation.models;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.Fluent;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;

Expand All @@ -15,6 +16,8 @@ public final class ServiceBusProcessorClientOptions {
private int maxConcurrentCalls = 1;
private boolean disableAutoComplete;

private TracerProvider tracerProvider;

/**
* Returns true if the auto-complete and auto-abandon feature is disabled.
* @return true if the auto-complete and auto-abandon feature is disabled.
Expand Down Expand Up @@ -50,4 +53,24 @@ public ServiceBusProcessorClientOptions setMaxConcurrentCalls(int maxConcurrentC
this.maxConcurrentCalls = maxConcurrentCalls;
return this;
}

/**
* Returns the {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
*
* @return The {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}.
*/
public TracerProvider getTracerProvider() {
return tracerProvider;
}

/**
* Sets the {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
*
* @param tracerProvider The {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}.
* @return The updated instance of {@link ServiceBusProcessorClientOptions}.
*/
public ServiceBusProcessorClientOptions setTracerProvider(TracerProvider tracerProvider) {
this.tracerProvider = tracerProvider;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -319,6 +332,66 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru
verify(asyncClient, never()).abandon(any(ServiceBusReceivedMessage.class));
}

@Test
public void testProcessorWithTracingEnabled() throws InterruptedException {
final Tracer tracer = mock(Tracer.class);
final List<Tracer> tracers = Collections.singletonList(tracer);
TracerProvider tracerProvider = new TracerProvider(tracers);

String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01";

when(tracer.extractContext(eq(diagnosticId), any())).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
return passed.addData(SPAN_CONTEXT_KEY, "value");
}
);
when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent());
return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (Closeable) () -> {
return;
}).addData(PARENT_SPAN_KEY, "value2");
}
);
Flux<ServiceBusMessageContext> messageFlux =
Flux.create(emitter -> {
for (int i = 0; i < 5; i++) {
ServiceBusReceivedMessage serviceBusReceivedMessage =
new ServiceBusReceivedMessage(BinaryData.fromString("hello"));
serviceBusReceivedMessage.setMessageId(String.valueOf(i));
serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now());
serviceBusReceivedMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, diagnosticId);
ServiceBusMessageContext serviceBusMessageContext =
new ServiceBusMessageContext(serviceBusReceivedMessage);
emitter.next(serviceBusMessageContext);
}
});

ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);

AtomicInteger messageId = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(5);
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
messageContext -> {
assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId());
countDownLatch.countDown();
},
error -> Assertions.fail("Error occurred when receiving messages from the processor"),
new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider));

serviceBusProcessorClient.start();
boolean success = countDownLatch.await(5, TimeUnit.SECONDS);
serviceBusProcessorClient.close();

assertTrue(success, "Failed to receive all expected messages");
verify(tracer, times(5)).extractContext(eq(diagnosticId), any());
verify(tracer, times(5)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS));
verify(tracer, times(5)).end(eq("success"), isNull(), any());

}

private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder(
Flux<ServiceBusMessageContext> messageFlux) {

Expand Down