Skip to content

Commit

Permalink
Move replication related code to subfolder under history (#3204)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Apr 20, 2020
1 parent 8f77569 commit 24aa94d
Show file tree
Hide file tree
Showing 18 changed files with 490 additions and 473 deletions.
4 changes: 2 additions & 2 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ const (
JavaWorkerStickyQueryVersion = "1.0.0"
// GoWorkerConsistentQueryVersion indicates the minimum client version of the go worker which supports ConsistentQuery
GoWorkerConsistentQueryVersion = "1.5.0"
// JavaWorkerSendRawWorkflowHistoryVersion indicates the minimum client version of the java worker which supports RawHistoryQuery
// JavaWorkerRawHistoryQueryVersion indicates the minimum client version of the java worker which supports RawHistoryQuery
JavaWorkerRawHistoryQueryVersion = "1.3.0"
// GoWorkerSendRawWorkflowHistoryVersion indicates the minimum client version of the go worker which supports RawHistoryQuery
// GoWorkerRawHistoryQueryVersion indicates the minimum client version of the go worker which supports RawHistoryQuery
GoWorkerRawHistoryQueryVersion = "1.6.0"
// CLIRawHistoryQueryVersion indicates the minimum CLI version of the go worker which supports RawHistoryQuery
// Note: cli uses go client feature version
Expand Down
2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
LastBlobNextPageToken = -1
// EndMessageID is the id of the end message, here we use the int64 max
EndMessageID int64 = 1<<63 - 1
// EmptyMessageID is the default start message ID for replication level
EmptyMessageID = -1
)

const (
Expand Down
5 changes: 3 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/replication"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
)
Expand All @@ -66,7 +67,7 @@ type (
historyEventNotifier events.Notifier
publisher messaging.Producer
rateLimiter quotas.Limiter
replicationTaskFetchers ReplicationTaskFetchers
replicationTaskFetchers replication.TaskFetchers
queueTaskProcessor task.Processor
}
)
Expand Down Expand Up @@ -125,7 +126,7 @@ func (h *Handler) Start() {
}
}

