Skip to content

Commit

Permalink
Merge branch 'master' into histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Jul 30, 2020
2 parents 1bd600b + 78031ab commit fe921e5
Show file tree
Hide file tree
Showing 53 changed files with 1,556 additions and 572 deletions.
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ const (
PersistenceDeleteCurrentWorkflowExecutionScope
// PersistenceGetCurrentExecutionScope tracks GetCurrentExecution calls made by service to persistence layer
PersistenceGetCurrentExecutionScope
// PersistenceIsWorkflowExecutionExistsScope tracks IsWorkflowExecutionExists calls made by service to persistence layer
PersistenceIsWorkflowExecutionExistsScope
// PersistenceListCurrentExecutionsScope tracks ListCurrentExecutions calls made by service to persistence layer
PersistenceListCurrentExecutionsScope
// PersistenceListConcreteExecutionsScope tracks ListConcreteExecutions calls made by service to persistence layer
PersistenceListConcreteExecutionsScope
// PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1069,6 +1073,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
PersistenceDeleteCurrentWorkflowExecutionScope: {operation: "DeleteCurrentWorkflowExecution"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceIsWorkflowExecutionExistsScope: {operation: "IsWorkflowExecutionExists"},
PersistenceListCurrentExecutionsScope: {operation: "ListCurrentExecutions"},
PersistenceListConcreteExecutionsScope: {operation: "ListConcreteExecutions"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
Expand Down
46 changes: 46 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,52 @@ func (_m *ExecutionManager) GetCurrentExecution(request *persistence.GetCurrentE
return r0, r1
}

// ListCurrentExecutions provides a mock function with given fields: request
func (_m *ExecutionManager) ListCurrentExecutions(request *persistence.ListCurrentExecutionsRequest) (*persistence.ListCurrentExecutionsResponse, error) {
ret := _m.Called(request)

var r0 *persistence.ListCurrentExecutionsResponse
if rf, ok := ret.Get(0).(func(*persistence.ListCurrentExecutionsRequest) *persistence.ListCurrentExecutionsResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.ListCurrentExecutionsResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.ListCurrentExecutionsRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// IsWorkflowExecutionExists provides a mock function with given fields: request
func (_m *ExecutionManager) IsWorkflowExecutionExists(request *persistence.IsWorkflowExecutionExistsRequest) (*persistence.IsWorkflowExecutionExistsResponse, error) {
ret := _m.Called(request)

var r0 *persistence.IsWorkflowExecutionExistsResponse
if rf, ok := ret.Get(0).(func(*persistence.IsWorkflowExecutionExistsRequest) *persistence.IsWorkflowExecutionExistsResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.IsWorkflowExecutionExistsResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.IsWorkflowExecutionExistsRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ListConcreteExecutions provides a mock function with given fields: request
func (_m *ExecutionManager) ListConcreteExecutions(request *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error) {
ret := _m.Called(request)
Expand Down
87 changes: 87 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,21 @@ workflow_state = ? ` +
`and visibility_ts = ? ` +
`and task_id = ?`

templateListCurrentExecutionsQuery = `SELECT domain_id, workflow_id, run_id, current_run_id, workflow_state ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ?`

templateIsWorkflowExecutionExistsQuery = `SELECT shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ?`

templateListWorkflowExecutionQuery = `SELECT run_id, execution, version_histories, version_histories_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -2053,6 +2068,78 @@ func (d *cassandraPersistence) GetCurrentExecution(request *p.GetCurrentExecutio
}, nil
}

func (d *cassandraPersistence) ListCurrentExecutions(
request *p.ListCurrentExecutionsRequest,
) (*p.ListCurrentExecutionsResponse, error) {
query := d.session.Query(
templateListCurrentExecutionsQuery,
d.shardID,
rowTypeExecution,
).PageSize(request.PageSize).PageState(request.PageToken)

iter := query.Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Message: "ListCurrentExecutions operation failed. Not able to create query iterator.",
}
}
response := &p.ListCurrentExecutionsResponse{}
result := make(map[string]interface{})
for iter.MapScan(result) {
runID := result["run_id"].(gocql.UUID).String()
if runID != permanentRunID {
result = make(map[string]interface{})
continue
}
response.Executions = append(response.Executions, &p.CurrentWorkflowExecution{
DomainID: result["domain_id"].(gocql.UUID).String(),
WorkflowID: result["workflow_id"].(string),
RunID: result["run_id"].(gocql.UUID).String(),
State: result["workflow_state"].(int),
CurrentRunID: result["current_run_id"].(gocql.UUID).String(),
})
result = make(map[string]interface{})
}
nextPageToken := iter.PageState()
response.PageToken = make([]byte, len(nextPageToken))
copy(response.PageToken, nextPageToken)

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListCurrentExecutions operation failed. Error: %v", err),
}
}
return response, nil
}

func (d *cassandraPersistence) IsWorkflowExecutionExists(request *p.IsWorkflowExecutionExistsRequest) (*p.IsWorkflowExecutionExistsResponse,
error) {
query := d.session.Query(templateIsWorkflowExecutionExistsQuery,
d.shardID,
rowTypeExecution,
request.DomainID,
request.WorkflowID,
request.RunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)

result := make(map[string]interface{})
if err := query.MapScan(result); err != nil {
if err == gocql.ErrNotFound {
return &p.IsWorkflowExecutionExistsResponse{Exists: false}, nil
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("IsWorkflowExecutionExists operation failed. Error: %v", err),
}
}

return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("IsWorkflowExecutionExists operation failed. Error: %v", err),
}
}
return &p.IsWorkflowExecutionExistsResponse{Exists: true}, nil
}

func (d *cassandraPersistence) ListConcreteExecutions(
request *p.ListConcreteExecutionsRequest,
) (*p.InternalListConcreteExecutionsResponse, error) {
Expand Down
35 changes: 35 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,15 @@ type (
LastReplicationInfo map[string]*ReplicationInfo
}

// CurrentWorkflowExecution describes a current execution record
CurrentWorkflowExecution struct {
DomainID string
WorkflowID string
RunID string
State int
CurrentRunID string
}

// TransferTaskInfo describes a transfer task
TransferTaskInfo struct {
DomainID string
Expand Down Expand Up @@ -795,6 +804,25 @@ type (
WorkflowID string
}

// ListCurrentExecutionsRequest is request to ListCurrentExecutions
ListCurrentExecutionsRequest struct {
PageSize int
PageToken []byte
}

// ListCurrentExecutionsResponse is the response to ListCurrentExecutionsRequest
ListCurrentExecutionsResponse struct {
Executions []*CurrentWorkflowExecution
PageToken []byte
}

// IsWorkflowExecutionExistsRequest is used to check if the concrete execution exists
IsWorkflowExecutionExistsRequest struct {
DomainID string
WorkflowID string
RunID string
}

// ListConcreteExecutionsRequest is request to ListConcreteExecutions
ListConcreteExecutionsRequest struct {
PageSize int
Expand Down Expand Up @@ -822,6 +850,11 @@ type (
LastWriteVersion int64
}

// IsWorkflowExecutionExistsResponse is the response to IsWorkflowExecutionExists
IsWorkflowExecutionExistsResponse struct {
Exists bool
}

// UpdateWorkflowExecutionRequest is used to update a workflow execution
UpdateWorkflowExecutionRequest struct {
RangeID int64
Expand Down Expand Up @@ -1521,6 +1554,7 @@ type (
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
IsWorkflowExecutionExists(request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)

// Transfer task related methods
GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
Expand All @@ -1545,6 +1579,7 @@ type (

// Scan operations
ListConcreteExecutions(request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error)
ListCurrentExecutions(request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
}

// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
Expand Down
12 changes: 12 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,18 @@ func (m *executionManagerImpl) GetCurrentExecution(
return m.persistence.GetCurrentExecution(request)
}

func (m *executionManagerImpl) ListCurrentExecutions(
request *ListCurrentExecutionsRequest,
) (*ListCurrentExecutionsResponse, error) {
return m.persistence.ListCurrentExecutions(request)
}

func (m *executionManagerImpl) IsWorkflowExecutionExists(
request *IsWorkflowExecutionExistsRequest,
) (*IsWorkflowExecutionExistsResponse, error) {
return m.persistence.IsWorkflowExecutionExists(request)
}

func (m *executionManagerImpl) ListConcreteExecutions(
request *ListConcreteExecutionsRequest,
) (*ListConcreteExecutionsResponse, error) {
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type (
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
IsWorkflowExecutionExists(request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)

// Transfer task related methods
GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
Expand All @@ -93,6 +94,7 @@ type (

// Scan related methods
ListConcreteExecutions(request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
ListCurrentExecutions(request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
}

// HistoryStore is to manager workflow history events
Expand Down
28 changes: 28 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,34 @@ func (p *workflowExecutionPersistenceClient) GetCurrentExecution(request *GetCur
return response, err
}

func (p *workflowExecutionPersistenceClient) ListCurrentExecutions(request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListCurrentExecutionsScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceListCurrentExecutionsScope, metrics.PersistenceLatency)
response, err := p.persistence.ListCurrentExecutions(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceListCurrentExecutionsScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) IsWorkflowExecutionExists(request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceIsWorkflowExecutionExistsScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceIsWorkflowExecutionExistsScope, metrics.PersistenceLatency)
response, err := p.persistence.IsWorkflowExecutionExists(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceIsWorkflowExecutionExistsScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) ListConcreteExecutions(request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceListConcreteExecutionsScope, metrics.PersistenceRequests)

Expand Down
18 changes: 18 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,24 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetCurrentExecution(requ
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) ListCurrentExecutions(request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

response, err := p.persistence.ListCurrentExecutions(request)
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) IsWorkflowExecutionExists(request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

response, err := p.persistence.IsWorkflowExecutionExists(request)
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) ListConcreteExecutions(request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
12 changes: 12 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,18 @@ func (m *sqlExecutionManager) GetCurrentExecution(
}, nil
}

func (m *sqlExecutionManager) ListCurrentExecutions(
_ *p.ListCurrentExecutionsRequest,
) (*p.ListCurrentExecutionsResponse, error) {
return nil, &workflow.InternalServiceError{Message: "Not yet implemented"}
}

func (m *sqlExecutionManager) IsWorkflowExecutionExists(
request *p.IsWorkflowExecutionExistsRequest,
) (*p.IsWorkflowExecutionExistsResponse, error) {
panic("not implemented yet")
}

func (m *sqlExecutionManager) ListConcreteExecutions(
_ *p.ListConcreteExecutionsRequest,
) (*p.InternalListConcreteExecutionsResponse, error) {
Expand Down
2 changes: 2 additions & 0 deletions common/reconciliation/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ type (
// PersistenceRetryer is used to retry requests to persistence
PersistenceRetryer interface {
ListConcreteExecutions(*persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error)
ListCurrentExecutions(request *persistence.ListCurrentExecutionsRequest) (*persistence.ListCurrentExecutionsResponse, error)
GetWorkflowExecution(*persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
GetCurrentExecution(*persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error)
IsWorkflowExecutionExists(request *persistence.IsWorkflowExecutionExistsRequest) (*persistence.IsWorkflowExecutionExistsResponse, error)
ReadHistoryBranch(*persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error)
DeleteWorkflowExecution(*persistence.DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(request *persistence.DeleteCurrentWorkflowExecutionRequest) error
Expand Down
Loading

0 comments on commit fe921e5

Please sign in to comment.