Skip to content

Commit

Permalink
[feat][txn]Implement transactionImpl (#984)
Browse files Browse the repository at this point in the history
Master Issue:#932
### Motivation
Implement transaction coordinator client.
### Modifications
1. Implement transaction coordinator
2. implement transactionImpl
3. Implement transaction in producer and consumer API
  • Loading branch information
liangyepianzhou authored Mar 24, 2023
1 parent 20291f5 commit 09dea66
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 26 deletions.
28 changes: 21 additions & 7 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package pulsar

import "fmt"
import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

// Result used to represent pulsar processing is an alias of type int.
type Result int
Expand Down Expand Up @@ -103,14 +107,11 @@ const (
ProducerClosed
// SchemaFailure means the payload could not be encoded using the Schema
SchemaFailure

// ReachMaxPendingOps means the pending operations in transaction_impl coordinator reach the maximum.
ReachMaxPendingOps
// InvalidStatus means the component status is not as expected.
InvalidStatus
// TransactionError means this is a transaction related error
TransactionError

// TransactionNoFoundError The transaction is not exist in the transaction coordinator, It may be an error txn
// or already ended.
TransactionNoFoundError
// ClientMemoryBufferIsFull client limit buffer is full
ClientMemoryBufferIsFull
)
Expand Down Expand Up @@ -221,7 +222,20 @@ func getResultStr(r Result) string {
return "SchemaFailure"
case ClientMemoryBufferIsFull:
return "ClientMemoryBufferIsFull"
case TransactionNoFoundError:
return "TransactionNoFoundError"
default:
return fmt.Sprintf("Result(%d)", r)
}
}

func getErrorFromServerError(serverError *proto.ServerError) error {
switch *serverError {
case proto.ServerError_TransactionNotFound:
return newError(TransactionNoFoundError, serverError.String())
case proto.ServerError_InvalidTxnStatus:
return newError(InvalidStatus, serverError.String())
default:
return newError(UnknownError, serverError.String())
}
}
48 changes: 47 additions & 1 deletion pulsar/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,53 @@

package pulsar

import (
"context"
)

// TxnState The state of the transaction. Check the state of the transaction before executing some operation
// with the transaction is necessary.
type TxnState int32

const (
_ TxnState = iota
// TxnOpen The transaction in TxnOpen state can be used to send/ack messages.
TxnOpen
// TxnCommitting The state of the transaction will be TxnCommitting after the commit method is called.
// The transaction in TxnCommitting state can be committed again.
TxnCommitting
// TxnAborting The state of the transaction will be TxnAborting after the abort method is called.
// The transaction in TxnAborting state can be aborted again.
TxnAborting
// TxnCommitted The state of the transaction will be TxnCommitted after the commit method is executed success.
// This means that all the operations with the transaction are success.
TxnCommitted
// TxnAborted The state of the transaction will be TxnAborted after the abort method is executed success.
// This means that all the operations with the transaction are aborted.
TxnAborted
// TxnError The state of the transaction will be TxnError after the operation of transaction get a non-retryable error.
TxnError
// TxnTimeout The state of the transaction will be TxnTimeout after the transaction timeout.
TxnTimeout
)

// TxnID An identifier for representing a transaction.
type TxnID struct {
mostSigBits uint64
// mostSigBits The most significant 64 bits of this TxnID.
mostSigBits uint64
// leastSigBits The least significant 64 bits of this TxnID.
leastSigBits uint64
}

// Transaction used to guarantee exactly-once
type Transaction interface {
//Commit You can commit the transaction after all the sending/acknowledging operations with the transaction success.
Commit(context.Context) error
//Abort You can abort the transaction when you want to abort all the sending/acknowledging operations
// with the transaction.
Abort(context.Context) error
//GetState Get the state of the transaction.
GetState() TxnState
//GetTxnID Get the identified ID of the transaction.
GetTxnID() TxnID
}
35 changes: 26 additions & 9 deletions pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,16 @@ func (tc *transactionCoordinatorClient) newTransaction(timeout time.Duration) (*
TxnTtlSeconds: proto.Uint64(uint64(timeout.Milliseconds())),
}

cnx, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[nextTcID], requestID, pb.BaseCommand_NEW_TXN, cmdNewTxn)
tc.semaphore.Release()
if err != nil {
return nil, err
} else if res.Response.NewTxnResponse.Error != nil {
return nil, getErrorFromServerError(res.Response.NewTxnResponse.Error)
}

return &TxnID{*cnx.Response.NewTxnResponse.TxnidMostBits,
*cnx.Response.NewTxnResponse.TxnidLeastBits}, nil
return &TxnID{*res.Response.NewTxnResponse.TxnidMostBits,
*res.Response.NewTxnResponse.TxnidLeastBits}, nil
}

// addPublishPartitionToTxn register the partitions which published messages with the transactionImpl.
Expand All @@ -137,10 +139,15 @@ func (tc *transactionCoordinatorClient) addPublishPartitionToTxn(id *TxnID, part
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Partitions: partitions,
}
_, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
tc.semaphore.Release()
return err
if err != nil {
return err
} else if res.Response.AddPartitionToTxnResponse.Error != nil {
return getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error)
}
return nil
}

// addSubscriptionToTxn register the subscription which acked messages with the transactionImpl.
Expand All @@ -160,10 +167,15 @@ func (tc *transactionCoordinatorClient) addSubscriptionToTxn(id *TxnID, topic st
TxnidLeastBits: proto.Uint64(id.leastSigBits),
Subscription: []*pb.Subscription{sub},
}
_, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
tc.semaphore.Release()
return err
if err != nil {
return err
} else if res.Response.AddSubscriptionToTxnResponse.Error != nil {
return getErrorFromServerError(res.Response.AddSubscriptionToTxnResponse.Error)
}
return nil
}

// endTxn commit or abort the transactionImpl.
Expand All @@ -178,9 +190,14 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID, action pb.TxnAction) e
TxnidMostBits: proto.Uint64(id.mostSigBits),
TxnidLeastBits: proto.Uint64(id.leastSigBits),
}
_, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
tc.semaphore.Release()
return err
if err != nil {
return err
} else if res.Response.EndTxnResponse.Error != nil {
return getErrorFromServerError(res.Response.EndTxnResponse.Error)
}
return nil
}

func getTCAssignTopicName(partition uint64) string {
Expand Down
Loading

0 comments on commit 09dea66

Please sign in to comment.