Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn] Add getState in transaction for client API #17423

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1469,4 +1469,52 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
}
}

@Test
public void testGetTxnState() throws Exception {
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();

// test OPEN and TIMEOUT
assertEquals(transaction.getState(), Transaction.State.OPEN);
Transaction timeoutTxn = transaction;
Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);

// test abort
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.abort().get();
assertEquals(transaction.getState(), Transaction.State.ABORTED);

// test commit
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
transaction.commit().get();
assertEquals(transaction.getState(), Transaction.State.COMMITTED);

// test error
transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
pulsarServiceList.get(0).getTransactionMetadataStoreService()
.endTransaction(transaction.getTxnID(), 0, false);
transaction.commit();
Transaction errorTxn = transaction;
Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);

// test committing
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.commit();
Transaction committingTxn = transaction;
Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);

// test aborting
transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
transaction.abort();
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ public void testTxnTimeOutInClient() throws Exception{
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,53 @@
@InterfaceStability.Evolving
public interface Transaction {

enum State {

/**
* When the transaction is in the `OPEN` state, it can produce with transaction and ack with the transaction.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*
* When the transaction is in the `OPEN` state, it can commit or abort.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
OPEN,

/**
* When the client invokes commit, the state will change to `COMMITTING` from `OPEN`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
COMMITTING,

/**
* When the client invokes abort, the state will change to `ABORTING` from `OPEN`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
ABORTING,

/**
* When the client receives the response to the commit, the state will change to `COMMITTED` from `COMMITTING`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
COMMITTED,

/**
* When the client receives the response to the abort, the state will change to `ABORTED` from `ABORTING`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
ABORTED,

/**
* When the client invokes commit or abort but transaction not exist in coordinator,
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
* the state will change to `ERROR`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*
* When the client invokes commit, but the transaction state in coordinator is committed or committing,
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
* the state will change to `ERROR`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*
* When the client invokes abort, but the transaction state in coordinator is aborted or aborting,
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
* the state will change to `ERROR`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
ERROR,
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved

/**
* When the transaction timeout and the state is in `OPEN`, the state will change to `TIME_OUT` from `OPEN`.
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
*/
TIME_OUT
}

/**
* Commit the transaction.
*
Expand All @@ -48,4 +95,12 @@ public interface Transaction {
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();

/**
* Get transaction state.
*
* @return {@link State} the state of the transaction.
*/
State getState();

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {

@Override
public void run(Timeout timeout) throws Exception {
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
}

public enum State {
OPEN,
COMMITTING,
ABORTING,
COMMITTED,
ABORTED,
ERROR,
TIMEOUT
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
}

TransactionImpl(PulsarClientImpl client,
Expand Down Expand Up @@ -215,6 +205,11 @@ public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

@Override
public State getState() {
return state;
}

public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return true;
Expand Down