Skip to content

Commit

Permalink
Polish support for new "requeued" count metric
Browse files Browse the repository at this point in the history
Ensure backward compatibility, use "requeued" name, update copyright
year, add test.

(cherry picked from commit 28c7d51)

Conflicts:
	src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
  • Loading branch information
acogoluegnes committed Nov 12, 2024
1 parent 0bfd4a6 commit 9a1d6aa
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 31 deletions.
14 changes: 11 additions & 3 deletions src/main/java/com/rabbitmq/client/MetricsCollector.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -58,9 +58,17 @@ default void basicPublishUnrouted(Channel channel) {

void basicAck(Channel channel, long deliveryTag, boolean multiple);

void basicNack(Channel channel, long deliveryTag, boolean requeue);
void basicNack(Channel channel, long deliveryTag);

void basicReject(Channel channel, long deliveryTag, boolean requeue);
default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
this.basicNack(channel, deliveryTag);
}

void basicReject(Channel channel, long deliveryTag);

default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
this.basicReject(channel, deliveryTag);
}

void basicConsume(Channel channel, String consumerTag, boolean autoAck);

Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -45,11 +45,21 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {

}

@Override
public void basicNack(Channel channel, long deliveryTag) {

}

@Override
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {

}

@Override
public void basicReject(Channel channel, long deliveryTag) {

}

@Override
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
* Base class for {@link MetricsCollector}.
Expand All @@ -42,7 +43,13 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {

private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();

private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);
private final Function<Boolean, Runnable> markRejectedMessageAction;

public AbstractMetricsCollector() {
Runnable rejectRequeue = () -> markRejectedMessage(true);
Runnable rejectNoRequeue = () -> markRejectedMessage(false);
this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
}

@Override
public void newConnection(final Connection connection) {
Expand Down Expand Up @@ -225,6 +232,11 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
}
}

@Override
public void basicNack(Channel channel, long deliveryTag) {
// replaced by #basicNack(Channel, long, boolean)
}

@Override
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
try {
Expand All @@ -234,6 +246,11 @@ public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
}
}

@Override
public void basicReject(Channel channel, long deliveryTag) {
// replaced by #basicReject(Channel, long, boolean)
}

@Override
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
try {
Expand Down Expand Up @@ -392,10 +409,19 @@ private ChannelState(Channel channel) {
*/
protected abstract void markAcknowledgedMessage();

/**
* Marks the event of a rejected message.
*
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
*/
protected abstract void markRejectedMessage();

/**
* Marks the event of a rejected message.
*/
protected abstract void markRejectedMessage(boolean requeue);
protected void markRejectedMessage(boolean requeue) {
this.markRejectedMessage();
}

/**
* Marks the event of a message publishing acknowledgement.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {

private final Counter rejectedMessages;

private final Counter requeuedPublishedMessages;
private final Counter requeuedMessages;

public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq");
Expand Down Expand Up @@ -92,7 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
}

@Override
Expand Down Expand Up @@ -135,10 +135,16 @@ protected void markAcknowledgedMessage() {
acknowledgedMessages.increment();
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedPublishedMessages.increment();
requeuedMessages.increment();
}
rejectedMessages.increment();
}
Expand Down Expand Up @@ -198,6 +204,10 @@ public Counter getRejectedMessages() {
return rejectedMessages;
}

public Counter getRequeuedMessages() {
return requeuedMessages;
}

public enum Metrics {
CONNECTIONS {
@Override
Expand Down Expand Up @@ -235,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".rejected", tags);
}
},
REQUEUED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".requeued", tags);
}
},
FAILED_TO_PUBLISH_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
Expand All @@ -258,12 +274,6 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".unrouted_published", tags);
}
},
REQUEUED_PUBLISHED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".requeued_published", tags);
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -46,7 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
private final LongCounter ackedPublishedMessagesCounter;
private final LongCounter nackedPublishedMessagesCounter;
private final LongCounter unroutedPublishedMessagesCounter;
private final LongCounter requeuedPublishedMessagesCounter;
private final LongCounter requeuedMessagesCounter;

public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
this(openTelemetry, "rabbitmq");
Expand Down Expand Up @@ -101,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
.setDescription("The number of messages rejected from the RabbitMQ server")
.build();

// requeuedPublishedMessages
this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
.setUnit("{messages}")
.setDescription("The number of re-queued messages to the RabbitMQ server")
.build();

// failedToPublishMessages
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
.setUnit("{messages}")
Expand All @@ -124,12 +130,6 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
.setUnit("{messages}")
.setDescription("The number of un-routed published messages to the RabbitMQ server")
.build();

// requeuedPublishedMessages
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
.setUnit("{messages}")
.setDescription("The number of re-queued published messages to the RabbitMQ server")
.build();
}

@Override
Expand Down Expand Up @@ -172,10 +172,16 @@ protected void markAcknowledgedMessage() {
acknowledgedMessagesCounter.add(1L, attributes);
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedPublishedMessagesCounter.add(1L, attributes);
requeuedMessagesCounter.add(1L, attributes);
}
rejectedMessagesCounter.add(1L, attributes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -41,12 +41,11 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
private final Meter consumedMessages;
private final Meter acknowledgedMessages;
private final Meter rejectedMessages;
private final Meter requeuedMessages;
private final Meter failedToPublishMessages;
private final Meter publishAcknowledgedMessages;
private final Meter publishNacknowledgedMessages;
private final Meter publishUnroutedMessages;
private final Meter requeuedPublishedMessages;


public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.registry = registry;
Expand All @@ -60,7 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
}

public StandardMetricsCollector() {
Expand Down Expand Up @@ -111,8 +110,17 @@ protected void markAcknowledgedMessage() {
acknowledgedMessages.mark();
}

@Override
@SuppressWarnings("deprecation")
protected void markRejectedMessage() {

}

@Override
protected void markRejectedMessage(boolean requeue) {
if (requeue) {
requeuedMessages.mark();
}
rejectedMessages.mark();
}

Expand Down Expand Up @@ -159,6 +167,10 @@ public Meter getRejectedMessages() {
return rejectedMessages;
}

public Meter getRequeuedMessages() {
return this.requeuedMessages;
}

public Meter getFailedToPublishMessages() {
return failedToPublishMessages;
}
Expand Down
Loading

0 comments on commit 9a1d6aa

Please sign in to comment.