Skip to content

Commit

Permalink
[improve][txn] Add getState in transaction for client API (apache#17423)
Browse files Browse the repository at this point in the history
### Motivation
now `org.apache.pulsar.client.api.transaction.Transaction` dont have a interface for user to get the transaction state.

user can get the transaction state to do user's own op.
### Modifications
1. add the interface in `org.apache.pulsar.client.api.transaction.Transaction`     `getState`
2. TransactionImpl implement the interface
```
     * Get transaction state.
     *
     * @return {@link State} the state of the transaction.
     */
    State getState();
```
### Verifying this change
add the test

(cherry picked from commit a9531db)
(cherry picked from commit db4fbfb)
  • Loading branch information
congbobo184 authored and nicoloboschi committed Dec 6, 2022
1 parent c3747d8 commit 3e65e9f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1264,4 +1264,52 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
}
}

@Test
public void testGetTxnState() throws Exception {
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();

// test OPEN and TIMEOUT
assertEquals(transaction.getState(), Transaction.State.OPEN);
Transaction timeoutTxn = transaction;
Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);

// test abort
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.abort().get();
assertEquals(transaction.getState(), Transaction.State.ABORTED);

// test commit
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.commit().get();
assertEquals(transaction.getState(), Transaction.State.COMMITTED);

// test error
transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
pulsarServiceList.get(0).getTransactionMetadataStoreService()
.endTransaction(transaction.getTxnID(), 0, false);
transaction.commit();
Transaction errorTxn = transaction;
Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);

// test committing
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.commit();
Transaction committingTxn = transaction;
Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);

// test aborting
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.abort();
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ public void testTxnTimeOutInClient() throws Exception{
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,55 @@
@InterfaceStability.Evolving
public interface Transaction {

enum State {

/**
* When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
*
* When a transaction is in the `OPEN` state, it can commit or abort.
*/
OPEN,

/**
* When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
*/
COMMITTING,

/**
* When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
*/
ABORTING,

/**
* When a client receives a response to a commit, the transaction state is changed from
* `COMMITTING` to `COMMITTED`.
*/
COMMITTED,

/**
* When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
*/
ABORTED,

/**
* When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
* then the state is changed to `ERROR`.
*
* When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
* then the state is changed to `ERROR`.
*
* When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
* then the state is changed to `ERROR`.
*/
ERROR,

/**
* When a transaction is timed out and the transaction state is `OPEN`,
* then the transaction state is changed from `OPEN` to `TIME_OUT`.
*/
TIME_OUT
}

/**
* Commit the transaction.
*
Expand All @@ -48,4 +97,12 @@ public interface Transaction {
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();

/**
* Get transaction state.
*
* @return {@link State} the state of the transaction.
*/
State getState();

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,7 @@ public class TransactionImpl implements Transaction , TimerTask {

@Override
public void run(Timeout timeout) throws Exception {
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
}

public enum State {
OPEN,
COMMITTING,
ABORTING,
COMMITTED,
ABORTED,
ERROR,
TIMEOUT
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
}

TransactionImpl(PulsarClientImpl client,
Expand Down Expand Up @@ -236,6 +226,11 @@ public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

@Override
public State getState() {
return state;
}

public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return true;
Expand Down

0 comments on commit 3e65e9f

Please sign in to comment.