diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c69c6e2418f7c..2b4032490e65e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -112,6 +112,7 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; import org.apache.pulsar.broker.validator.MultipleListenerValidator; import org.apache.pulsar.broker.validator.TransactionBatchedWriteValidator; import org.apache.pulsar.broker.web.WebService; @@ -168,6 +169,7 @@ import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; @@ -822,6 +824,9 @@ public void start() throws PulsarServerException { // Register pulsar system namespaces and start transaction meta store service if (config.isTransactionCoordinatorEnabled()) { + MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress()); + MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress()); + this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient()); this.transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index 3ff4ed8e76808..8b1e0a4e94123 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -61,6 +61,7 @@ import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterMetricsStats; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; import org.slf4j.Logger; @@ -119,7 +120,7 @@ public class MLPendingAckStore implements PendingAckStore { public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag, TxnLogBufferedWriterConfig bufferedWriterConfig, - Timer timer) { + Timer timer, TxnLogBufferedWriterMetricsStats bufferedWriterMetrics) { this.managedLedger = managedLedger; this.cursor = cursor; this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition(); @@ -132,7 +133,8 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(), timer, PendingAckLogSerializer.INSTANCE, bufferedWriterConfig.getBatchedWriteMaxRecords(), bufferedWriterConfig.getBatchedWriteMaxSize(), - bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled()); + bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled(), + bufferedWriterMetrics); this.batchedPendingAckLogsWaitingForHandle = new ArrayList<>(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index fdff9f59146d4..bf2771abaa65a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl; import io.netty.util.Timer; +import io.prometheus.client.CollectorRegistry; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -34,7 +35,9 @@ import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterMetricsStats; /** @@ -43,6 +46,21 @@ @Slf4j public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProvider { + private static volatile TxnLogBufferedWriterMetricsStats bufferedWriterMetrics = + DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; + + public static void initBufferedWriterMetrics(String brokerAdvertisedAddress){ + if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){ + return; + } + synchronized (MLPendingAckStoreProvider.class){ + if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){ + return; + } + bufferedWriterMetrics = new MLTxnPendingAckLogBufferedWriterMetrics(brokerAdvertisedAddress); + } + } + @Override public CompletableFuture newPendingAckStore(PersistentSubscription subscription) { CompletableFuture pendingAckStoreFuture = new CompletableFuture<>(); @@ -105,7 +123,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { .getConfiguration() .getTransactionPendingAckLogIndexMinLag(), txnLogBufferedWriterConfig, - brokerClientSharedTimer)); + brokerClientSharedTimer, bufferedWriterMetrics)); if (log.isDebugEnabled()) { log.debug("{},{} open MLPendingAckStore cursor success", originPersistentTopic.getName(), @@ -151,4 +169,14 @@ public CompletableFuture checkInitializedBefore(PersistentSubscription return originPersistentTopic.getBrokerService().getManagedLedgerFactory() .asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding()); } + + private static class MLTxnPendingAckLogBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats{ + + private MLTxnPendingAckLogBufferedWriterMetrics(String brokerAdvertisedAddress) { + super("pulsar_txn_pending_ack_store", + new String[]{"broker"}, + new String[]{brokerAdvertisedAddress}, + CollectorRegistry.defaultRegistry); + } + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java index b448f00a8ff16..cd07b7f6e25b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.List; @@ -109,7 +110,7 @@ public void testTransactionTopic() throws Exception { managedLedgerConfig.setMaxEntriesPerLedger(2); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig, - transactionTimer); + transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar); metrics.generate(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java new file mode 100644 index 0000000000000..ceed05884df2e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.resources.ClusterResources; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.TenantResources; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test for consuming transaction messages. + */ +@Slf4j +@Test(groups = "broker") +public class TransactionBatchWriterMetricsTest extends MockedPulsarServiceBaseTest { + + private final String clusterName = "test"; + public static final NamespaceName DEFAULT_NAMESPACE = NamespaceName.get("public/default"); + private final String topicNameSuffix = "t-rest-topic"; + private final String topicName = DEFAULT_NAMESPACE.getPersistentTopicName(topicNameSuffix); + + @BeforeClass + public void setup() throws Exception { + super.internalSetup(); + } + + @AfterClass + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + // enable transaction. + conf.setSystemTopicEnabled(true); + conf.setTransactionCoordinatorEnabled(true); + // enabled batch writer. + conf.setTransactionPendingAckBatchedWriteEnabled(true); + conf.setTransactionPendingAckBatchedWriteMaxRecords(10); + conf.setTransactionLogBatchedWriteEnabled(true); + conf.setTransactionLogBatchedWriteMaxRecords(10); + } + + @Override + protected PulsarService startBroker(ServiceConfiguration conf) throws Exception { + PulsarService pulsar = startBrokerWithoutAuthorization(conf); + ensureClusterExists(pulsar, clusterName); + ensureTenantExists(pulsar.getPulsarResources().getTenantResources(), TopicName.PUBLIC_TENANT, clusterName); + ensureNamespaceExists(pulsar.getPulsarResources().getNamespaceResources(), DEFAULT_NAMESPACE, + clusterName); + ensureNamespaceExists(pulsar.getPulsarResources().getNamespaceResources(), NamespaceName.SYSTEM_NAMESPACE, + clusterName); + ensureTopicExists(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(), + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 16); + return pulsar; + } + + @Test + public void testTransactionMetaLogMetrics() throws Exception{ + String metricsLabelCluster = clusterName; + String metricsLabelBroker = pulsar.getAdvertisedAddress().split(":")[0]; + admin.topics().createNonPartitionedTopic(topicName); + + sendAndAckSomeMessages(); + + // call metrics + Client client = ClientBuilder.newClient(); + WebTarget target = client.target(brokerUrl + "/metrics/get"); + Response response = target.request(MediaType.APPLICATION_JSON_TYPE).buildGet().invoke(); + Assert.assertTrue(response.getStatus() / 200 == 1); + List metricsLines = parseResponseEntityToList(response); + + metricsLines = metricsLines.stream().filter(s -> !s.startsWith("#") && s.contains("bufferedwriter")).collect( + Collectors.toList()); + + // verify tc. + String metrics_key_txn_tc_record_count_sum = + "pulsar_txn_tc_bufferedwriter_batch_record_count_sum{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_tc_record_count_sum, metricsLabelCluster, metricsLabelBroker)) + > 0); + String metrics_key_txn_tc_max_delay = + "pulsar_txn_tc_bufferedwriter_flush_trigger_max_delay_total{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_tc_max_delay, metricsLabelCluster, metricsLabelBroker)) + > 0); + String metrics_key_txn_tc_bytes_size = + "pulsar_txn_tc_bufferedwriter_batch_size_bytes_sum{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_tc_bytes_size, metricsLabelCluster, metricsLabelBroker)) + > 0); + // verify pending ack. + String metrics_key_txn_pending_ack_record_count_sum = + "pulsar_txn_pending_ack_store_bufferedwriter_batch_record_count_sum{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_pending_ack_record_count_sum, metricsLabelCluster, metricsLabelBroker)) + > 0); + String metrics_key_txn_pending_ack_max_delay = + "pulsar_txn_pending_ack_store_bufferedwriter_flush_trigger_max_delay_total{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_pending_ack_max_delay, metricsLabelCluster, metricsLabelBroker)) + > 0); + String metrics_key_txn_pending_ack_bytes_size = + "pulsar_txn_pending_ack_store_bufferedwriter_batch_size_bytes_sum{cluster=\"%s\",broker=\"%s\"} "; + Assert.assertTrue(searchMetricsValue(metricsLines, + String.format(metrics_key_txn_pending_ack_bytes_size, metricsLabelCluster, metricsLabelBroker)) + > 0); + + // cleanup. + response.close(); + client.close(); + admin.topics().delete(topicName, true); + } + + private static Double searchMetricsValue(List metricsLines, String key){ + for (int i = 0; i < metricsLines.size(); i++){ + String metricsLine = metricsLines.get(i); + if (metricsLine.startsWith("#")){ + continue; + } + if (metricsLine.startsWith(key)){ + return Double.valueOf(metricsLine.split(" ")[1]); + } + } + return null; + } + + private void sendAndAckSomeMessages() throws Exception { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .batchingMaxMessages(2) + .create(); + Consumer consumer = pulsarClient.newConsumer() + .subscriptionType(SubscriptionType.Shared) + .topic(topicName) + .isAckReceiptEnabled(true) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionName("my-subscription") + .subscribe(); + producer.sendAsync("normal message x".getBytes()).get(); + for (int i = 0; i < 100; i++){ + Transaction transaction = + pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + Message msg = consumer.receive(); + producer.newMessage(transaction).value(("tx msg a-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync(); + producer.newMessage(transaction).value(("tx msg b-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync(); + consumer.acknowledgeAsync(msg.getMessageId(), transaction); + transaction.commit().get(); + } + } + + private static void ensureClusterExists(PulsarService pulsar, String cluster) throws Exception { + ClusterResources clusterResources = pulsar.getPulsarResources().getClusterResources(); + ClusterData clusterData = ClusterData.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .serviceUrlTls(pulsar.getWebServiceAddress()) + .brokerServiceUrl(pulsar.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar.getBrokerServiceUrl()) + .build(); + if (!clusterResources.clusterExists(cluster)) { + clusterResources.createCluster(cluster, clusterData); + } + } + + private static void ensureTopicExists(NamespaceResources.PartitionedTopicResources partitionedTopicResources, + TopicName topicName, int numPartitions) throws Exception { + Optional getResult = + partitionedTopicResources.getPartitionedTopicMetadataAsync(topicName).get(); + if (!getResult.isPresent()) { + partitionedTopicResources.createPartitionedTopic(topicName, new PartitionedTopicMetadata(numPartitions)); + } else { + PartitionedTopicMetadata existsMeta = getResult.get(); + if (existsMeta.partitions < numPartitions) { + partitionedTopicResources.updatePartitionedTopicAsync(topicName, + __ -> new PartitionedTopicMetadata(numPartitions)).get(); + } + } + } + + private static void ensureNamespaceExists(NamespaceResources namespaceResources, NamespaceName namespaceName, + String cluster) throws Exception { + if (!namespaceResources.namespaceExists(namespaceName)) { + Policies policies = new Policies(); + policies.bundles = getBundles(16); + policies.replication_clusters = Collections.singleton(cluster); + namespaceResources.createPolicies(namespaceName, policies); + } else { + namespaceResources.setPolicies(namespaceName, policies -> { + policies.replication_clusters.add(cluster); + return policies; + }); + } + } + + private static void ensureTenantExists(TenantResources tenantResources, String tenant, String cluster) + throws Exception { + if (!tenantResources.tenantExists(tenant)) { + TenantInfoImpl publicTenant = new TenantInfoImpl(Collections.emptySet(), Collections.singleton(cluster)); + tenantResources.createTenant(tenant, publicTenant); + } else { + tenantResources.updateTenantAsync(tenant, ti -> { + ti.getAllowedClusters().add(cluster); + return ti; + }).get(); + } + } + + private List parseResponseEntityToList(Response response) throws Exception { + InputStream inputStream = (InputStream) response.getEntity(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + List list = new ArrayList<>(); + while (true){ + String str = bufferedReader.readLine(); + if (str == null){ + break; + } + list.add(str); + } + return list; + } + + protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + org.apache.pulsar.client.api.ClientBuilder clientBuilder = + PulsarClient.builder() + .serviceUrl(url) + .enableTransaction(true) + .statsInterval(intervalInSecs, TimeUnit.SECONDS); + customizeNewPulsarClientBuilder(clientBuilder); + return createNewPulsarClient(clientBuilder); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index c309f69fd566d..6a078fc19a73d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; @@ -691,7 +692,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{ TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); doReturn(CompletableFuture.completedFuture( new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, - 500, bufferedWriterConfig, transactionTimer))) + 500, bufferedWriterConfig, transactionTimer, + DISABLED_BUFFERED_WRITER_METRICS))) .when(pendingAckStoreProvider).newPendingAckStore(any()); doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any()); @@ -757,7 +759,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(new TransactionCoordinatorID(1), null, persistentTopic.getManagedLedger().getConfig(), new TxnLogBufferedWriterConfig(), - transactionTimer); + transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); Class mlTransactionLogClass = MLTransactionLogImpl.class; Field field = mlTransactionLogClass.getDeclaredField("cursor"); field.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java index fe5c7fa196977..0b14f6592fa82 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.fail; @@ -81,7 +82,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { ManagedCursor subCursor = completableFuture.get().openCursor("test"); MLPendingAckStore pendingAckStore = new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500, - bufferedWriterConfig, transactionTimer); + bufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); Field field = MLPendingAckStore.class.getDeclaredField("managedLedger"); field.setAccessible(true); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/DisabledTxnLogBufferedWriterMetricsStats.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/DisabledTxnLogBufferedWriterMetricsStats.java new file mode 100644 index 0000000000000..5a6238a95e491 --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/DisabledTxnLogBufferedWriterMetricsStats.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator.impl; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; + +public class DisabledTxnLogBufferedWriterMetricsStats extends TxnLogBufferedWriterMetricsStats { + + public static final DisabledTxnLogBufferedWriterMetricsStats DISABLED_BUFFERED_WRITER_METRICS = + new DisabledTxnLogBufferedWriterMetricsStats(); + + private static class DisabledCollectorRegistry extends CollectorRegistry { + + private static final DisabledCollectorRegistry INSTANCE = new DisabledCollectorRegistry(); + + public void register(Collector m) { + } + public void unregister(Collector m) { + } + } + + private DisabledTxnLogBufferedWriterMetricsStats() { + super("disabled", new String[0], new String[0], DisabledCollectorRegistry.INSTANCE); + } + + public void close() { + } + + public void triggerFlushByRecordsCount(int recordCount, long bytesSize, long delayMillis) { + } + + public void triggerFlushByBytesSize(int recordCount, long bytesSize, long delayMillis) { + } + + public void triggerFlushByByMaxDelay(int recordCount, long bytesSize, long delayMillis) { + } + + public void triggerFlushByLargeSingleData(int recordCount, long bytesSize, long delayMillis) { + } +} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 7b4b07ee652b6..67c0b54504bf0 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -82,11 +82,14 @@ public class MLTransactionLogImpl implements TransactionLog { private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig; + private final TxnLogBufferedWriterMetricsStats bufferedWriterMetrics; + public MLTransactionLogImpl(TransactionCoordinatorID tcID, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, - Timer timer) { + Timer timer, + TxnLogBufferedWriterMetricsStats bufferedWriterMetrics) { this.topicName = getMLTransactionLogName(tcID); this.tcId = tcID.getId(); this.managedLedgerFactory = managedLedgerFactory; @@ -97,6 +100,7 @@ public MLTransactionLogImpl(TransactionCoordinatorID tcID, this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true); } this.entryQueue = new SpscArrayQueue<>(2000); + this.bufferedWriterMetrics = bufferedWriterMetrics; } public static TopicName getMLTransactionLogName(TransactionCoordinatorID tcID) { @@ -119,7 +123,8 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { txnLogBufferedWriterConfig.getBatchedWriteMaxRecords(), txnLogBufferedWriterConfig.getBatchedWriteMaxSize(), txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), - txnLogBufferedWriterConfig.isBatchEnabled()); + txnLogBufferedWriterConfig.isBatchEnabled(), + bufferedWriterMetrics); managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME, CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index 7e6a7059125d0..c11e422d27a8f 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -19,6 +19,7 @@ package org.apache.pulsar.transaction.coordinator.impl; import io.netty.util.Timer; +import io.prometheus.client.CollectorRegistry; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -33,6 +34,22 @@ */ public class MLTransactionMetadataStoreProvider implements TransactionMetadataStoreProvider { + + private static volatile TxnLogBufferedWriterMetricsStats bufferedWriterMetrics = + DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; + + public static void initBufferedWriterMetrics(String brokerAdvertisedAddress){ + if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){ + return; + } + synchronized (MLTransactionMetadataStoreProvider.class){ + if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){ + return; + } + bufferedWriterMetrics = new MLTransactionMetadataStoreBufferedWriterMetrics(brokerAdvertisedAddress); + } + } + @Override public CompletableFuture openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory, @@ -45,11 +62,21 @@ public CompletableFuture openStore(TransactionCoordina MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId, - managedLedgerFactory, managedLedgerConfig, txnLogBufferedWriterConfig, timer); + managedLedgerFactory, managedLedgerConfig, txnLogBufferedWriterConfig, timer, bufferedWriterMetrics); // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. return txnLog.initialize().thenCompose(__ -> new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker)); } + + private static class MLTransactionMetadataStoreBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats { + + private MLTransactionMetadataStoreBufferedWriterMetrics(String brokerAdvertisedAddress) { + super("pulsar_txn_tc", + new String[]{"broker"}, + new String[]{brokerAdvertisedAddress}, + CollectorRegistry.defaultRegistry); + } + } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java index df9cb613e1c32..2346ebb3a8024 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java @@ -128,14 +128,6 @@ public class TxnLogBufferedWriter { trigFlushByTimingTask(); }; - public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor, Timer timer, - DataSerializer dataSerializer, - int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis, - boolean batchEnabled){ - this(managedLedger, orderedExecutor, timer, dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, - batchedWriteMaxDelayInMillis, batchEnabled, null); - } - /** * Constructor. * @param dataSerializer The serializer for the object which called by {@link #asyncAddData}. @@ -174,6 +166,9 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered this.flushContext = FlushContext.newInstance(); this.dataArray = new ArrayList<>(); STATE_UPDATER.set(this, State.OPEN); + if (metrics == null){ + throw new IllegalArgumentException("Build TxnLogBufferedWriter error: param metrics can not be null"); + } this.metrics = metrics; this.timer = timer; // scheduler task. @@ -284,10 +279,8 @@ private void trigFlushByTimingTask(){ if (flushContext.asyncAddArgsList.isEmpty()) { return; } - if (metrics != null) { - metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize, - System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); - } + metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), bytesSize, + System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); doFlush(); } catch (Exception e){ log.error("Trig flush by timing task fail.", e); @@ -303,18 +296,14 @@ private void trigFlushByTimingTask(){ */ private void trigFlushIfReachMaxRecordsOrMaxSize(){ if (flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) { - if (metrics != null) { - metrics.triggerFlushByRecordsCount(flushContext.asyncAddArgsList.size(), bytesSize, - System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); - } + metrics.triggerFlushByRecordsCount(flushContext.asyncAddArgsList.size(), bytesSize, + System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); doFlush(); return; } if (bytesSize >= batchedWriteMaxSize) { - if (metrics != null) { - metrics.triggerFlushByBytesSize(flushContext.asyncAddArgsList.size(), bytesSize, - System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); - } + metrics.triggerFlushByBytesSize(flushContext.asyncAddArgsList.size(), bytesSize, + System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); doFlush(); } } @@ -323,10 +312,8 @@ private void trigFlushByLargeSingleData(){ if (flushContext.asyncAddArgsList.isEmpty()) { return; } - if (metrics != null) { - metrics.triggerFlushByLargeSingleData(this.flushContext.asyncAddArgsList.size(), this.bytesSize, - System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); - } + metrics.triggerFlushByLargeSingleData(this.flushContext.asyncAddArgsList.size(), this.bytesSize, + System.currentTimeMillis() - flushContext.asyncAddArgsList.get(0).addedTime); doFlush(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java index fd8f53b72ed07..6eafe79638c4a 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java @@ -22,6 +22,7 @@ import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; /*** @@ -46,6 +47,7 @@ public class TxnLogBufferedWriterMetricsStats implements Closeable { static final double[] MAX_DELAY_TIME_BUCKETS = {1, 5, 10}; + private final CollectorRegistry collectorRegistry; @Getter private final String metricsPrefix; @@ -87,12 +89,15 @@ public class TxnLogBufferedWriterMetricsStats implements Closeable { private final Counter batchFlushTriggeredByLargeSingleDataMetric; private final Counter.Child batchFlushTriggeredByLargeSingleDataCounter; + private final AtomicBoolean closed; + /** * Users needs to ensure that the {@link TxnLogBufferedWriterMetricsStats} of the same {@param metricsPrefix} can * only create once, otherwise an IllegalArgumentException will be thrown. */ public TxnLogBufferedWriterMetricsStats(String metricsPrefix, String[] labelNames, String[] labelValues, CollectorRegistry registry) { + this.collectorRegistry = registry; this.metricsPrefix = metricsPrefix; this.labelNames = labelNames.clone(); this.labelValues = labelValues.clone(); @@ -101,79 +106,85 @@ public TxnLogBufferedWriterMetricsStats(String metricsPrefix, String[] labelName String.format("%s_bufferedwriter_batch_record_count", metricsPrefix); recordsPerBatchMetric = new Histogram.Builder() .name(recordsPerBatchMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Records per batch histogram") .buckets(RECORD_COUNT_PER_ENTRY_BUCKETS) - .register(registry); - recordsPerBatchHistogram = recordsPerBatchMetric.labels(labelValues); + .register(collectorRegistry); + recordsPerBatchHistogram = recordsPerBatchMetric.labels(this.labelValues); String batchSizeBytesMetricName = String.format("%s_bufferedwriter_batch_size_bytes", metricsPrefix); batchSizeBytesMetric = new Histogram.Builder() .name(batchSizeBytesMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Batch size in bytes histogram") .buckets(BYTES_SIZE_PER_ENTRY_BUCKETS) - .register(registry); - batchSizeBytesHistogram = batchSizeBytesMetric.labels(labelValues); + .register(collectorRegistry); + batchSizeBytesHistogram = batchSizeBytesMetric.labels(this.labelValues); String oldestRecordInBatchDelayTimeSecondsMetricName = String.format("%s_bufferedwriter_batch_oldest_record_delay_time_second", metricsPrefix); oldestRecordInBatchDelayTimeSecondsMetric = new Histogram.Builder() .name(oldestRecordInBatchDelayTimeSecondsMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Max record latency in batch histogram") .buckets(MAX_DELAY_TIME_BUCKETS) - .register(registry); + .register(collectorRegistry); oldestRecordInBatchDelayTimeSecondsHistogram = - oldestRecordInBatchDelayTimeSecondsMetric.labels(labelValues); + oldestRecordInBatchDelayTimeSecondsMetric.labels(this.labelValues); String batchFlushTriggeringByMaxRecordsMetricName = String.format("%s_bufferedwriter_flush_trigger_max_records", metricsPrefix); batchFlushTriggeredByMaxRecordsMetric = new Counter.Builder() .name(batchFlushTriggeringByMaxRecordsMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Event count of batch flush triggered by max records count") - .register(registry); - batchFlushTriggeredByMaxRecordsCounter = batchFlushTriggeredByMaxRecordsMetric.labels(labelValues); + .register(collectorRegistry); + batchFlushTriggeredByMaxRecordsCounter = batchFlushTriggeredByMaxRecordsMetric.labels(this.labelValues); String batchFlushTriggeringByMaxSizeMetricName = String.format("%s_bufferedwriter_flush_trigger_max_size", metricsPrefix); batchFlushTriggeredByMaxSizeMetric = new Counter.Builder() .name(batchFlushTriggeringByMaxSizeMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Event count of batch flush triggered by max bytes size") - .register(registry); - batchFlushTriggeredByMaxSizeCounter = batchFlushTriggeredByMaxSizeMetric.labels(labelValues); + .register(collectorRegistry); + batchFlushTriggeredByMaxSizeCounter = batchFlushTriggeredByMaxSizeMetric.labels(this.labelValues); String batchFlushTriggeringByMaxDelayMetricName = String.format("%s_bufferedwriter_flush_trigger_max_delay", metricsPrefix); batchFlushTriggeredByMaxDelayMetric = new Counter.Builder() .name(batchFlushTriggeringByMaxDelayMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Event count of batch flush triggered by max delay time") - .register(registry); + .register(collectorRegistry); batchFlushTriggeredByMaxDelayCounter = - batchFlushTriggeredByMaxDelayMetric.labels(labelValues); + batchFlushTriggeredByMaxDelayMetric.labels(this.labelValues); String batchFlushTriggeringByLargeSingleDataMetricName = String.format("%s_bufferedwriter_flush_trigger_large_data", metricsPrefix); batchFlushTriggeredByLargeSingleDataMetric = new Counter.Builder() .name(batchFlushTriggeringByLargeSingleDataMetricName) - .labelNames(labelNames) + .labelNames(this.labelNames) .help("Event count of batch flush triggered by the single large data write") - .register(registry); + .register(collectorRegistry); batchFlushTriggeredByLargeSingleDataCounter = - batchFlushTriggeredByLargeSingleDataMetric.labels(labelValues); + batchFlushTriggeredByLargeSingleDataMetric.labels(this.labelValues); + + this.closed = new AtomicBoolean(false); } public void close() { - recordsPerBatchMetric.remove(labelValues); - batchSizeBytesMetric.remove(labelValues); - oldestRecordInBatchDelayTimeSecondsMetric.remove(labelValues); - batchFlushTriggeredByMaxRecordsMetric.remove(labelValues); - batchFlushTriggeredByMaxSizeMetric.remove(labelValues); - batchFlushTriggeredByMaxDelayMetric.remove(labelValues); - batchFlushTriggeredByLargeSingleDataMetric.remove(labelValues); + // Doing unregister more than once will throw exception, so avoid repeating close. + if (!closed.compareAndSet(false, true)){ + return; + } + collectorRegistry.unregister(recordsPerBatchMetric); + collectorRegistry.unregister(batchSizeBytesMetric); + collectorRegistry.unregister(oldestRecordInBatchDelayTimeSecondsMetric); + collectorRegistry.unregister(batchFlushTriggeredByMaxRecordsMetric); + collectorRegistry.unregister(batchFlushTriggeredByMaxSizeMetric); + collectorRegistry.unregister(batchFlushTriggeredByMaxDelayMetric); + collectorRegistry.unregister(batchFlushTriggeredByLargeSingleDataMetric); } public void triggerFlushByRecordsCount(int recordCount, long bytesSize, long delayMillis) { diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 41a2552d04f1f..1d5d9708a275f 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -84,7 +85,7 @@ public void testTransactionOperation(TxnLogBufferedWriterConfig txnLogBufferedWr MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -172,7 +173,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); managedLedgerConfig.setMaxEntriesPerLedger(3); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, disabledBufferedWriter, transactionTimer); + managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -201,7 +202,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E } mlTransactionLog.closeAsync().get(2, TimeUnit.SECONDS); mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, disabledBufferedWriter, transactionTimer); + managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -230,7 +231,7 @@ public void testInitTransactionReader(TxnLogBufferedWriterConfig txnLogBufferedW MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = @@ -282,7 +283,8 @@ public void testInitTransactionReader(TxnLogBufferedWriterConfig txnLogBufferedW transactionMetadataStore.closeAsync(); MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, + DISABLED_BUFFERED_WRITER_METRICS); txnLog2.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStoreTest = @@ -356,7 +358,7 @@ public void testDeleteLog(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -436,7 +438,7 @@ public void testRecoverWhenDeleteFromCursor(TxnLogBufferedWriterConfig txnLogBuf MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -454,7 +456,7 @@ public void testRecoverWhenDeleteFromCursor(TxnLogBufferedWriterConfig txnLogBuf transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get(); mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, @@ -476,7 +478,7 @@ public void testManageLedgerWriteFailState(TxnLogBufferedWriterConfig txnLogBuff MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, - managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer); + managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java index 5c38a8c52df4f..235e8b42514c8 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import static org.mockito.Mockito.*; import org.awaitility.Awaitility; import org.testng.Assert; @@ -82,7 +83,7 @@ public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throw bufferedWriterConfigForWrite.setBatchEnabled(writeWithBatch); TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(0); MLTransactionLogImpl mlTransactionLogForWrite = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory, - new ManagedLedgerConfig(), bufferedWriterConfigForWrite, transactionTimer); + new ManagedLedgerConfig(), bufferedWriterConfigForWrite, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLogForWrite.initialize().get(3, TimeUnit.SECONDS); Map>> expectedMapping = new HashMap<>(); /** @@ -156,8 +157,9 @@ public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throw bufferedWriterConfigForRecover.setBatchedWriteMaxDelayInMillis(1000 * 3600); bufferedWriterConfigForRecover.setBatchedWriteMaxRecords(3); bufferedWriterConfigForRecover.setBatchEnabled(readWithBatch); - MLTransactionLogImpl mlTransactionLogForRecover = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory, - new ManagedLedgerConfig(), bufferedWriterConfigForRecover, transactionTimer); + MLTransactionLogImpl mlTransactionLogForRecover = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), + factory, new ManagedLedgerConfig(), bufferedWriterConfigForRecover, transactionTimer, + DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLogForRecover.initialize().get(3, TimeUnit.SECONDS); // Recover and verify the txnID and position mappings. TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class); @@ -234,8 +236,9 @@ public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throw * the cursor-batch-indexes is expected. */ // Create another transaction log for recover. - MLTransactionLogImpl mlTransactionLogForDelete = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory, - new ManagedLedgerConfig(), bufferedWriterConfigForRecover, transactionTimer); + MLTransactionLogImpl mlTransactionLogForDelete = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), + factory, new ManagedLedgerConfig(), bufferedWriterConfigForRecover, transactionTimer, + DISABLED_BUFFERED_WRITER_METRICS); mlTransactionLogForDelete.initialize().get(3, TimeUnit.SECONDS); MLTransactionMetadataStore transactionMetadataStoreForDelete = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLogForDelete, timeoutTracker, sequenceIdGenerator, Integer.MAX_VALUE); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index e2151e7388138..5f254eb45b70d 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -60,6 +60,7 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS; import static org.testng.Assert.*; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -189,7 +190,7 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter( managedLedger, orderedExecutor, transactionTimer, dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, - batchedWriteMaxDelayInMillis, batchEnabled); + batchedWriteMaxDelayInMillis, batchEnabled, DISABLED_BUFFERED_WRITER_METRICS); // Store the param-context, param-position, param-exception of callback function and complete-count for verify. List contextArrayOfCallback = Collections.synchronizedList(new ArrayList<>()); List exceptionArrayOfCallback = Collections.synchronizedList(new ArrayList<>()); @@ -394,7 +395,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any()); // Test threshold: writeMaxDelayInMillis (use timer). TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, - transactionTimer, dataSerializer, 32, 1024 * 4, 100, true); + transactionTimer, dataSerializer, 32, 1024 * 4, + 100, true, DISABLED_BUFFERED_WRITER_METRICS); TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class); txnLogBufferedWriter1.asyncAddData(100, callback, 100); Thread.sleep(90); @@ -406,7 +408,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { // Test threshold: batchedWriteMaxRecords. TxnLogBufferedWriter txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, - transactionTimer, dataSerializer, 32, 1024 * 4, 10000, true); + transactionTimer, dataSerializer, 32, 1024 * 4, + 10000, true, DISABLED_BUFFERED_WRITER_METRICS); for (int i = 0; i < 32; i++){ txnLogBufferedWriter2.asyncAddData(1, callback, 1); } @@ -416,7 +419,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { // Test threshold: batchedWriteMaxSize. TxnLogBufferedWriter txnLogBufferedWriter3 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, - transactionTimer, dataSerializer, 1024, 64 * 4, 10000, true); + transactionTimer, dataSerializer, 1024, 64 * 4, + 10000, true, DISABLED_BUFFERED_WRITER_METRICS); for (int i = 0; i < 64; i++){ txnLogBufferedWriter3.asyncAddData(1, callback, 1); } @@ -444,7 +448,8 @@ public void testPendingScheduleTriggerTaskCount() throws Exception { // Create components. OrderedExecutor orderedExecutor = Mockito.mock(OrderedExecutor.class); ArrayBlockingQueue workQueue = new ArrayBlockingQueue<>(65536 * 2); - ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, + TimeUnit.SECONDS, workQueue); Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor); HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1, TimeUnit.MILLISECONDS); @@ -452,8 +457,9 @@ public void testPendingScheduleTriggerTaskCount() throws Exception { // Mock managed ledger and write counter. MockedManagedLedger mockedManagedLedger = mockManagedLedgerWithWriteCounter(mlName); // Start tests. - TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(mockedManagedLedger.managedLedger, orderedExecutor, - transactionTimer, dataSerializer, 2, 1024 * 4, 1, true); + TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(mockedManagedLedger.managedLedger, + orderedExecutor, transactionTimer, dataSerializer, 2, 1024 * 4, + 1, true, DISABLED_BUFFERED_WRITER_METRICS); TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class); // Append heavier tasks to the Ledger thread. final ExecutorService executorService = orderedExecutor.chooseThread(mlName);