Skip to content

Commit

Permalink
[improve][broker] Reduce number of OpenTelemetry consumer attributes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored Jun 4, 2024
1 parent bb95b85 commit 8276f21
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
Expand All @@ -35,6 +36,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.Getter;
Expand Down Expand Up @@ -69,6 +71,7 @@
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -158,6 +161,10 @@ public class Consumer {
@Getter
private final Instant connectedSince = Instant.now();

// OpenTelemetry attribute set for this consumer. The constructors shown here reset it to null;
// getOpenTelemetryAttributes() builds it on first request and returns the stored value afterwards.
private volatile Attributes openTelemetryAttributes;
// Updater used by getOpenTelemetryAttributes() to install the attribute set via updateAndGet.
// The string argument must match the name of the volatile field declared above.
private static final AtomicReferenceFieldUpdater<Consumer, Attributes> OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Consumer.class, Attributes.class, "openTelemetryAttributes");

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Expand Down Expand Up @@ -231,6 +238,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();

this.schemaType = schemaType;

OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
}

@VisibleForTesting
Expand Down Expand Up @@ -263,6 +272,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
this.schemaType = null;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
}

public SubType subType() {
Expand Down Expand Up @@ -1203,4 +1213,30 @@ private int getStickyKeyHash(Entry entry) {
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);

/**
 * Returns the OpenTelemetry attributes that identify this consumer.
 *
 * <p>The attribute set is built the first time it is requested and stored in
 * {@code openTelemetryAttributes}; subsequent calls return the stored instance.
 *
 * @return the consumer name and id, subscription name and type, and topic domain, tenant,
 *         namespace and name, plus the partition index when the topic is partitioned
 */
public Attributes getOpenTelemetryAttributes() {
    var cached = openTelemetryAttributes;
    if (cached == null) {
        // The update function re-checks the current value, so an instance that was installed
        // in the meantime is kept instead of being replaced.
        cached = OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
                current -> current != null ? current : buildOpenTelemetryAttributes());
    }
    return cached;
}

/** Assembles the consumer, subscription and topic attributes from this consumer's fields. */
private Attributes buildOpenTelemetryAttributes() {
    var parsedTopic = TopicName.get(subscription.getTopic().getName());

    var attrs = Attributes.builder();
    attrs.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumerName);
    attrs.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumerId);
    attrs.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName());
    attrs.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, subType.toString());
    attrs.put(OpenTelemetryAttributes.PULSAR_DOMAIN, parsedTopic.getDomain().toString());
    attrs.put(OpenTelemetryAttributes.PULSAR_TENANT, parsedTopic.getTenant());
    attrs.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, parsedTopic.getNamespace());
    attrs.put(OpenTelemetryAttributes.PULSAR_TOPIC, parsedTopic.getPartitionedTopicName());
    // The partition index is only emitted for partitioned topics.
    if (parsedTopic.isPartitioned()) {
        attrs.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, parsedTopic.getPartitionIndex());
    }
    return attrs.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Collection;
Expand All @@ -27,8 +26,6 @@
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;

public class OpenTelemetryConsumerStats implements AutoCloseable {

Expand All @@ -52,6 +49,9 @@ public class OpenTelemetryConsumerStats implements AutoCloseable {
public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count";
private final ObservableLongMeasurement messageUnacknowledgedCounter;

public static final String CONSUMER_BLOCKED_COUNTER = "pulsar.broker.consumer.blocked";
private final ObservableLongMeasurement consumerBlockedCounter;

// Replaces pulsar_consumer_available_permits
public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count";
private final ObservableLongMeasurement messagePermitsCounter;
Expand Down Expand Up @@ -91,6 +91,12 @@ public OpenTelemetryConsumerStats(PulsarService pulsar) {
.setDescription("The total number of messages unacknowledged by this consumer.")
.buildObserver();

consumerBlockedCounter = meter
.upDownCounterBuilder(CONSUMER_BLOCKED_COUNTER)
.setUnit("1")
.setDescription("Indicates whether the consumer is currently blocked due to unacknowledged messages.")
.buildObserver();

messagePermitsCounter = meter
.upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
.setUnit("{permit}")
Expand All @@ -114,6 +120,7 @@ public OpenTelemetryConsumerStats(PulsarService pulsar) {
messageAckCounter,
messageRedeliverCounter,
messageUnacknowledgedCounter,
consumerBlockedCounter,
messagePermitsCounter);
}

Expand All @@ -123,48 +130,13 @@ public void close() {
}

/**
 * Records the current values of one consumer (messages out, bytes out, acknowledgments,
 * redeliveries, unacknowledged messages, blocked state and available permits) on the
 * observable measurements, tagged with that consumer's attributes.
 *
 * @param consumer the consumer whose counters are read
 */
private void recordMetricsForConsumer(Consumer consumer) {
// NOTE(review): this listing comes from a rendered commit diff, so lines removed and lines added
// by the commit appear together without markers: `attributes` is declared twice and the
// unacknowledged counter is recorded twice. Presumably only the
// consumer.getOpenTelemetryAttributes() path remains after the commit - confirm against the
// repository before relying on this text.
var subscription = consumer.getSubscription();
var topicName = TopicName.get(subscription.getTopic().getName());

var builder = Attributes.builder()
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
consumer.getConnectedSince().getEpochSecond())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString())
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
if (topicName.isPartitioned()) {
builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
}
var clientAddress = consumer.getClientAddressAndPort();
if (clientAddress != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress);
}
var clientVersion = consumer.getClientVersion();
if (clientVersion != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion);
}
var metadataList = consumer.getMetadata()
.entrySet()
.stream()
.map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
.toList();
builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList);
var attributes = builder.build();