h.replicationTaskFetchers = NewReplicationTaskFetchers(
h.replicationTaskFetchers = replication.NewTaskFetchers(
h.GetLogger(),
h.config,
h.GetClusterMetadata().GetReplicationConsumerConfig(),
Expand Down
22 changes: 12 additions & 10 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/ndc"
"github.com/uber/cadence/service/history/query"
"github.com/uber/cadence/service/history/replication"
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
Expand All @@ -70,6 +71,7 @@ const (
activityCancellationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED"
defaultQueryFirstDecisionTaskWaitTime = time.Second
queryFirstDecisionTaskCheckInterval = 200 * time.Millisecond
replicationTimeout = 30 * time.Second
)

type (
Expand Down Expand Up @@ -99,13 +101,13 @@ type (
resetor reset.WorkflowResetor
workflowResetter reset.WorkflowResetter
queueTaskProcessor task.Processor
replicationTaskProcessors []ReplicationTaskProcessor
replicationTaskProcessors []replication.TaskProcessor
publicClient workflowserviceclient.Interface
eventsReapplier nDCEventsReapplier
matchingClient matching.Client
rawMatchingClient matching.Client
clientChecker client.VersionChecker
replicationDLQHandler replicationDLQHandler
replicationDLQHandler replication.DLQHandler
}
)

Expand Down Expand Up @@ -161,7 +163,7 @@ func NewEngineWithShardContext(
historyEventNotifier events.Notifier,
publisher messaging.Producer,
config *config.Config,
replicationTaskFetchers ReplicationTaskFetchers,
replicationTaskFetchers replication.TaskFetchers,
rawMatchingClient matching.Client,
queueTaskProcessor task.Processor,
) engine.Engine {
Expand Down Expand Up @@ -265,7 +267,7 @@ func NewEngineWithShardContext(
nil,
shard.GetLogger(),
)
replicationTaskExecutor := newReplicationTaskExecutor(
replicationTaskExecutor := replication.NewTaskExecutor(
currentClusterName,
shard.GetDomainCache(),
nDCHistoryResender,
Expand All @@ -274,9 +276,9 @@ func NewEngineWithShardContext(
shard.GetMetricsClient(),
shard.GetLogger(),
)
var replicationTaskProcessors []ReplicationTaskProcessor
var replicationTaskProcessors []replication.TaskProcessor
for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
replicationTaskProcessor := NewReplicationTaskProcessor(
replicationTaskProcessor := replication.NewTaskProcessor(
shard,
historyEngImpl,
config,
Expand All @@ -287,7 +289,7 @@ func NewEngineWithShardContext(
replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor)
}
historyEngImpl.replicationTaskProcessors = replicationTaskProcessors
replicationMessageHandler := newReplicationDLQHandler(shard, replicationTaskExecutor)
replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutor)
historyEngImpl.replicationDLQHandler = replicationMessageHandler

shard.SetEngine(historyEngImpl)
Expand Down Expand Up @@ -2944,7 +2946,7 @@ func (e *historyEngineImpl) ReadDLQMessages(
request *r.ReadDLQMessagesRequest,
) (*r.ReadDLQMessagesResponse, error) {

tasks, token, err := e.replicationDLQHandler.readMessages(
tasks, token, err := e.replicationDLQHandler.ReadMessages(
ctx,
request.GetSourceCluster(),
request.GetInclusiveEndMessageID(),
Expand All @@ -2966,7 +2968,7 @@ func (e *historyEngineImpl) PurgeDLQMessages(
request *r.PurgeDLQMessagesRequest,
) error {

return e.replicationDLQHandler.purgeMessages(
return e.replicationDLQHandler.PurgeMessages(
request.GetSourceCluster(),
request.GetInclusiveEndMessageID(),
)
Expand All @@ -2977,7 +2979,7 @@ func (e *historyEngineImpl) MergeDLQMessages(
request *r.MergeDLQMessagesRequest,
) (*r.MergeDLQMessagesResponse, error) {

token, err := e.replicationDLQHandler.mergeMessages(
token, err := e.replicationDLQHandler.MergeMessages(
ctx,
request.GetSourceCluster(),
request.GetInclusiveEndMessageID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replicationDLQHandler_mock.go -self_package github.com/uber/cadence/service/history
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_handler_mock.go

package history
package replication

import (
"context"
Expand All @@ -34,20 +34,20 @@ import (
)

type (
// replicationDLQHandler is the interface handles replication DLQ messages
replicationDLQHandler interface {
readMessages(
// DLQHandler is the interface handles replication DLQ messages
DLQHandler interface {
ReadMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*replicator.ReplicationTask, []byte, error)
purgeMessages(
PurgeMessages(
sourceCluster string,
lastMessageID int64,
) error
mergeMessages(
MergeMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
Expand All @@ -56,26 +56,29 @@ type (
) ([]byte, error)
}

replicationDLQHandlerImpl struct {
replicationTaskExecutor replicationTaskExecutor
shard shard.Context
logger log.Logger
dlqHandlerImpl struct {
taskExecutor TaskExecutor
shard shard.Context
logger log.Logger
}
)

func newReplicationDLQHandler(
var _ DLQHandler = (*dlqHandlerImpl)(nil)

// NewDLQHandler initialize the replication message DLQ handler
func NewDLQHandler(
shard shard.Context,
replicationTaskExecutor replicationTaskExecutor,
) replicationDLQHandler {
taskExecutor TaskExecutor,
) DLQHandler {

return &replicationDLQHandlerImpl{
shard: shard,
replicationTaskExecutor: replicationTaskExecutor,
logger: shard.GetLogger(),
return &dlqHandlerImpl{
shard: shard,
taskExecutor: taskExecutor,
logger: shard.GetLogger(),
}
}

func (r *replicationDLQHandlerImpl) readMessages(
func (r *dlqHandlerImpl) ReadMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
Expand All @@ -93,7 +96,7 @@ func (r *replicationDLQHandlerImpl) readMessages(
return tasks, token, err
}

func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel(
func (r *dlqHandlerImpl) readMessagesWithAckLevel(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
Expand Down Expand Up @@ -142,7 +145,7 @@ func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel(
return dlqResponse.ReplicationTasks, ackLevel, resp.NextPageToken, nil
}

func (r *replicationDLQHandlerImpl) purgeMessages(
func (r *dlqHandlerImpl) PurgeMessages(
sourceCluster string,
lastMessageID int64,
) error {
Expand All @@ -169,7 +172,7 @@ func (r *replicationDLQHandlerImpl) purgeMessages(
return nil
}

func (r *replicationDLQHandlerImpl) mergeMessages(
func (r *dlqHandlerImpl) MergeMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
Expand All @@ -186,7 +189,7 @@ func (r *replicationDLQHandlerImpl) mergeMessages(
)

for _, task := range tasks {
if _, err := r.replicationTaskExecutor.execute(
if _, err := r.taskExecutor.execute(
sourceCluster,
task,
true,
Expand Down
105 changes: 105 additions & 0 deletions service/history/replication/dlq_handler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 24aa94d

Please sign in to comment.