Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn] PIP-160 Metrics stats of Transaction buffered writer #16758

Merged
merged 65 commits into from
Sep 14, 2022

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jul 23, 2022

Master Issue: #15370

Motivation

see #15370

Modifications

I will complete proposal #15370 with these pull requests( current pull request is a part of step 7-1 ):

  1. Write the batch transaction log handler: TxnLogBufferedWriter
  2. Configuration changes and protocol changes.
  3. Transaction log store enables the batch feature.
  4. Pending ack log store enables the batch feature.
  5. Supports dynamic configuration.
  6. Append admin API for transaction batch log and docs( admin and configuration doc ).
    GET /admin/v3/transactions/coordinatorStats
    GET /admin/v3/transactions/pendingAckStats/:tenant/:namespace:/:topic:/:subName
  7. Append metrics support for transaction batch log.
    7-1. Metrics of Txn Buffered Writer.
    7-2. TransactionLog and PendingAckStore enables the Metrics of Txn Buffered Writer

The desired effect

TransactionLog should create TxnLogBufferedWriter with params:

{
  "metricsPrefix": "pulsar_txn_tc",
  "labelNames": "coordinatorId",
  "labelValues": "1"
}

The metrics output of TransactionLog will like this:

# A metrics for how many batches were triggered due to threshold "batchedWriteMaxRecords".
# TYPE pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records Counter
pulsar_txn_tc_batched_log_batched_log_triggering_count_by_records{coordinatorId="1"} 15
...
...
...
# pulsar_txn_tc_batched_log_records_count_per_entry A metrics for how many records in per batch written by the component[pulsar_txn_tc] per batch.
# TYPE pulsar_txn_tc_batched_log_records_count_per_entry Histogram
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="10"} 1
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="50"} 3
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="100"} 5
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="500"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="1000"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_bucket{coordinatorId="1", le="+Inf"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_count{coordinatorId="1", le="+Inf"} 10
pulsar_txn_tc_batched_log_records_count_per_entry_sum{coordinatorId="1", le="+Inf"} 5432

PendingAckStore is the same. But all the PendingackStores will not differentiate the Subscription labels (because there are too many)


Manage the registered collectors ourselves.

To build Metrics Stat, we need to execute these two steps:

  1. Create Collector and register to CollectorRegistry, perhaps the Collector is Histogram or Counter
  2. Register labels to Collector and get Collector.child(holds by Metrics Stat). This step can also be omitted because we can execute collector.labels(labelValues) to get Collector.child.

In the Transaction log scenario, multiple Transaction Logs share the same Collector, and each has its own Collector.Child, so when we build metrics stat for each Transaction Log, we call collector.labels(labelValues) to get the Collector.Child. However, the CollectorRegistry does not provide an API like this:

public Collector getRegistedCollector(String name);

and it will throw IllegalArgumentException when we registering collector with the same name more than once, see:

https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/CollectorRegistry.java#L49-L65

So we have to manage the registered collectors ourselves.


Holds the Collector.child by each Metrics stat instance

To save the overhead of collector.labels(labelValues), we make each Metrics Stat hold a reference of Collector.child, because this method is not light enough:

https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/SimpleCollector.java#L63-L80


Code will be removed in the next PR (7-2)

In the complete design, we should have two implementations like UML blow, one for enabling the batch feature, and another for disabled:

uml

To reduce later maintenance costs, I'd like to ditch the 'DisabledMetricsStat' and we'll always use the implementation 'MetricsStatimpl' even if the Txn buffer writer disables the batch feature. This constructor without 'param-metricsStats' and these' null checks' will be removed in the next PR. This is compatible only with split PR, making each PR have less code

Documentation

  • doc-required

  • doc-not-needed

  • doc

  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 23, 2022
@poorbarcode poorbarcode force-pushed the pip/160-15 branch 2 times, most recently from f28469a to 5bfbd31 Compare July 27, 2022 00:38
Copy link
Contributor

@tjiuming tjiuming left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where the TxnLogBufferedWriterMetricsDefinition.java's labelNames and labelValues has been set, except in the tests.

@tjiuming
Copy link
Contributor

@asafm Could you please help review the PR?

@poorbarcode
Copy link
Contributor Author

poorbarcode commented Jul 28, 2022

Hi @tjiuming

I don't see where the TxnLogBufferedWriterMetricsDefinition.java's labelNames and labelValues has been set, except in the tests.

  • TxnLogBufferedWriterMetricsDefinition will create by MlTransactionLogImpl and MlPendingAckStore in next PR.
  • In tests TxnLogBufferedWriterTest, labelNames and labelValues is set by Initializes block which is at top of the class.

@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@asafm
Copy link
Contributor

asafm commented Jul 31, 2022

I started writing a few comments then I realized I have some profound suggestions I thought maybe it's a better idea to raise them first.

I think currently this PR in my opinion quite complicated. It took me 2 hours to get the hang of it all.
Here is what I suggest IMO to make it a bit simpler:

  1. There is a lot of logic related to removing labels once the buffered writer is closing. It's even a bit more complicated as some buffered writers share the same label values, so they have one child of the histogram collector for example, but many usages. It's very complicated.

BufferedWriter is used on two occasions: Pending Ack stores, and Transaction Log.
Transaction Log doesn't need that complicated logic as it creates a single Buffered Writer.
Pending Ack Stores does. Since it has a provider class, we can make this logic "applicative" and place it in the MLPendingAckStoreProvider.

How?
BufferedWriter will get a BufferedWriterMetrics in the constructor.
BufferedWriterMetrics will get BufferedWriterMetricsConfiguration in its constructor.

BufferedWriterMetricsConfiguration is the same as the definition you have:
labelNames, labelValues, metricsPrefix (you called it component) and enabledMetrics.

Since you said the pending ack stores will share the same labels, but won't be differentiated by the label values, you can create a single BufferedWriterMetrics instance and use it whenever you create a new Buffered writer.

When a ledger is closed, its buffered writer is closed.
BufferedWriterMetrics will be closed by its creator: MLPendingAckStoreProvider will know when to close it's the only instance since it can easily keep track or already have the number of open ack stores. TransactionLog will keep its only instance and will close it upon closing the managed ledger / buffered writer.

  1. I wouldn't bother with unregistering the metric. It's only relevant when there are no pending ack stores. The only cost in this case: is 2 lines emitted in the Prometheus output (help and type) since there are no samples to print.

  2. In BufferedWriter Metrics init, I would use the same optimization trick I saw at FunctionStatsManager:
    step 1: create all metrics: histogram, collectors, etc. mainly supplying labels names.
    step 2: create the child of each collector, by supplying the label values. Save it as a variable and use it.

  3. In close(), just remove the labels from the collector.

  4. One larger change I would do: have BufferedWriterMetrics class have an action method: batchFlushedTriggedByForceFlush(), and its arguments containing anything you need for your metrics update. hide everything inside, including the appendistogram you have there.

  5. I would get rid of the disabled static instance, and simply do nothing upon each action method in BufferedWriterMetrics if metricsEnabled is false. Encapsulate it.

I have many more comments, but I thought it's best to discuss the big ones first.

@poorbarcode
Copy link
Contributor Author

Hi @asafm

Here is what I suggest IMO to make it a bit simpler: 1-6

I've taken care of all the suggestions, could you review this PR again?

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update the metrics documentation

@@ -272,23 +290,43 @@ private void doTrigFlush(boolean force, boolean byScheduleThreads){
return;
}
if (force) {
if (metricsStats != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a default TxnLogBufferedWriterMetricsStatsDisabled implementation to avoid many null checks.

Copy link
Contributor Author

@poorbarcode poorbarcode Aug 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a default TxnLogBufferedWriterMetricsStatsDisabled implementation to avoid many null checks.

Good Idea. In the complete design, we should have two implementations like UML blow, one for enabling the batch feature, and another for disabled:

uml

Sorry, I should have added some code comments in this PR(It's now written in Modifications).
To reduce later maintenance costs, I'd like to ditch the 'DisabledMetricsStat' and we'll always use the implementation 'MetricsStatimpl' even if the Txn buffer writer disables batch feature. This constructor without 'param-metricsStats' and these' null checks' will be removed in the next PR( 7-2 in Modifications ). This is compatible only with split PR, making each PR have less code

* {@link TxnLogBufferedWriterMetricsStats}, such as The Transaction Coordinator using coordinatorId as label and
* The Transaction Pending Ack Store using subscriptionName as label.
*/
private static final HashMap<String, Collector> COLLECTOR_CACHE = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a map here?
Looks like it only used in the constructor.

Copy link
Contributor Author

@poorbarcode poorbarcode Aug 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a map here? Looks like it is only used in the constructor.

Yes, we need it.

To build Metrics Stat, we need execute these two steps:

  1. Create Collector and register to CollectorRegistry, perhaps the Collector is Histogram or Counter
  2. Register labels to Collector and get Collector.child(holds by Metrics Stat). This step can also be omitted, because we can execute collector.labels(labelValues) to get Collector.child.

In the Transaction log scenario, multiple Transaction Log share the same Collector, and each has its own Collector.Child, so when we build metrics stat for each Transaction Log, we call collector.labels(labelValues) to get the Collector.Child.

( High light)
However, the CollectorRegistry does not provide an API like below, and it will throw IllegalArgumentException when we registering collector with the same name more than once.

public Collector getRegistedCollector(String name);

Throws IllegalArgumentException at this line:

https://github.com/prometheus/client_java/blob/1966186deaaa83aec496d88ff604a90795bad688/simpleclient/src/main/java/io/prometheus/client/CollectorRegistry.java#L49-L65

So we have to manage the registered collectors ourselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a way to solve this issue by making sure we're not defining metrics twice:
We know that we plan to create only one Metrics instance per metic-prefix. So in that case, both TxLog and PendingAckStoreProvider will create one with its own prefix, and that's it. No need to verify it was created before. In the event a future developer will make a mistake, it will fail in the constructor in some test right since CollectorRegistry.register() will fail on a duplicate.

Copy link
Contributor Author

@poorbarcode poorbarcode Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @asafm

That's a good way to do it, but this can make the code confuse:

  • When the Transaction Log Provider opens a Transaction Log, passed the Histogram to Transaction Log. That is OK.
  • When the Transaction Log close, remove the labels. That is ok too.

But the Transaction Log Provider will hold all the Collector of Txn Buffered Writer, this is confusing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the Transaction Log Provider will hold all the Collector of Txn Buffered Writer; this is confusing

Can you explain this further? I didn't understand how the provider would hold all the Collectors (metrics). It should only be to a single instance of the metric class.

Copy link
Contributor Author

@poorbarcode poorbarcode Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @asafm

Can you explain this further? I didn't understand how the provider would hold all the Collectors (metrics). It should only be to a single instance of the metric class.

When MLTransactionMetadataStoreProvider initialized, we create Collector like this:

public MLTransactionMetadataStoreProvider(){
   this.recordsPerBatchMetric = ...
   this.batchSizeBytesMetric = ...
   this.oldestRecordInBatchDelayTimeSecondsMetric = ...
   this.batchFlushTriggeredByMaxRecordsMetric = ...
   this.batchFlushTriggeredByMaxSizeMetric = ...
   this.batchFlushTriggeredByMaxDelayMetric = ...
}

And when creating MlTransactionLogImpl, pass these Collector to MlTransactionLogImpl like this:

public class MLTransactionMetadataStoreProvider{
  public TransactionMetadataStore openStore(...){
    TransactionMetadataStore store = ...;
    setMetrics(store);
    return store;
  }

  private void setMetrics(TransactionMetadataStore store) {
    store.recordsPerBatchMetric = this.recordsPerBatchMetric;
    store.batchSizeBytesMetric = this.batchSizeBytesMetric;
    store.oldestRecordInBatchDelayTimeSecondsMetric = this.oldestRecordInBatchDelayTimeSecondsMetric;
    ...
  }
}

The MLTransactionMetadataStoreProvider will hold all the Collector of Txn Buffered Writer, this is confusing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either what you wrote is completely the wrong direction or I completely misunderstood you :)

I thought that in MLTransactionLogImp, when open a ledger you also create a buffered writer, thus in here you will also pass an instance of TxnLogBufferedWriterMetricsStats.

                    public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                        MLTransactionLogImpl.this.managedLedger = ledger;
                        MLTransactionLogImpl.this.bufferedWriter = new TxnLogBufferedWriter<>(
                                managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(),
                                timer, TransactionLogDataSerializer.INSTANCE,
                                txnLogBufferedWriterConfig.getBatchedWriteMaxRecords(),
                                txnLogBufferedWriterConfig.getBatchedWriteMaxSize(),
                                txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis(),
                                txnLogBufferedWriterConfig.isBatchEnabled());

As I understand, this only happens once per instance of MLTransactionLogImpl. So I don't understand why you want to pass metric by metric. Something doesn't add up here.

I guess my main question is: how many MLTransactionLogImpl are there in a single broker process? More than 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, maybe more than one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if more than one, then the design must change. I thought the whole idea was that you have a single instance of metrics per metric prefix.
If not, after much thought I suggest the following:

abstract class BufferMetrics {
	protected abstract void observeRecordsPerBatch(int)
	protected abstract void incFlushTriggeredByMaxRecords(int)
}

MLTransactionMetadataStoreBufferedWriterMetrics extends BufferMetrics {
	static private Histogram recordsPerBatchMetric = new Histogram.Builder()
                        .name("pulsar_tx_store_bufferedwriter_batch_record_count")
                        .labelNames(new String[]{"txCoordinatorId"})
                        .help("Records per batch histogram")
                        .buckets(RECORD_COUNT_PER_ENTRY_BUCKETS)
                        .register(registry));
	
    private Histogram.Child recordsPerBatchHistogram;                      


	public MLTransactionMetadataStoreBufferedWriterMetrics(String txCoordinatorId) {
	    recordsPerBatchHistogram = recordsPerBatchHistogram.labels(txCoordinatorId)

	}
     
       protected observeRecordsPerBatch(value) {
            recordsPerBatchHistogram.observe(value)       
       }

}

Another approach which I disliked a bit, but it's still ok:
Add to Pulsar Common:

class PrometheusRegistryChecker {
     static defaultMetricRegistryNameToCollector = new HashMap<String, Collector>()

     static Collector registerIfNotExists(collector) {}
}

Like FunctionCollectorRegistryImpl

I prefer the second implementation because if we need to define many variables of Collector type. And Map is really just another representation of multiple variables.

Now common has not the dependency: prometheus-client

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I can leave with that but just to be on the safe side I asked also for @tjiuming advice here.

@codelipenghui codelipenghui removed this from the 2.11.0 milestone Aug 2, 2022
@poorbarcode
Copy link
Contributor Author

rebase master

@congbobo184 congbobo184 merged commit 57317d6 into apache:master Sep 14, 2022
@poorbarcode poorbarcode deleted the pip/160-15 branch September 17, 2022 02:51
congbobo184 pushed a commit that referenced this pull request Sep 29, 2022
…trics (#17701)

Master Issue: #15370

### Modifications

- Make transaction `MLTransactionMetadataStoreProvider` & `MLPendingAckStoreProvider` support buffered writer metrics.
  - Motivation: #15370

----

- Delete constructor of `TxnLogBufferedWriter` without parameter `metrics`.
  - Motivation: it is unnecessary.

---- 

- Add a default `DisabledTxnLogBufferedWriterMetricsStats` implementation.

----

- Previous PR remaining code to optimize: remove the check code `if (metrics != null)`. The motivation see:
  - Motivation: #16758 (comment)

----

- Make transaction log buffered writer only create by the `MLTransactionMetadataStoreProvider` & `MLPendingAckStoreProvider`. 
  - Motivation: #16758 (comment)

### Documentation

- [ ] `doc-required` 

- [x] `doc-not-needed` 

- [ ] `doc` 

- [ ] `doc-complete`


### Matching PR in forked repository

PR in forked repository: 

- poorbarcode#3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/metrics area/transaction doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants