Skip to content

Commit

Permalink
Revert datablob change from manager/store for Visibility (cadence-wor…
Browse files Browse the repository at this point in the history
…kflow#3716)

Co-authored-by: Andrew Dawson <[email protected]>
  • Loading branch information
2 people authored and yux0 committed May 4, 2021
1 parent 5de7909 commit 1f6ce44
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 192 deletions.
10 changes: 1 addition & 9 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,21 +545,13 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
return nil
}

memo, err := c.serializer.DeserializeVisibilityMemo(p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)))
if err != nil {
c.logger.Error("failed to deserialize memo",
tag.WorkflowID(source.WorkflowID),
tag.WorkflowRunID(source.RunID),
tag.Error(err))
}

record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
StartTime: time.Unix(0, source.StartTime),
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: thrift.ToMemo(memo),
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
SearchAttributes: source.Attr,
}
Expand Down
114 changes: 39 additions & 75 deletions common/persistence/cassandra/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

Expand Down Expand Up @@ -186,7 +184,6 @@ type (
sortByCloseTime bool
cassandraStore
lowConslevel gocql.Consistency
serializer p.PayloadSerializer
}
)

Expand All @@ -211,7 +208,6 @@ func newVisibilityPersistence(
sortByCloseTime: listClosedOrderingByCloseTime,
cassandraStore: cassandraStore{session: session, logger: logger},
lowConslevel: gocql.One,
serializer: p.NewPayloadSerializer(),
}, nil
}

Expand All @@ -229,7 +225,6 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
ttl := request.WorkflowTimeout + openExecutionTTLBuffer
var query *gocql.Query

memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID)
if ttl > maxCassandraTTL {
query = v.session.Query(templateCreateWorkflowExecutionStarted,
request.DomainUUID,
Expand All @@ -239,8 +234,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
p.UnixNanoToDBTimestamp(request.StartTimestamp),
p.UnixNanoToDBTimestamp(request.ExecutionTimestamp),
request.WorkflowTypeName,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
)
} else {
Expand All @@ -252,8 +247,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
p.UnixNanoToDBTimestamp(request.StartTimestamp),
p.UnixNanoToDBTimestamp(request.ExecutionTimestamp),
request.WorkflowTypeName,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
ttl,
)
Expand Down Expand Up @@ -289,7 +284,6 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
retention = defaultCloseTTLSeconds
}

memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID)
if retention > maxCassandraTTL {
batch.Query(templateCreateWorkflowExecutionClosed,
request.DomainUUID,
Expand All @@ -302,8 +296,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.WorkflowTypeName,
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
)
// duplicate write to v2 to order by close time
Expand All @@ -318,8 +312,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.WorkflowTypeName,
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
)
} else {
Expand All @@ -334,8 +328,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.WorkflowTypeName,
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
retention,
)
Expand All @@ -351,8 +345,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
request.WorkflowTypeName,
*thrift.FromWorkflowExecutionCloseStatus(&request.Status),
request.HistoryLength,
memo.Data,
string(memo.GetEncoding()),
request.Memo.Data,
string(request.Memo.GetEncoding()),
request.TaskList,
retention,
)
Expand Down Expand Up @@ -405,10 +399,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -443,10 +437,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -479,10 +473,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -518,10 +512,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -554,10 +548,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -593,10 +587,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -632,10 +626,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -666,7 +660,7 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
}
}

wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
if !has {
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
Expand Down Expand Up @@ -730,10 +724,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsOrderByClos

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -766,10 +760,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByTypeOrder

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -802,10 +796,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByWorkflowI

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand Down Expand Up @@ -838,10 +832,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByStatusOrd

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand All @@ -854,23 +848,7 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByStatusOrd
return response, nil
}

func (v *cassandraVisibilityPersistence) serializeMemo(visibilityMemo *types.Memo, domainID, wID, rID string) *p.DataBlob {
memo, err := v.serializer.SerializeVisibilityMemo(thrift.FromMemo(visibilityMemo), common.EncodingTypeThriftRW)
if err != nil {
v.logger.WithTags(
tag.WorkflowDomainID(domainID),
tag.WorkflowID(wID),
tag.WorkflowRunID(rID),
tag.Error(err)).
Error("Unable to encode visibility memo")
}
if memo == nil {
return &p.DataBlob{}
}
return memo
}

func readOpenWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSerializer, logger log.Logger) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
Expand All @@ -880,28 +858,21 @@ func readOpenWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSeria
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList) {
memo, err := serializer.DeserializeVisibilityMemo(p.NewDataBlob(memo, common.EncodingType(encoding)))
if err != nil {
logger.Error("failed to deserialize memo",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID.String()),
tag.Error(err))
}
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID.String(),
TypeName: typeName,
StartTime: startTime,
ExecutionTime: executionTime,
Memo: thrift.ToMemo(memo),
Memo: p.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
}
return record, true
}
return nil, false
}

func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSerializer, logger log.Logger) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
Expand All @@ -914,13 +885,6 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSer
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList) {
memo, err := serializer.DeserializeVisibilityMemo(p.NewDataBlob(memo, common.EncodingType(encoding)))
if err != nil {
logger.Error("failed to deserialize memo",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID.String()),
tag.Error(err))
}
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID.String(),
Expand All @@ -930,7 +894,7 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSer
CloseTime: closeTime,
Status: thrift.ToWorkflowExecutionCloseStatus(&status),
HistoryLength: historyLength,
Memo: thrift.ToMemo(memo),
Memo: p.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
}
return record, true
Expand Down
Loading

0 comments on commit 1f6ce44

Please sign in to comment.