Skip to content

Commit

Permalink
ARTEMIS-5280 metrics manager doesn't inspect temp queue namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and brusdev committed Feb 13, 2025
1 parent 4e5fd4a commit e0c63ca
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3297,7 +3297,7 @@ synchronized boolean initialisePart1(boolean scalingDown) throws Exception {
* are not required to be included in the OSGi bundle and the Micrometer jars apparently don't support OSGi.
*/
if (configuration.getMetricsConfiguration() != null && configuration.getMetricsConfiguration().getPlugin() != null) {
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository, securityStore);
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository, securityStore, temp -> getRuntimeTempQueueNamespace(temp));
}

postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void registerQueueMeters(final Queue queue) {
if (messagingServer != null) { // messagingServer could be null on certain unit tests where metrics are not relevant
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), queue.isTemporary(), (builder) -> {
builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics -> (double) queue.getMessageCount(), QueueControl.MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue, metrics -> (double) queue.getDurableMessageCount(), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION, Collections.emptyList());
builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics -> (double) queue.getPersistentSize(), QueueControl.PERSISTENT_SIZE_DESCRIPTION, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

import io.micrometer.core.instrument.Gauge;
Expand Down Expand Up @@ -66,14 +67,19 @@ public class MetricsManager {

private final HierarchicalRepository<AddressSettings> addressSettingsRepository;

// a function used to calculate the name of the queue when looking up address settings
private final Function<Boolean, String> queueNameFunction;

public MetricsManager(String brokerName,
MetricsConfiguration metricsConfiguration,
HierarchicalRepository<AddressSettings> addressSettingsRepository,
SecurityStore securityStore) {
SecurityStore securityStore,
Function<Boolean, String> queueNameFunction) {
this.brokerName = brokerName;
this.meterRegistry = metricsConfiguration.getPlugin().getRegistry();
this.addressSettingsRepository = addressSettingsRepository;
this.commonTags = Tags.of(BROKER_TAG_NAME, brokerName);
this.queueNameFunction = queueNameFunction;
if (meterRegistry != null) {
Metrics.globalRegistry.add(meterRegistry);
if (metricsConfiguration.isJvmMemory()) {
Expand Down Expand Up @@ -117,8 +123,8 @@ public interface MetricGaugeBuilder {
void build(String metricName, Object state, ToDoubleFunction<Object> f, String description, List<Tag> tags);
}

public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
public void registerQueueGauge(String address, String queue, boolean temporary, Consumer<MetricGaugeBuilder> builder) {
if (this.meterRegistry == null || !addressSettingsRepository.getMatch(queueNameFunction.apply(temporary) + queue).isEnableMetrics()) {
return;
}
final List<Builder<Object>> gaugeBuilders = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.metrics;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class MetricsManagerTest {

@Test
public void testQueueMetricsEnabled() throws Exception {
testQueueMetrics(true);
}

@Test
public void testQueueMetricsDisabled() throws Exception {
testQueueMetrics(false);
}

public void testQueueMetrics(boolean enableMetrics) throws Exception {
final String tempQueueNamespace = "temp.";
final long latchTimeout = 100;

HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>(new WildcardConfiguration());

// configure metrics for temp queues
addressSettingsRepository.addMatch(tempQueueNamespace + "#", new AddressSettings().setEnableMetrics(enableMetrics));

// configure metrics for normal queues
addressSettingsRepository.addMatch("#", new AddressSettings().setEnableMetrics(enableMetrics));

MetricsConfiguration metricsConfiguration = new MetricsConfiguration();
metricsConfiguration.setPlugin(new SimpleMetricsPlugin().init(null));
MetricsManager metricsManager = new MetricsManager(RandomUtil.randomUUIDString(), metricsConfiguration, addressSettingsRepository, null, temp -> {
// this is a simplified version of org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl.getRuntimeTempQueueNamespace
if (temp) {
return tempQueueNamespace;
} else {
return "";
}
});

// test temp queue
AtomicBoolean tempTest = new AtomicBoolean(false);
metricsManager.registerQueueGauge(RandomUtil.randomUUIDString(), RandomUtil.randomUUIDString(), true, (builder) -> {
tempTest.set(true);
});
assertEquals(enableMetrics, tempTest.get());

// test normal queue
AtomicBoolean test = new AtomicBoolean(false);
metricsManager.registerQueueGauge(RandomUtil.randomUUIDString(), RandomUtil.randomUUIDString(), false, (builder) -> {
test.set(true);
});
assertEquals(enableMetrics, test.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ public class MetricsPluginTest extends ActiveMQTestBase {
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
private static final String TEMP_QUEUE_NAMESPACE = "temp";

protected void configureServer(ActiveMQServer server) {
server.getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setPlugin(new SimpleMetricsPlugin().init(null)));
server.getConfiguration()
.setTemporaryQueueNamespace(TEMP_QUEUE_NAMESPACE)
.setMetricsConfiguration(new MetricsConfiguration().setPlugin(new SimpleMetricsPlugin().init(null)));
}

@Override
Expand Down Expand Up @@ -175,23 +178,33 @@ public int hashCode() {
}

@Test
public void testForBasicMetricsPresenceAndValue() throws Exception {
internalTestForBasicMetrics(true);
public void testForBasicQueueMetricsPresenceAndValue() throws Exception {
internalTestForBasicQueueMetrics(true, false);
}

@Test
public void testDisablingMetrics() throws Exception {
internalTestForBasicMetrics(false);
public void testDisablingQueueMetrics() throws Exception {
internalTestForBasicQueueMetrics(false, false);
}

private void internalTestForBasicMetrics(boolean enabled) throws Exception {
@Test
public void testForBasicTempQueueMetricsPresenceAndValue() throws Exception {
internalTestForBasicQueueMetrics(true, true);
}

@Test
public void testDisablingTempQueueMetrics() throws Exception {
internalTestForBasicQueueMetrics(false, true);
}

private void internalTestForBasicQueueMetrics(boolean enabled, boolean temp) throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";

server.getAddressSettingsRepository().getMatch(addressName).setEnableMetrics(enabled);
server.getAddressSettingsRepository().getMatch(temp ? TEMP_QUEUE_NAMESPACE + "." + addressName : addressName).setEnableMetrics(enabled);

session.createQueue(QueueConfiguration.of(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
session.createQueue(QueueConfiguration.of(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST).setTemporary(temp).setDurable(!temp));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(data);
Expand All @@ -206,7 +219,9 @@ private void internalTestForBasicMetrics(boolean enabled) throws Exception {
checkMetric(metrics, "artemis.message.count", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0, enabled);
if (!temp) {
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0, enabled);
}
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0, enabled);
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0, enabled);
Expand Down

0 comments on commit e0c63ca

Please sign in to comment.