PIP-196 Segmented transaction buffer snapshot #16913

Closed
liangyepianzhou opened this issue Aug 2, 2022 · 3 comments

liangyepianzhou commented Aug 2, 2022

Background

  1. We use TransactionBuffer to handle messages sent with transactions.
  2. TransactionBuffer can check whether a transaction is aborted and provides the max read position of the topic, which together give transaction read-committed isolation.
    a. The max read position is the safe read position of the topic; all transactions before the max read position are closed.
    b. The transaction buffer keeps an aborted transaction list, which is used to filter out messages that belong to aborted transactions.
  3. To avoid rebuilding the transaction buffer by replaying all messages from the original topic, transaction buffer snapshots are taken periodically.
  4. The snapshot is persisted to a system topic under the namespace.
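
The read-committed check in item 2 can be sketched as follows. This is a minimal illustration, not Pulsar's implementation: `ReadCommittedFilter`, `Position`, and `isVisible` are hypothetical names, and transaction IDs are simplified to strings.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: names and types are illustrative, not Pulsar's real classes.
final class ReadCommittedFilter {
    // A position in the topic, ordered by ledger ID and then by entry ID.
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    private final Set<String> abortedTxnIds = new HashSet<>();
    private Position maxReadPosition;

    ReadCommittedFilter(Position maxReadPosition) {
        this.maxReadPosition = maxReadPosition;
    }

    void markAborted(String txnId) {
        abortedTxnIds.add(txnId);
    }

    // The max read position only moves forward.
    void advanceMaxReadPosition(Position p) {
        if (p.compareTo(maxReadPosition) > 0) {
            maxReadPosition = p;
        }
    }

    // A transactional message is visible only if it lies at or before the max read
    // position and its transaction is not in the aborted list.
    boolean isVisible(Position messagePosition, String txnId) {
        return messagePosition.compareTo(maxReadPosition) <= 0
                && !abortedTxnIds.contains(txnId);
    }
}
```

Both conditions are needed: the max read position hides messages of transactions that may still be open, and the aborted list hides messages of transactions that are closed but were rolled back.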

Motivation

As described in the background, TransactionBuffer takes snapshots periodically. A snapshot contains a position (maxReadPosition) and a LinkedMap (aborts) that stores all the aborted transactions of the topic. When a topic has long-term data retention and a lot of aborted transactions, the snapshot gradually becomes a bottleneck: a single snapshot cannot accommodate enough aborted transactions, and the cost of each snapshot update grows as the snapshot becomes larger.

This proposal introduces the segmented transaction buffer snapshot, which splits the snapshot into multiple parts and persists each part as an Entry to improve the above situation.

Goal

1. Support a large number of aborted transactions.
2. Improve transaction buffer recovery speed.
3. Solve the write amplification problem of taking snapshots in the system topic.

Approach Overview

We implement multiple snapshot segments through a secondary index design. The index and the snapshot segments are stored in different compacted topics. When the TB recovers, it finds the snapshot segments through the index. After a snapshot segment is taken, the index is updated in the index topic.

Snapshot segment

A snapshot segment is an immutable segment that contains a fixed number of aborted transactions and a maxReadPosition, which identifies the position at which the segment's last abort transaction marker is persisted in the original topic. The size of a snapshot segment can be configured.
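
A minimal sketch of how aborted transactions might be sealed into fixed-size segments. Everything here is an assumption made for illustration: `SegmentBuilder`, `Segment`, and `abortsPerSegment` are hypothetical names, transaction IDs are plain strings, and the per-segment count is passed in directly rather than derived from the configured segment size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sealing fixed-size snapshot segments; not Pulsar's real code.
final class SegmentBuilder {
    // An immutable sealed segment: a fixed number of aborted transaction IDs plus the
    // position at which the last abort marker of the segment was persisted.
    record Segment(long sequenceId, List<String> aborts, long maxReadLedgerId, long maxReadEntryId) {}

    private final int abortsPerSegment;            // assumed to come from configuration
    private final List<String> unsealed = new ArrayList<>();
    private final List<Segment> sealed = new ArrayList<>();
    private long nextSequenceId = 0;

    SegmentBuilder(int abortsPerSegment) {
        this.abortsPerSegment = abortsPerSegment;
    }

    // Record an aborted transaction; seal a segment once the fixed count is reached.
    void onAbort(String txnId, long markerLedgerId, long markerEntryId) {
        unsealed.add(txnId);
        if (unsealed.size() == abortsPerSegment) {
            sealed.add(new Segment(nextSequenceId++, List.copyOf(unsealed), markerLedgerId, markerEntryId));
            unsealed.clear();
        }
    }

    List<Segment> sealedSegments() {
        return List.copyOf(sealed);
    }

    List<String> unsealedAborts() {
        return List.copyOf(unsealed);
    }
}
```

Because sealed segments never change, each one is written once; only the small unsealed remainder needs to be rewritten as new aborts arrive.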

Snapshot topic

A new system topic is used to store the snapshot segments. The key of a snapshot segment is `multiple-sequenceID-topicName`. The sequence ID counts how many times a snapshot segment has been taken. When the transaction buffer takes a snapshot segment, it increases the sequence ID and uses it as part of the key. When the transaction buffer recovers, it takes the max sequence ID found in the indexes as its current sequence ID. The sequence ID ensures that each snapshot segment has a unique key, and a segment can be deleted by writing key = `multiple-sequenceID-topicName` with value = null.
A snapshot segment is deleted persistently as follows: after a transaction marker is written into the original topic, check whether the ledger of the segment's max read position has been deleted from the original topic. If the ledger no longer exists, delete the segment from the snapshot topic and update its index.
Figure 2: snapshot topic
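
The key layout and the delete-by-null-value convention can be sketched with a map standing in for the compacted topic. `SnapshotTopicModel` and its methods are hypothetical; a real compacted topic keeps the tombstone until compaction runs, which this model skips.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a compacted topic modeled as "latest value per key".
final class SnapshotTopicModel {
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    // Key layout taken from the proposal text: multiple-<sequenceID>-<topicName>.
    static String segmentKey(long sequenceId, String topicName) {
        return "multiple-" + sequenceId + "-" + topicName;
    }

    // A non-null value stores the segment; a null value acts as a tombstone
    // and removes the key from the compacted view.
    void write(String key, String value) {
        if (value == null) {
            latestByKey.remove(key);
        } else {
            latestByKey.put(key, value);
        }
    }

    boolean contains(String key) {
        return latestByKey.containsKey(key);
    }

    int size() {
        return latestByKey.size();
    }
}
```

Since the sequence ID only increases, a key is never reused for a different segment, so a tombstone for an old segment cannot remove a newer one.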

Snapshot segment index

The snapshot index is written to a separate topic (`__transaction_buffer_snapshot_index`), so that we can read the index first instead of reading all the data of the transaction buffer snapshot segments in the snapshot topic.
We store the temporary snapshot (one that has not yet reached the fixed number of aborts for a snapshot segment) in the index. If it were written to the snapshot topic instead, the index would also have to be updated each time, which would cause unnecessary overhead.
Figure 2: snapshot segment workflow

Index topic

The index topic is a system topic shared by all topics under a namespace. It stores the indexes of the snapshot segments. The indexes are updated every time a snapshot segment is written into the snapshot topic, and whenever the latest maxReadPosition and aborted transactions need to be stored.
In the following example, the index topic stores the indexes for topic1 and topic2.
Figure 3: snapshot index topic

Workflow Overview

In the following figure, the aborted transactions are divided into 4 segments, and every segment has its own max read position. The latest segment has only 3 aborted transactions, but its max read position has changed to (10:600), so the latest segment is written into an index and persisted in the index topic.
When we need to check whether a transactional message was sent by an aborted transaction, we use the position of the message to find the corresponding segment and check whether the transaction ID exists in that segment. When the transaction buffer recovers, it reads the index topic to rebuild the index map and uses the indexes to read the snapshot segments into memory. After that, it reads the original topic from the max read position of the latest segment to the LAC to continue recovering aborted transactions and the max read position.
Figure 3: transaction buffer segmented snapshot workflow
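
The recovery flow described above can be sketched as two passes: load the segments referenced by the index, then replay the tail of the original topic. This is a simplified model under stated assumptions: `RecoverySketch` and its records are hypothetical, positions are collapsed to a single `long`, and the end of the entry list stands in for the LAC.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of transaction buffer recovery; not Pulsar's real code.
final class RecoverySketch {
    record Segment(long maxReadPosition, List<String> aborts) {}

    // An entry of the original topic; abortedTxnId is non-null only for abort markers.
    record Entry(long position, String abortedTxnId) {}

    record Recovered(Set<String> aborts, long replayedFrom) {}

    static Recovered recover(List<Segment> segmentsFromIndex, List<Entry> originalTopic) {
        Set<String> aborts = new LinkedHashSet<>();
        long latestMaxRead = -1;
        // 1. Load every segment referenced by the index into memory.
        for (Segment s : segmentsFromIndex) {
            aborts.addAll(s.aborts());
            latestMaxRead = Math.max(latestMaxRead, s.maxReadPosition());
        }
        // 2. Replay the original topic after the latest segment's max read position
        //    to pick up abort markers written since the last snapshot.
        for (Entry e : originalTopic) {
            if (e.position() > latestMaxRead && e.abortedTxnId() != null) {
                aborts.add(e.abortedTxnId());
            }
        }
        return new Recovered(aborts, latestMaxRead);
    }
}
```

The replay only touches entries after the latest snapshot position, which is why recovery cost depends on how recently a snapshot was taken rather than on the length of the topic.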

Configuration

| Configuration | Description | Already exists |
| --- | --- | --- |
| transactionBufferSnapshotSegmentSize = 256 KB | Determines the size of the snapshot segment. | false |
| transactionBufferSegmentedSnapshotEnabled = false | Whether to enable the segmented transaction buffer snapshot, which is able to handle a large number of aborted transactions. | false |
| transactionBufferSnapshotMaxTransactionCount | The transaction buffer updates the snapshot index after the number of transaction operations reaches this value. | true |
| transactionBufferSnapshotMinTimeInMillis | The interval at which the transaction buffer updates the snapshot index. | true |
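
One way to read the last two settings is as a count-based trigger and a time-based trigger for index updates. The sketch below is an assumption about how they could interact, not Pulsar's actual logic: `IndexUpdateTrigger` is a hypothetical class, and the rule that the timer only fires when there are pending changes is an assumption added for illustration.

```java
// Hypothetical sketch of the two index-update triggers; not Pulsar's real code.
final class IndexUpdateTrigger {
    private final int maxTransactionCount;   // stands in for transactionBufferSnapshotMaxTransactionCount
    private final long minTimeInMillis;      // stands in for transactionBufferSnapshotMinTimeInMillis
    private int changesSinceLastUpdate = 0;
    private long lastUpdateMillis;

    IndexUpdateTrigger(int maxTransactionCount, long minTimeInMillis, long nowMillis) {
        this.maxTransactionCount = maxTransactionCount;
        this.minTimeInMillis = minTimeInMillis;
        this.lastUpdateMillis = nowMillis;
    }

    // Called for each transaction operation; returns true when the count threshold is reached.
    boolean onTransactionOperation(long nowMillis) {
        changesSinceLastUpdate++;
        if (changesSinceLastUpdate >= maxTransactionCount) {
            markUpdated(nowMillis);
            return true;
        }
        return false;
    }

    // Called by a periodic timer; returns true when changes are pending (assumption)
    // and the configured interval has elapsed since the last update.
    boolean onTimer(long nowMillis) {
        if (changesSinceLastUpdate > 0 && nowMillis - lastUpdateMillis >= minTimeInMillis) {
            markUpdated(nowMillis);
            return true;
        }
        return false;
    }

    private void markUpdated(long nowMillis) {
        changesSinceLastUpdate = 0;
        lastUpdateMillis = nowMillis;
    }
}
```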

Implementation

The implementation of the transaction buffer snapshot segment index can be found below. It contains the indexes of the snapshot segment and the latest unsealed snapshot segment.

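import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.transaction.TxnID;

public class TransactionBufferSnapshotIndexes {
    // Distinguishes an entry that carries segment indexes from one that
    // carries the latest unsealed snapshot.
    public enum Type {
        Indexes,
        UnsealedSnapshot
    }

    private String topicName;

    private Type type;

    // Indexes of the sealed snapshot segments.
    private List<TransactionBufferSnapshotIndex> indexList;

    // The latest unsealed snapshot segment.
    private TransactionBufferSnapshot snapshot;

    @Builder
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TransactionBufferSnapshotIndex {
        public long sequenceID;
        public long maxReadPositionLedgerID;
        public long maxReadPositionEntryID;
        public long persistentPositionLedgerID;
        public long persistentPositionEntryID;
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TransactionBufferSnapshot {
        private String topicName;
        private long sequenceId;
        private long maxReadPositionLedgerId;
        private long maxReadPositionEntryId;
        private List<TxnID> aborts;
    }
}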

Metrics Change

We need to add some metrics to AbortedTxnProcessor to help users adjust the configuration.
The following metrics are added:

| Name | Labels | Type | Description |
| --- | --- | --- | --- |
| pulsar_txn_tb_snapshot_segment_op_total | op="add_del_read" | Counter | The count of operations on the Pulsar transaction buffer snapshot segment. The operation can be add, delete, or read. |
| pulsar_txn_tb_snapshot_index_op_total | op="add_del_read" | Counter | The count of operations on the Pulsar transaction buffer snapshot index. The operation can be add, delete, or read. |
| pulsar_txn_tb_snapshot_segment_total | null | Gauge | The number of snapshot segments maintained in the Pulsar transaction buffer. |
| pulsar_txn_tb_snapshot_index_entry_bytes | null | Histogram | The size of the snapshot index entry maintained in the Pulsar transaction buffer. |

Compatibility

### Upgrade

We keep the original snapshot topic and implement the new solution on a new snapshot topic and an index topic, so the new solution can be tried without affecting the data in the original snapshot topic. After transactionBufferSegmentedSnapshotEnabled is enabled for the first time, the transaction buffer reads the index topic first when it recovers. If the index topic contains an index for the topic, the transaction buffer is recovered with the new implementation. Otherwise, it is recovered from the old snapshot topic; the old snapshot is then written to the new snapshot topic as the first snapshot segment, and its index is written to the snapshot index topic.

  1. Enable the transaction buffer snapshot segment by configuring transactionBufferSegmentedSnapshotEnabled = true.
  2. Configure the size of the snapshot segment according to the specific business environment. transactionBufferSnapshotSegmentSize defaults to 256 KB.
  3. Restart the broker cluster.
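
The upgrade path described above can be sketched as a recovery routine that checks the index topic first and migrates the old snapshot on a miss. This is an illustration only: `UpgradeRecoverySketch` is hypothetical, the three topics are modeled as in-memory maps, and using sequence ID 0 for the migrated segment is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the upgrade path; topics are modeled as in-memory maps.
final class UpgradeRecoverySketch {
    final Map<String, List<String>> oldSnapshotTopic = new HashMap<>();  // topic -> aborted txn IDs
    final Map<String, List<String>> newSegmentTopic = new HashMap<>();   // segment key -> aborted txn IDs
    final Map<String, List<String>> indexTopic = new HashMap<>();        // topic -> segment keys

    // Returns the aborted transaction IDs recovered for the topic.
    List<String> recover(String topic) {
        List<String> segmentKeys = indexTopic.get(topic);
        if (segmentKeys != null) {
            // An index exists: recover with the segmented implementation.
            List<String> aborts = new ArrayList<>();
            for (String key : segmentKeys) {
                aborts.addAll(newSegmentTopic.getOrDefault(key, List.of()));
            }
            return aborts;
        }
        // No index yet: recover from the old snapshot, then migrate it as the
        // first segment (sequence ID 0 is an assumption) and write its index.
        List<String> aborts = oldSnapshotTopic.getOrDefault(topic, List.of());
        String key = "multiple-0-" + topic;
        newSegmentTopic.put(key, aborts);
        indexTopic.put(topic, List.of(key));
        return aborts;
    }
}
```

After the first recovery the index exists, so later recoveries no longer depend on the old snapshot topic.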

### Downgrade

transactionBufferSegmentedSnapshotEnabled can also be turned off if the user does not wish to continue using the snapshot segment. The transaction buffer then continues to use the original snapshot topic and the original code logic for recovery.

  1. Disable the transaction buffer snapshot segment by configuring transactionBufferSegmentedSnapshotEnabled = false.
  2. Restart the broker cluster.

Implementation Steps

  1. Add configurations.
  2. Implement a system topic client for snapshot topic and index topic.
  3. Implement a new processor to handle transaction buffer aborted transactions.
  4. Add transaction buffer snapshot metrics.

Rejected approaches

Optimize the data structure that stores the aborted transactions

The transaction buffer uses a LinkedMap<TxnID, Position> to store aborted transactions. The position is where the transaction's commit or abort marker is written.

We tried using RoaringBitmap to store TxnIDs, but RoaringBitmap does not compress well when the values are spread over huge intervals. In normal business scenarios, there may be only one aborted transaction in every 100,000 transactions. In this case, RoaringBitmap uses more memory.

The test report can be found here.

Change to managedLedger implementation

Because the shared system topic has the series of disadvantages mentioned above, we considered an implementation with one managedLedger per topic transaction buffer.

In this case, the topic transaction buffer can easily delete useless data, and the two questions above can be easily resolved.

However, this solution increases the metadata used by each topic.

@liangyepianzhou
Contributor Author

The latest proposal can be found here.

liangyepianzhou added a commit that referenced this issue Sep 20, 2022
…ation (#16917)

* [improve][txn][PIP-196] Segmented transaction buffer snapshot
Master #16913
### Motivation
1. Add configurations.
2. Implement the system topic client for snapshot topic and index topic.
3. Implement AbortedTxnProcessor.
4. Add transaction buffer snapshot metrics.
### Modification
Add configuration for segment snapshot.
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Oct 11, 2022
liangyepianzhou added a commit that referenced this issue Oct 24, 2022
…x system topic (#16931)

Master Issue: #16913
### Motivation
Implement system topic client for snapshot segment topic and index topic to send segment snapshots or indexes.
The configuration `transactionBufferSegmentedSnapshotEnabled` is used in the Transaction Buffer to determine which `AbortedTxnProcessor` is adopted by this TB.
### Modification

In the new implementation of the Transaction Buffer Snapshot System topic, the number of system topics to be handled has grown from one to three, each with a different schema, so we have added generics to the TransactionBufferSnapshotBaseSystemTopicClient class and the SystemTopicTxnBufferSnapshotService<T> class.
Pulsar Service maintains a factory class, TransactionBufferSnapshotServiceFactory, which is used to obtain a SystemTopicTxnBufferSnapshotService.
This way, we can obtain the required system topic client through pulsarService to read and send snapshots.
<img width="1336" alt="image" src="https://user-images.githubusercontent.com/55571188/197467173-9028e58a-79cc-4fe4-81e2-c299c568caee.png">
congbobo184 pushed a commit that referenced this issue Oct 31, 2022
…17847)

Master Issue: #16913

### Motivation

Implement an abortedTxnProcessor to handle the storage of the aborted transaction ID.
### Modifications
The structure overview: 
![image](https://user-images.githubusercontent.com/55571188/197683651-6ccb106d-1e71-4841-9da7-2644275a401a.png)

The main idea is to move the logic for checking and persisting aborted transaction IDs (taking snapshots) and the logic for updating maxReadPosition into the AbortedTxnProcessor.
And the AbortedTxnProcessor can be implemented in different designs.

**Add `persistentWorker` to handle snapshot persistence**:
<img width="1003" alt="image" src="https://user-images.githubusercontent.com/55571188/198528131-3cde19bc-2034-4693-a8b1-4d6345e6db36.png">
The first four items below are the corresponding four tasks in the figure. The fifth item is not strictly a task, but a part of the first two tasks.
* takeSnapshotSegmentAsync -> writeSnapshotSegmentAsync
    * These two methods are used to persist the snapshot segment.
* deleteSnapshotSegment
    * This method is used to delete the snapshot segment.

* updateIndexMetadataForTheLastSnapshot
    * Used to update the index metadata (the latest snapshot).

* clearSnapshotSegmentAndIndexes
    * Delete all segments and then delete the index of this topic.


* updateSnapshotIndex
    * Called by deleteSnapshotSegment and writeSnapshotSegmentAsync. Updates the index after the snapshot segment is written.
    * Called by recovery as a compensation mechanism for updating the index.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [x] `doc-not-needed` 
(Please explain why)


### Matching PR in the forked repository

PR in forked repository: liangyepianzhou#7
congbobo184 pushed a commit that referenced this issue Feb 13, 2023
Master Issue: #16913

### Motivation

Implement an abortedTxnProcessor to handle the storage of the aborted transaction ID.
### Modifications
The structure overview: 
![image](https://user-images.githubusercontent.com/55571188/197683651-6ccb106d-1e71-4841-9da7-2644275a401a.png)

The main idea is to move the logic for checking and persisting aborted transaction IDs (taking snapshots) and the logic for updating maxReadPosition into the AbortedTxnProcessor.
And the AbortedTxnProcessor can be implemented in different designs.

**Add `persistentWorker` to handle snapshot persistence**:
<img width="1003" alt="image" src="https://user-images.githubusercontent.com/55571188/198528131-3cde19bc-2034-4693-a8b1-4d6345e6db36.png">
The first four items below are the corresponding four tasks in the figure. The fifth item is not strictly a task, but a part of the first two tasks.
* takeSnapshotSegmentAsync -> writeSnapshotSegmentAsync
    * These two methods are used to persist the snapshot segment.
* deleteSnapshotSegment
    * This method is used to delete the snapshot segment.

* clearSnapshotSegmentAndIndexes
    * Delete all segments and then delete the index of this topic.

* updateSnapshotIndex
    * Called by deleteSnapshotSegment and writeSnapshotSegmentAsync. Updates the index after the snapshot segment is written.
    * Called by `takeSnapshotByChangeTimes` and `takeSnapshotByTimeout` to update the index snapshot.
    * Called by recovery as a compensation mechanism for updating the index.
@RobertIndie
Member

Confirmed with @liangyepianzhou . There is still work left for metric and admin. Move this PIP to the 3.1.0 milestone to track.

@RobertIndie RobertIndie modified the milestones: 3.0.0, 3.1.0 Apr 11, 2023
@github-actions github-actions bot removed the Stale label Apr 12, 2023
congbobo184 pushed a commit that referenced this issue May 16, 2023
…napshot feature upgrade (#20235)

master #16913
## Motivation:
The transaction buffer segmented snapshot feature aims to improve the transaction buffer's performance by segmenting the snapshot and managing it more efficiently. However, for existing topics that were created before this feature was introduced, we need to ensure a seamless transition and compatibility when enabling the segmented snapshot feature.

## Modifications:
1. Updated the `recoverFromSnapshot()` method to read from another topic if the `persistentSnapshotIndexes` is null. This ensures that the appropriate snapshot data is fetched during the recovery process when upgrading to the segmented snapshot feature.
2. Created a new test `testSnapshotProcessorUpdate()` that verifies the compatibility of the transaction

congbobo184 pushed a commit that referenced this issue May 18, 2023
…napshot feature upgrade (#20235)

(cherry picked from commit 8b929e6)