// Attribute set obtained from the consumer itself rather than assembled in this method.
var attributes = consumer.getOpenTelemetryAttributes();
messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes);
messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked())
.build());
messageUnacknowledgedCounter.record(consumer.getUnackedMessages(), attributes);
// The blocked state is recorded as its own measurement: 1 when blocked, 0 otherwise.
consumerBlockedCounter.record(consumer.isBlocked() ? 1 : 0, attributes);
messagePermitsCounter.record(consumer.getAvailablePermits(), attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,25 @@

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import io.opentelemetry.api.common.Attributes;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class OpenTelemetryConsumerStatsTest extends BrokerTestBase {

private BrokerInterceptor brokerInterceptor;

/**
 * Per-test setup: creates a {@code BrokerInterceptor} mock whose default answer calls the real
 * methods, then delegates to {@code super.baseSetup()}.
 */
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
// NOTE(review): this listing comes from a rendered commit diff; the interceptor mock may be
// one of the lines the commit removes - confirm against the repository.
brokerInterceptor =
Mockito.mock(BrokerInterceptor.class, Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
super.baseSetup();
}

Expand All @@ -64,7 +52,6 @@ protected void cleanup() throws Exception {
// Applies the parent customization, then enables OpenTelemetry on the test context builder and
// registers the broker interceptor held by this test.
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
super.customizeMainPulsarTestContextBuilder(builder);
builder.enableOpenTelemetry(true);
builder.brokerInterceptor(brokerInterceptor);
}

@Test(timeOut = 30_000)
Expand All @@ -78,14 +65,6 @@ public void testMessagingMetrics() throws Exception {
var subscriptionName = BrokerTestUtil.newUniqueName("test");
var receiverQueueSize = 100;

// Intercept calls to create consumer, in order to fetch client information.
var consumerRef = new AtomicReference<Consumer>();
doAnswer(invocation -> {
consumerRef.compareAndSet(null, invocation.getArgument(1));
return null;
}).when(brokerInterceptor)
.consumerCreated(any(), argThat(arg -> arg.getSubscription().getName().equals(subscriptionName)), any());

@Cleanup
var consumer = pulsarClient.newConsumer()
.topic(topicName)
Expand All @@ -94,12 +73,8 @@ public void testMessagingMetrics() throws Exception {
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.receiverQueueSize(receiverQueueSize)
.property("prop1", "value1")
.subscribe();

Awaitility.await().until(() -> consumerRef.get() != null);
var serverConsumer = consumerRef.get();

@Cleanup
var producer = pulsarClient.newProducer()
.topic(topicName)
Expand All @@ -121,11 +96,6 @@ public void testMessagingMetrics() throws Exception {
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared.toString())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.getConsumerName())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0)
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
serverConsumer.getConnectedSince().getEpochSecond())
.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, serverConsumer.getClientAddressAndPort())
.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, serverConsumer.getClientVersion())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, List.of("prop1:value1"))
.build();

Awaitility.await().untilAsserted(() -> {
Expand All @@ -141,9 +111,9 @@ public void testMessagingMetrics() throws Exception {
actual -> assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount - ackCount));

var unAckCount = messageCount - ackCount;
assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER,
attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, false).build(),
assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, attributes,
unAckCount);
assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.CONSUMER_BLOCKED_COUNTER, attributes, 0);
assertMetricLongSumValue(metrics, OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes,
actual -> assertThat(actual).isGreaterThanOrEqualTo(unAckCount));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ public interface OpenTelemetryAttributes {
*/
AttributeKey<Long> PULSAR_CONSUMER_ID = AttributeKey.longKey("pulsar.consumer.id");

/**
* Indicates whether the consumer is currently blocked on unacknowledged messages or not.
*/
AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED = AttributeKey.booleanKey("pulsar.consumer.blocked");

/**
* The consumer metadata properties, as a list of "key:value" pairs.
*/
Expand Down

0 comments on commit 8276f21

Please sign in to comment.