Skip to content

Commit

Permalink
Merge branch 'master' into replicator_internal_types
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Oct 15, 2020
2 parents 39f767d + 3ea959f commit 19e357e
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 33 deletions.
52 changes: 48 additions & 4 deletions .gen/go/replicator/replicator.go

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,10 @@ const (
ReplicationDLQStatsScope
// FailoverMarkerScope is scope used by all metrics emitted related to failover marker
FailoverMarkerScope
// HistoryReplicationV2TaskScope is the scope used by history task replication processing
HistoryReplicationV2TaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
SyncActivityTaskScope

NumHistoryScopes
)
Expand Down Expand Up @@ -1038,10 +1042,6 @@ const (
ReplicatorScope = iota + NumCommonScopes
// DomainReplicationTaskScope is the scope used by domain task replication processing
DomainReplicationTaskScope
// HistoryReplicationV2TaskScope is the scope used by history task replication processing
HistoryReplicationV2TaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
SyncActivityTaskScope
// ESProcessorScope is scope used by all metric emitted by esProcessor
ESProcessorScope
// IndexProcessorScope is scope used by all metric emitted by index processor
Expand Down Expand Up @@ -1517,6 +1517,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ReplicationTaskCleanupScope: {operation: "ReplicationTaskCleanup"},
ReplicationDLQStatsScope: {operation: "ReplicationDLQStats"},
FailoverMarkerScope: {operation: "FailoverMarker"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
},
// Matching Scope Names
Matching: {
Expand All @@ -1535,8 +1537,6 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
Worker: {
ReplicatorScope: {operation: "Replicator"},
DomainReplicationTaskScope: {operation: "DomainReplicationTask"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
ESProcessorScope: {operation: "ESProcessor"},
IndexProcessorScope: {operation: "IndexProcessor"},
ArchiverDeleteHistoryActivityScope: {operation: "ArchiverDeleteHistoryActivity"},
Expand Down Expand Up @@ -1883,6 +1883,7 @@ const (
DecisionStartToCloseTimeoutOverrideCount
ReplicationTaskCleanupCount
ReplicationTaskCleanupFailure
ReplicationTaskLatency
MutableStateChecksumMismatch
MutableStateChecksumInvalidated
GracefulFailoverLatency
Expand Down Expand Up @@ -2352,6 +2353,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
DecisionStartToCloseTimeoutOverrideCount: {metricName: "decision_start_to_close_timeout_overrides", metricType: Counter},
ReplicationTaskCleanupCount: {metricName: "replication_task_cleanup_count", metricType: Counter},
ReplicationTaskCleanupFailure: {metricName: "replication_task_cleanup_failed", metricType: Counter},
ReplicationTaskLatency: {metricName: "replication_task_latency", metricType: Timer},
MutableStateChecksumMismatch: {metricName: "mutable_state_checksum_mismatch", metricType: Counter},
MutableStateChecksumInvalidated: {metricName: "mutable_state_checksum_invalidated", metricType: Counter},
GracefulFailoverLatency: {metricName: "graceful_failover_latency", metricType: Timer},
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/nosql/nosqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type (
SelectDomainMetadata(ctx context.Context) (int64, error)
}

// QueueMessageRow defines the row struct for queue message
// DomainRow defines the row struct for queue message
DomainRow struct {
Info *persistence.DomainInfo
Config *NoSQLInternalDomainConfig
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 7dd6f8 to b5def4
1 change: 1 addition & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4217,6 +4217,7 @@ func (e *mutableStateBuilder) eventsToReplicationTask(
return nil, err
}

// the visibility timestamp will be set in shard context
replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: firstEvent.GetEventId(),
NextEventID: lastEvent.GetEventId() + 1,
Expand Down
1 change: 1 addition & 0 deletions service/history/execution/mutable_state_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func convertSyncActivityInfos(
for item := range inputs {
activityInfo, ok := activityInfos[item]
if ok {
// the visibility timestamp will be set in shard context
outputs = append(outputs, &persistence.SyncActivityTask{
Version: activityInfo.Version,
ScheduledID: activityInfo.ScheduleID,
Expand Down
6 changes: 3 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
domainActiveCluster != e.currentClusterName &&
previousFailoverVersion != common.InitialPreviousFailoverVersion &&
e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion) == e.currentClusterName {
// the visibility timestamp will be set in shard context
failoverMarkerTasks = append(failoverMarkerTasks, &persistence.FailoverMarkerTask{
VisibilityTimestamp: e.timeSource.Now(),
Version: nextDomain.GetFailoverVersion(),
DomainID: nextDomain.GetInfo().ID,
Version: nextDomain.GetFailoverVersion(),
DomainID: nextDomain.GetInfo().ID,
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ func (t *taskAckManagerImpl) generateFailoverMarkerTask(
FailoverMarkerAttributes: &replicator.FailoverMarkerAttributes{
DomainID: common.StringPtr(taskInfo.GetDomainID()),
FailoverVersion: common.Int64Ptr(taskInfo.GetVersion()),
CreationTime: common.Int64Ptr(taskInfo.CreationTime),
},
CreationTime: common.Int64Ptr(taskInfo.CreationTime),
}
}

Expand Down Expand Up @@ -525,6 +525,7 @@ func (t *taskAckManagerImpl) generateSyncActivityTask(
LastFailureDetails: activityInfo.LastFailureDetails,
VersionHistory: versionHistory,
},
CreationTime: common.Int64Ptr(taskInfo.CreationTime),
}, nil
},
)
Expand Down Expand Up @@ -592,6 +593,7 @@ func (t *taskAckManagerImpl) generateHistoryReplicationTask(
Events: eventsBlob,
NewRunEvents: newRunEventsBlob,
},
CreationTime: common.Int64Ptr(task.CreationTime),
}
return replicationTask, nil
},
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/task_ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (s *taskAckManagerSuite) TestGenerateFailoverMarkerTask() {
s.Equal(replicator.ReplicationTaskTypeFailoverMarker, task.GetTaskType())
s.Equal(domainID, task.GetFailoverMarkerAttributes().GetDomainID())
s.Equal(int64(2), task.GetFailoverMarkerAttributes().GetFailoverVersion())
s.Equal(int64(3), task.GetFailoverMarkerAttributes().GetCreationTime())
s.Equal(int64(3), task.GetCreationTime())
}

func (s *taskAckManagerSuite) TestGenerateSyncActivityTask_OK() {
Expand Down
9 changes: 1 addition & 8 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package replication

import (
"context"
"time"

"github.com/uber/cadence/.gen/go/history"
r "github.com/uber/cadence/.gen/go/replicator"
Expand Down Expand Up @@ -254,13 +253,7 @@ func (e *taskExecutorImpl) handleFailoverReplicationTask(
task *r.ReplicationTask,
) error {
failoverAttributes := task.GetFailoverMarkerAttributes()
now := e.shard.GetTimeSource().Now()
e.metricsClient.Scope(
metrics.FailoverMarkerScope,
).RecordTimer(
metrics.FailoverMarkerReplicationLatency,
now.Sub(time.Unix(0, failoverAttributes.GetCreationTime())),
)
failoverAttributes.CreationTime = task.CreationTime
return e.shard.AddingPendingFailoverMarker(failoverAttributes)
}

Expand Down
21 changes: 13 additions & 8 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,21 +411,26 @@ func (p *taskProcessorImpl) processSingleTask(replicationTask *r.ReplicationTask
}

func (p *taskProcessorImpl) processTaskOnce(replicationTask *r.ReplicationTask) error {
startTime := time.Now()
ts := p.shard.GetTimeSource()
startTime := ts.Now()
scope, err := p.taskExecutor.execute(
replicationTask,
false)

if err != nil {
p.updateFailureMetric(scope, err)
} else {
p.logger.Debug("Successfully applied replication task.", tag.TaskID(replicationTask.GetSourceTaskId()))
p.metricsClient.Scope(
metrics.ReplicationTaskFetcherScope,
metrics.TargetClusterTag(p.sourceCluster),
).IncCounter(metrics.ReplicationTasksApplied)
p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope).
RecordTimer(metrics.TaskProcessingLatency, time.Now().Sub(startTime))
now := ts.Now()
mScope := p.metricsClient.Scope(scope, metrics.TargetClusterTag(p.sourceCluster))
// emit the number of replication tasks
mScope.IncCounter(metrics.ReplicationTasksApplied)
// emit single task processing latency
mScope.RecordTimer(metrics.TaskProcessingLatency, now.Sub(startTime))
// emit latency from task generated to task received
mScope.RecordTimer(
metrics.ReplicationTaskLatency,
now.Sub(time.Unix(0, replicationTask.GetCreationTime())),
)
}

return err
Expand Down
2 changes: 2 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,8 @@ func (s *contextImpl) allocateTransferIDsLocked(
}
s.logger.Debug(fmt.Sprintf("Assigning task ID: %v", id))
task.SetTaskID(id)
// TODO: set the task visibility time
//task.SetVisibilityTimestamp(s.GetTimeSource().Now())
*transferMaxReadLevel = id
}
return nil
Expand Down

0 comments on commit 19e357e

Please sign in to comment.