[fix] [txn] Make txn metric names conform to the rule (#17905)
Fixes: #17921

**Note**:

This patch changes the metric names `%s_bufferedwriter_batch_record_count` and `%s_bufferedwriter_batch_oldest_record_delay_time_second`. These two names were first introduced in PR #17701, which has not been cherry-picked to any release branch yet, so this change does not break anything.

### Motivation

https://github.com/poorbarcode/pulsar/actions/runs/3156649582/jobs/5136584463
https://github.com/apache/pulsar/actions/runs/3156649597/jobs/5136596447

#### Problem-1

If a Prometheus `Collector` typed `Counter` is named `xxx_count`, the metric exposed by the metrics API ends up named `xxx_count_count`.

`TxnLogBufferedWriterMetricsStats` hits this problem.

https://github.com/apache/pulsar/blob/fb7307d8f4998e42b18df3a4599fd7ec34cb04a9/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterMetricsStats.java#L105-L106
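As a minimal, standalone sketch of how the duplicated suffix arises (the records-per-batch metric in `TxnLogBufferedWriterMetricsStats` is built as a `Histogram`; see its hunk in the diff below): registering a collector whose base name already ends in `_count` makes the client append its own sample suffixes on top. The class name and metric name here are made up for illustration only:

```java
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;
import java.util.Enumeration;

public class SuffixDuplicationDemo {
    public static void main(String[] args) {
        CollectorRegistry registry = new CollectorRegistry();

        // A base name that already ends in "_count"...
        Histogram recordsPerBatch = new Histogram.Builder()
                .name("demo_bufferedwriter_batch_record_count") // hypothetical name for illustration
                .help("records per batch")
                .register(registry);
        recordsPerBatch.observe(3);

        // ...gets the client's own suffixes appended on exposition, so the samples
        // come out as demo_bufferedwriter_batch_record_count_count, ..._count_sum,
        // ..._count_bucket (and ..._count_created on newer client versions).
        Enumeration<Collector.MetricFamilySamples> families = registry.metricFamilySamples();
        while (families.hasMoreElements()) {
            for (Collector.MetricFamilySamples.Sample sample : families.nextElement().samples) {
                System.out.println(sample.name);
            }
        }
    }
}
```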


----

#### Problem-2

`PrometheusMetricsTest` hard-codes the set of standard metric name suffixes (see the code below):

```
["_sum", "_bucket", "_count", "_total", "_created"]
```

But the standard Prometheus suffix set contains three more (see: https://github.com/prometheus/client_java/blob/c28b901225e35e7c1df0eacae8b58fdfbb390162/simpleclient/src/main/java/io/prometheus/client/Collector.java#L152-L186):

```
["_info", "_gsum", "_gcount"]
```


https://github.com/apache/pulsar/blob/fb7307d8f4998e42b18df3a4599fd7ec34cb04a9/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java#L834-L861
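Rather than extending the hard-coded list, the fix derives the suffix set from the Prometheus client itself (this is what the new `allPrometheusSuffixEnums()` helper in the test diff below does). A rough sketch of the same idea, assuming a `simpleclient` version that provides `Collector.MetricFamilySamples#getNames()`:

```java
import io.prometheus.client.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class PrometheusSuffixes {
    /** Collect every suffix any Prometheus metric type may append to its base name. */
    public static Set<String> allSuffixes() {
        String base = "placeholder";
        Set<String> names = new HashSet<>();
        for (Collector.Type type : Collector.Type.values()) {
            // getNames() returns the base name plus all suffixed sample names for this type.
            names.addAll(Arrays.asList(
                    new Collector.MetricFamilySamples(base, type, "", new ArrayList<>()).getNames()));
        }
        return names.stream()
                .map(n -> n.substring(base.length()))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Expected: _sum, _info, _bucket, _count, _total, _created, _gsum, _gcount
        System.out.println(allSuffixes());
    }
}
```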

----

### Modifications

- Make `PrometheusMetricsTest` run with the transaction feature enabled
- Make the txn metric names conform to the Prometheus naming rules (see: https://prometheus.io/docs/practices/naming/); the concrete renames are listed below
- Make `PrometheusMetricsTest` accept every standard suffix of a Prometheus metric name
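
The renames this patch applies (taken from the diff below), where the prefix is `pulsar_txn_tc` for the transaction coordinator store and `pulsar_txn_pending_ack_store` for the pending-ack store:

```
<prefix>_bufferedwriter_batch_record_count                    ->  <prefix>_bufferedwriter_batch_records
<prefix>_bufferedwriter_batch_oldest_record_delay_time_second ->  <prefix>_bufferedwriter_batch_oldest_record_delay_seconds
```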

### Documentation

- [x] `doc-not-needed` 

### Matching PR in forked repository

PR in forked repository:

- poorbarcode#19
poorbarcode authored Nov 16, 2022
1 parent 5d6a88e commit 3715934
Showing 4 changed files with 50 additions and 32 deletions.
@@ -28,32 +28,38 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.jsonwebtoken.SignatureAlgorithm;
import io.prometheus.client.Collector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -88,6 +94,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
conf.setTransactionCoordinatorEnabled(true);
conf.setTransactionLogBatchedWriteEnabled(true);
conf.setTransactionPendingAckBatchedWriteEnabled(true);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}
@@ -773,6 +782,7 @@ public void testPerConsumerStats() throws Exception {
// Running the test twice to make sure types are present when generated multiple times
@Test(invocationCount = 2)
public void testDuplicateMetricTypeDefinitions() throws Exception {
Set<String> allPrometheusSuffixString = allPrometheusSuffixEnums();
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
for (int i = 0; i < 10; i++) {
@@ -831,32 +841,18 @@ public void testDuplicateMetricTypeDefinitions() throws Exception {

if (!typeDefs.containsKey(metricName)) {
// This may be OK if this is a _sum or _count metric from a summary
if (metricName.endsWith("_sum")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_sum"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
boolean isNorm = false;
for (String suffix : allPrometheusSuffixString){
if (metricName.endsWith(suffix)){
String summaryMetricName = metricName.substring(0, metricName.indexOf(suffix));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
}
isNorm = true;
break;
}
} else if (metricName.endsWith("_count")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_count"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
}
} else if (metricName.endsWith("_bucket")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_bucket"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
}
} else if (metricName.endsWith("_created")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_created"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding summary type definition");
}
} else if (metricName.endsWith("_total")) {
String summaryMetricName = metricName.substring(0, metricName.indexOf("_total"));
if (!typeDefs.containsKey(summaryMetricName)) {
fail("Metric " + metricName + " does not have a corresponding counter type definition");
}
} else {
}
if (!isNorm){
fail("Metric " + metricName + " does not have a type definition");
}

@@ -867,6 +863,24 @@ public void testDuplicateMetricTypeDefinitions() throws Exception {
p2.close();
}

/**
 * Returns ["_sum", "_info", "_bucket", "_count", "_total", "_created", "_gsum", "_gcount"].
 */
public static Set<String> allPrometheusSuffixEnums(){
HashSet<String> result = new HashSet<>();
final String metricsName = "123";
for (Collector.Type type : Collector.Type.values()){
Collector.MetricFamilySamples metricFamilySamples =
new Collector.MetricFamilySamples(metricsName, type, "", new ArrayList<>());
result.addAll(Arrays.asList(metricFamilySamples.getNames()));
}
return result.stream()
.map(str -> str.substring(metricsName.length()))
.filter(str -> StringUtils.isNotBlank(str))
.collect(Collectors.toSet());
}


@Test
public void testManagedLedgerCacheStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
@@ -41,6 +41,7 @@
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -56,6 +57,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -75,6 +77,8 @@ public class TransactionBatchWriterMetricsTest extends MockedPulsarServiceBaseTe

@BeforeClass
public void setup() throws Exception {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics("localhost");
MLPendingAckStoreProvider.initBufferedWriterMetrics("localhost");
super.internalSetup();
}

@@ -130,7 +134,7 @@ public void testTransactionMetaLogMetrics() throws Exception{

// verify tc.
String metrics_key_txn_tc_record_count_sum =
"pulsar_txn_tc_bufferedwriter_batch_record_count_sum{cluster=\"%s\",broker=\"%s\"} ";
"pulsar_txn_tc_bufferedwriter_batch_records_sum{cluster=\"%s\",broker=\"%s\"} ";
Assert.assertTrue(searchMetricsValue(metricsLines,
String.format(metrics_key_txn_tc_record_count_sum, metricsLabelCluster, metricsLabelBroker))
> 0);
@@ -146,7 +150,7 @@
> 0);
// verify pending ack.
String metrics_key_txn_pending_ack_record_count_sum =
"pulsar_txn_pending_ack_store_bufferedwriter_batch_record_count_sum{cluster=\"%s\",broker=\"%s\"} ";
"pulsar_txn_pending_ack_store_bufferedwriter_batch_records_sum{cluster=\"%s\",broker=\"%s\"} ";
Assert.assertTrue(searchMetricsValue(metricsLines,
String.format(metrics_key_txn_pending_ack_record_count_sum, metricsLabelCluster, metricsLabelBroker))
> 0);
@@ -103,7 +103,7 @@ public TxnLogBufferedWriterMetricsStats(String metricsPrefix, String[] labelName
this.labelValues = labelValues.clone();

String recordsPerBatchMetricName =
String.format("%s_bufferedwriter_batch_record_count", metricsPrefix);
String.format("%s_bufferedwriter_batch_records", metricsPrefix);
recordsPerBatchMetric = new Histogram.Builder()
.name(recordsPerBatchMetricName)
.labelNames(this.labelNames)
@@ -122,7 +122,7 @@
batchSizeBytesHistogram = batchSizeBytesMetric.labels(this.labelValues);

String oldestRecordInBatchDelayTimeSecondsMetricName =
String.format("%s_bufferedwriter_batch_oldest_record_delay_time_second", metricsPrefix);
String.format("%s_bufferedwriter_batch_oldest_record_delay_seconds", metricsPrefix);
oldestRecordInBatchDelayTimeSecondsMetric = new Histogram.Builder()
.name(oldestRecordInBatchDelayTimeSecondsMetricName)
.labelNames(this.labelNames)
@@ -997,17 +997,17 @@ private void verifyTheCounterMetrics(int triggeredByRecordCount, int triggeredBy
private void verifyTheHistogramMetrics(int batchFlushCount, int totalRecordsCount, int totalSize){
// Total flush count.
assertEquals(
getHistogramCount(String.format("%s_bufferedwriter_batch_record_count", metricsPrefix)),
getHistogramCount(String.format("%s_bufferedwriter_batch_records", metricsPrefix)),
batchFlushCount);
assertEquals(
getHistogramCount(String.format("%s_bufferedwriter_batch_size_bytes", metricsPrefix)),
batchFlushCount);
assertEquals(
getHistogramCount(String.format("%s_bufferedwriter_batch_oldest_record_delay_time_second", metricsPrefix)),
getHistogramCount(String.format("%s_bufferedwriter_batch_oldest_record_delay_seconds", metricsPrefix)),
batchFlushCount);
// Total records count.
assertEquals(
getHistogramSum(String.format("%s_bufferedwriter_batch_record_count", metricsPrefix)),
getHistogramSum(String.format("%s_bufferedwriter_batch_records", metricsPrefix)),
totalRecordsCount);
// Total data size.
assertEquals(
