diff --git a/common/elasticsearch/client_v6.go b/common/elasticsearch/client_v6.go index b28025245be..b18e854217f 100644 --- a/common/elasticsearch/client_v6.go +++ b/common/elasticsearch/client_v6.go @@ -545,21 +545,13 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit return nil } - memo, err := c.serializer.DeserializeVisibilityMemo(p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding))) - if err != nil { - c.logger.Error("failed to deserialize memo", - tag.WorkflowID(source.WorkflowID), - tag.WorkflowRunID(source.RunID), - tag.Error(err)) - } - record := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: source.WorkflowID, RunID: source.RunID, TypeName: source.WorkflowType, StartTime: time.Unix(0, source.StartTime), ExecutionTime: time.Unix(0, source.ExecutionTime), - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)), TaskList: source.TaskList, SearchAttributes: source.Attr, } diff --git a/common/persistence/cassandra/cassandraVisibilityPersistence.go b/common/persistence/cassandra/cassandraVisibilityPersistence.go index bd40e930597..7bc1a10c94d 100644 --- a/common/persistence/cassandra/cassandraVisibilityPersistence.go +++ b/common/persistence/cassandra/cassandraVisibilityPersistence.go @@ -30,11 +30,9 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/log/tag" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra" "github.com/uber/cadence/common/service/config" - "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" ) @@ -186,7 +184,6 @@ type ( sortByCloseTime bool cassandraStore lowConslevel gocql.Consistency - serializer p.PayloadSerializer } ) @@ -211,7 +208,6 @@ func newVisibilityPersistence( sortByCloseTime: listClosedOrderingByCloseTime, cassandraStore: cassandraStore{session: session, logger: logger}, lowConslevel: gocql.One, - serializer: p.NewPayloadSerializer(), }, nil } @@ -229,7 +225,6 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( ttl := request.WorkflowTimeout + openExecutionTTLBuffer var query *gocql.Query - memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) if ttl > maxCassandraTTL { query = v.session.Query(templateCreateWorkflowExecutionStarted, request.DomainUUID, @@ -239,8 +234,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( p.UnixNanoToDBTimestamp(request.StartTimestamp), p.UnixNanoToDBTimestamp(request.ExecutionTimestamp), request.WorkflowTypeName, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, ) } else { @@ -252,8 +247,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( p.UnixNanoToDBTimestamp(request.StartTimestamp), p.UnixNanoToDBTimestamp(request.ExecutionTimestamp), request.WorkflowTypeName, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, ttl, ) @@ -289,7 +284,6 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( retention = defaultCloseTTLSeconds } - memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) if retention > maxCassandraTTL { batch.Query(templateCreateWorkflowExecutionClosed, request.DomainUUID, @@ -302,8 +296,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( request.WorkflowTypeName, *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, ) // duplicate write to v2 to order by close time @@ -318,8 +312,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( request.WorkflowTypeName, *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, ) } else { @@ -334,8 +328,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( request.WorkflowTypeName, *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, retention, ) @@ -351,8 +345,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( request.WorkflowTypeName, *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, - memo.Data, - string(memo.GetEncoding()), + request.Memo.Data, + string(request.Memo.GetEncoding()), request.TaskList, retention, ) @@ -405,10 +399,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readOpenWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readOpenWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -443,10 +437,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -479,10 +473,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readOpenWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readOpenWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -518,10 +512,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -554,10 +548,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readOpenWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readOpenWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readOpenWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -593,10 +587,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -632,10 +626,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus( response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -666,7 +660,7 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution( } } - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) if !has { return nil, &workflow.EntityNotExistsError{ Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v", @@ -730,10 +724,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsOrderByClos response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -766,10 +760,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByTypeOrder response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -802,10 +796,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByWorkflowI response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -838,10 +832,10 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByStatusOrd response := &p.InternalListWorkflowExecutionsResponse{} response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0) - wfexecution, has := readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has := readClosedWorkflowExecutionRecord(iter) for has { response.Executions = append(response.Executions, wfexecution) - wfexecution, has = readClosedWorkflowExecutionRecord(iter, v.serializer, v.logger) + wfexecution, has = readClosedWorkflowExecutionRecord(iter) } nextPageToken := iter.PageState() @@ -854,23 +848,7 @@ func (v *cassandraVisibilityPersistence) listClosedWorkflowExecutionsByStatusOrd return response, nil } -func (v *cassandraVisibilityPersistence) serializeMemo(visibilityMemo *types.Memo, domainID, wID, rID string) *p.DataBlob { - memo, err := v.serializer.SerializeVisibilityMemo(thrift.FromMemo(visibilityMemo), common.EncodingTypeThriftRW) - if err != nil { - v.logger.WithTags( - tag.WorkflowDomainID(domainID), - tag.WorkflowID(wID), - tag.WorkflowRunID(rID), - tag.Error(err)). - Error("Unable to encode visibility memo") - } - if memo == nil { - return &p.DataBlob{} - } - return memo -} - -func readOpenWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSerializer, logger log.Logger) (*p.InternalVisibilityWorkflowExecutionInfo, bool) { +func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) { var workflowID string var runID gocql.UUID var typeName string @@ -880,20 +858,13 @@ func readOpenWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSeria var encoding string var taskList string if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList) { - memo, err := serializer.DeserializeVisibilityMemo(p.NewDataBlob(memo, common.EncodingType(encoding))) - if err != nil { - logger.Error("failed to deserialize memo", - tag.WorkflowID(workflowID), - tag.WorkflowRunID(runID.String()), - tag.Error(err)) - } record := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: workflowID, RunID: runID.String(), TypeName: typeName, StartTime: startTime, ExecutionTime: executionTime, - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob(memo, common.EncodingType(encoding)), TaskList: taskList, } return record, true @@ -901,7 +872,7 @@ func readOpenWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSeria return nil, false } -func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSerializer, logger log.Logger) (*p.InternalVisibilityWorkflowExecutionInfo, bool) { +func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) { var workflowID string var runID gocql.UUID var typeName string @@ -914,13 +885,6 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSer var encoding string var taskList string if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList) { - memo, err := serializer.DeserializeVisibilityMemo(p.NewDataBlob(memo, common.EncodingType(encoding))) - if err != nil { - logger.Error("failed to deserialize memo", - tag.WorkflowID(workflowID), - tag.WorkflowRunID(runID.String()), - tag.Error(err)) - } record := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: workflowID, RunID: runID.String(), @@ -930,7 +894,7 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter, serializer p.PayloadSer CloseTime: closeTime, Status: thrift.ToWorkflowExecutionCloseStatus(&status), HistoryLength: historyLength, - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob(memo, common.EncodingType(encoding)), TaskList: taskList, } return record, true diff --git a/common/persistence/elasticsearch/decodeBench_test.go b/common/persistence/elasticsearch/decodeBench_test.go index 499632be91d..5d9ba6e0c42 100644 --- a/common/persistence/elasticsearch/decodeBench_test.go +++ b/common/persistence/elasticsearch/decodeBench_test.go @@ -58,18 +58,16 @@ BenchmarkJSONDecodeToMap-8 100000 12878 ns/op //nolint func BenchmarkJSONDecodeToType(b *testing.B) { bytes := (*json.RawMessage)(&data) - serializer := p.NewPayloadSerializer() for i := 0; i < b.N; i++ { var source *es.VisibilityRecord json.Unmarshal(*bytes, &source) - memo, _ := serializer.DeserializeVisibilityMemo(p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding))) record := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: source.WorkflowID, RunID: source.RunID, TypeName: source.WorkflowType, StartTime: time.Unix(0, source.StartTime), ExecutionTime: time.Unix(0, source.ExecutionTime), - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)), TaskList: source.TaskList, } record.CloseTime = time.Unix(0, source.CloseTime) @@ -80,7 +78,6 @@ func BenchmarkJSONDecodeToType(b *testing.B) { //nolint func BenchmarkJSONDecodeToMap(b *testing.B) { - serializer := p.NewPayloadSerializer() for i := 0; i < b.N; i++ { var source map[string]interface{} d := json.NewDecoder(bytes.NewReader(data)) @@ -93,7 +90,6 @@ func BenchmarkJSONDecodeToMap(b *testing.B) { closeStatus, _ := source[definition.CloseStatus].(json.Number).Int64() historyLen, _ := source[definition.HistoryLength].(json.Number).Int64() - memo, _ := serializer.DeserializeVisibilityMemo(p.NewDataBlob([]byte(source[definition.Memo].(string)), common.EncodingType(source[definition.Encoding].(string)))) record := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: source[definition.WorkflowID].(string), RunID: source[definition.RunID].(string), @@ -101,7 +97,7 @@ func BenchmarkJSONDecodeToMap(b *testing.B) { StartTime: time.Unix(0, startTime), ExecutionTime: time.Unix(0, executionTime), TaskList: source[definition.TaskList].(string), - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob([]byte(source[definition.Memo].(string)), common.EncodingType(source[definition.Encoding].(string))), } record.CloseTime = time.Unix(0, closeTime) status := (shared.WorkflowExecutionCloseStatus)(int32(closeStatus)) diff --git a/common/persistence/elasticsearch/esVisibilityStore.go b/common/persistence/elasticsearch/esVisibilityStore.go index 24061216fb3..81c0fcfae6d 100644 --- a/common/persistence/elasticsearch/esVisibilityStore.go +++ b/common/persistence/elasticsearch/esVisibilityStore.go @@ -44,7 +44,6 @@ import ( "github.com/uber/cadence/common/messaging" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service/config" - "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" ) @@ -54,12 +53,11 @@ const ( type ( esVisibilityStore struct { - esClient es.GenericClient - index string - producer messaging.Producer - logger log.Logger - config *config.VisibilityConfig - serializer p.PayloadSerializer + esClient es.GenericClient + index string + producer messaging.Producer + logger log.Logger + config *config.VisibilityConfig } ) @@ -74,12 +72,11 @@ func NewElasticSearchVisibilityStore( logger log.Logger, ) p.VisibilityStore { return &esVisibilityStore{ - esClient: esClient, - index: index, - producer: producer, - logger: logger.WithTags(tag.ComponentESVisibilityManager), - config: config, - serializer: p.NewPayloadSerializer(), + esClient: esClient, + index: index, + producer: producer, + logger: logger.WithTags(tag.ComponentESVisibilityManager), + config: config, } } @@ -94,7 +91,6 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted( request *p.InternalRecordWorkflowExecutionStartedRequest, ) error { v.checkProducer() - memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) msg := getVisibilityMessage( request.DomainUUID, request.WorkflowID, @@ -104,8 +100,8 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted( request.StartTimestamp, request.ExecutionTimestamp, request.TaskID, - memo.Data, - memo.GetEncoding(), + request.Memo.Data, + request.Memo.GetEncoding(), request.SearchAttributes, ) return v.producer.Publish(ctx, msg) @@ -116,7 +112,6 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed( request *p.InternalRecordWorkflowExecutionClosedRequest, ) error { v.checkProducer() - memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) msg := getVisibilityMessageForCloseExecution( request.DomainUUID, request.WorkflowID, @@ -128,9 +123,9 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed( *thrift.FromWorkflowExecutionCloseStatus(&request.Status), request.HistoryLength, request.TaskID, - memo.Data, + request.Memo.Data, request.TaskList, - memo.GetEncoding(), + request.Memo.GetEncoding(), request.SearchAttributes, ) return v.producer.Publish(ctx, msg) @@ -141,7 +136,6 @@ func (v *esVisibilityStore) UpsertWorkflowExecution( request *p.InternalUpsertWorkflowExecutionRequest, ) error { v.checkProducer() - memo := v.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) msg := getVisibilityMessage( request.DomainUUID, request.WorkflowID, @@ -151,8 +145,8 @@ func (v *esVisibilityStore) UpsertWorkflowExecution( request.StartTimestamp, request.ExecutionTimestamp, request.TaskID, - memo.Data, - memo.GetEncoding(), + request.Memo.Data, + request.Memo.GetEncoding(), request.SearchAttributes, ) return v.producer.Publish(ctx, msg) @@ -854,19 +848,3 @@ func cleanDSL(input string) string { result := re.ReplaceAllString(input, `$2`) return result } - -func (v *esVisibilityStore) serializeMemo(visibilityMemo *types.Memo, domainID, wID, rID string) *p.DataBlob { - memo, err := v.serializer.SerializeVisibilityMemo(thrift.FromMemo(visibilityMemo), common.EncodingTypeThriftRW) - if err != nil { - v.logger.WithTags( - tag.WorkflowDomainID(domainID), - tag.WorkflowID(wID), - tag.WorkflowRunID(rID), - tag.Error(err)). - Error("Unable to encode visibility memo") - } - if memo == nil { - return &p.DataBlob{} - } - return memo -} diff --git a/common/persistence/elasticsearch/esVisibilityStore_test.go b/common/persistence/elasticsearch/esVisibilityStore_test.go index ae67e443b09..89c86caf005 100644 --- a/common/persistence/elasticsearch/esVisibilityStore_test.go +++ b/common/persistence/elasticsearch/esVisibilityStore_test.go @@ -56,7 +56,6 @@ type ESVisibilitySuite struct { visibilityStore *esVisibilityStore mockESClient *esMocks.GenericClient mockProducer *mocks.KafkaProducer - serializer p.PayloadSerializer } var ( @@ -108,7 +107,6 @@ func (s *ESVisibilitySuite) SetupTest() { s.mockProducer = &mocks.KafkaProducer{} mgr := NewElasticSearchVisibilityStore(s.mockESClient, testIndex, s.mockProducer, config, loggerimpl.NewNopLogger()) s.visibilityStore = mgr.(*esVisibilityStore) - s.serializer = p.NewPayloadSerializer() } func (s *ESVisibilitySuite) TearDownTest() { @@ -126,12 +124,8 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() { request.StartTimestamp = int64(123) request.ExecutionTimestamp = int64(321) request.TaskID = int64(111) - memo := &workflow.Memo{ - Fields: map[string][]byte{"test": []byte("test bytes")}, - } - request.Memo = thrift.ToMemo(memo) - memoBlob, err := s.serializer.SerializeVisibilityMemo(memo, common.EncodingTypeThriftRW) - s.NoError(err) + memoBytes := []byte(`test bytes`) + request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW) s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool { fields := input.Fields @@ -142,7 +136,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() { s.Equal(request.WorkflowTypeName, fields[es.WorkflowType].GetStringData()) s.Equal(request.StartTimestamp, fields[es.StartTime].GetIntData()) s.Equal(request.ExecutionTimestamp, fields[es.ExecutionTime].GetIntData()) - s.Equal(memoBlob.Data, fields[es.Memo].GetBinaryData()) + s.Equal(memoBytes, fields[es.Memo].GetBinaryData()) s.Equal(string(common.EncodingTypeThriftRW), fields[es.Encoding].GetStringData()) return true })).Return(nil).Once() @@ -150,14 +144,14 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() { ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout) defer cancel() - err = s.visibilityStore.RecordWorkflowExecutionStarted(ctx, request) + err := s.visibilityStore.RecordWorkflowExecutionStarted(ctx, request) s.NoError(err) } func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted_EmptyRequest() { // test empty request request := &p.InternalRecordWorkflowExecutionStartedRequest{ - Memo: nil, + Memo: &p.DataBlob{}, } s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool { s.Equal(indexer.MessageTypeIndex, input.GetMessageType()) @@ -185,14 +179,8 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() { request.StartTimestamp = int64(123) request.ExecutionTimestamp = int64(321) request.TaskID = int64(111) - memo := &workflow.Memo{ - Fields: map[string][]byte{"test": []byte("test bytes")}, - } - request.Memo = thrift.ToMemo(memo) - memoBlob, err := s.serializer.SerializeVisibilityMemo(memo, common.EncodingTypeThriftRW) - s.NoError(err) - request.Memo = thrift.ToMemo(memo) - + memoBytes := []byte(`test bytes`) + request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW) request.CloseTimestamp = int64(999) closeStatus := workflow.WorkflowExecutionCloseStatusTerminated request.Status = *thrift.ToWorkflowExecutionCloseStatus(&closeStatus) @@ -206,7 +194,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() { s.Equal(request.WorkflowTypeName, fields[es.WorkflowType].GetStringData()) s.Equal(request.StartTimestamp, fields[es.StartTime].GetIntData()) s.Equal(request.ExecutionTimestamp, fields[es.ExecutionTime].GetIntData()) - s.Equal(memoBlob.Data, fields[es.Memo].GetBinaryData()) + s.Equal(memoBytes, fields[es.Memo].GetBinaryData()) s.Equal(string(common.EncodingTypeThriftRW), fields[es.Encoding].GetStringData()) s.Equal(request.CloseTimestamp, fields[es.CloseTime].GetIntData()) s.Equal(int64(closeStatus), fields[es.CloseStatus].GetIntData()) @@ -217,14 +205,14 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() { ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout) defer cancel() - err = s.visibilityStore.RecordWorkflowExecutionClosed(ctx, request) + err := s.visibilityStore.RecordWorkflowExecutionClosed(ctx, request) s.NoError(err) } func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed_EmptyRequest() { // test empty request request := &p.InternalRecordWorkflowExecutionClosedRequest{ - Memo: nil, + Memo: &p.DataBlob{}, } s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool { s.Equal(indexer.MessageTypeIndex, input.GetMessageType()) diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 86e7d6fab08..510b0eafd23 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -630,7 +630,7 @@ type ( CloseTime time.Time Status *types.WorkflowExecutionCloseStatus HistoryLength int64 - Memo *types.Memo + Memo *DataBlob TaskList string SearchAttributes map[string]interface{} } @@ -683,7 +683,7 @@ type ( ExecutionTimestamp int64 WorkflowTimeout int64 TaskID int64 - Memo *types.Memo + Memo *DataBlob TaskList string SearchAttributes map[string][]byte } @@ -697,7 +697,7 @@ type ( StartTimestamp int64 ExecutionTimestamp int64 TaskID int64 - Memo *types.Memo + Memo *DataBlob TaskList string SearchAttributes map[string][]byte CloseTimestamp int64 @@ -716,7 +716,7 @@ type ( ExecutionTimestamp int64 WorkflowTimeout int64 TaskID int64 - Memo *types.Memo + Memo *DataBlob TaskList string SearchAttributes map[string][]byte } diff --git a/common/persistence/sql/sqlVisibilityStore.go b/common/persistence/sql/sqlVisibilityStore.go index d3d13d0f3d3..0f44028693f 100644 --- a/common/persistence/sql/sqlVisibilityStore.go +++ b/common/persistence/sql/sqlVisibilityStore.go @@ -30,18 +30,15 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" - "github.com/uber/cadence/common/log/tag" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/sql/sqlplugin" "github.com/uber/cadence/common/service/config" - "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" ) type ( sqlVisibilityStore struct { sqlStore - serializer p.PayloadSerializer } visibilityPageToken struct { @@ -61,7 +58,6 @@ func NewSQLVisibilityStore(cfg config.SQL, logger log.Logger) (p.VisibilityStore db: db, logger: logger, }, - serializer: p.NewPayloadSerializer(), }, nil } @@ -69,7 +65,6 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *p.InternalRecordWorkflowExecutionStartedRequest, ) error { - memo := s.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) _, err := s.db.InsertIntoVisibility(ctx, &sqlplugin.VisibilityRow{ DomainID: request.DomainUUID, WorkflowID: request.WorkflowID, @@ -77,8 +72,8 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted( StartTime: time.Unix(0, request.StartTimestamp), ExecutionTime: time.Unix(0, request.ExecutionTimestamp), WorkflowTypeName: request.WorkflowTypeName, - Memo: memo.Data, - Encoding: string(memo.GetEncoding()), + Memo: request.Memo.Data, + Encoding: string(request.Memo.GetEncoding()), }) return err @@ -88,7 +83,6 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed( ctx context.Context, request *p.InternalRecordWorkflowExecutionClosedRequest, ) error { - memo := s.serializeMemo(request.Memo, request.DomainUUID, request.WorkflowID, request.RunID) closeTime := time.Unix(0, request.CloseTimestamp) result, err := s.db.ReplaceIntoVisibility(ctx, &sqlplugin.VisibilityRow{ DomainID: request.DomainUUID, @@ -100,8 +94,8 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed( CloseTime: &closeTime, CloseStatus: common.Int32Ptr(int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status))), HistoryLength: &request.HistoryLength, - Memo: memo.Data, - Encoding: string(memo.GetEncoding()), + Memo: request.Memo.Data, + Encoding: string(request.Memo.GetEncoding()), }) if err != nil { return err @@ -320,20 +314,13 @@ func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.Internal if row.ExecutionTime.UnixNano() == 0 { row.ExecutionTime = row.StartTime } - memo, err := s.serializer.DeserializeVisibilityMemo(p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding))) - if err != nil { - s.logger.Error("failed to deserialize memo", - tag.WorkflowID(row.WorkflowID), - tag.WorkflowRunID(row.RunID), - tag.Error(err)) - } info := &p.InternalVisibilityWorkflowExecutionInfo{ WorkflowID: row.WorkflowID, RunID: row.RunID, TypeName: row.WorkflowTypeName, StartTime: row.StartTime, ExecutionTime: row.ExecutionTime, - Memo: thrift.ToMemo(memo), + Memo: p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)), } if row.CloseStatus != nil { status := workflow.WorkflowExecutionCloseStatus(*row.CloseStatus) @@ -397,19 +384,3 @@ func (s *sqlVisibilityStore) serializePageToken(token *visibilityPageToken) ([]b data, err := json.Marshal(token) return data, err } - -func (s *sqlVisibilityStore) serializeMemo(visibilityMemo *types.Memo, domainID, wID, rID string) *p.DataBlob { - memo, err := s.serializer.SerializeVisibilityMemo(thrift.FromMemo(visibilityMemo), common.EncodingTypeThriftRW) - if err != nil { - s.logger.WithTags( - tag.WorkflowDomainID(domainID), - tag.WorkflowID(wID), - tag.WorkflowRunID(rID), - tag.Error(err)). - Error("Unable to encode visibility memo") - } - if memo == nil { - return &p.DataBlob{} - } - return memo -} diff --git a/common/persistence/visibilityStore.go b/common/persistence/visibilityStore.go index eef5b4fcc18..16b9505ff8c 100644 --- a/common/persistence/visibilityStore.go +++ b/common/persistence/visibilityStore.go @@ -39,6 +39,9 @@ type ( } ) +// VisibilityEncoding is default encoding for visibility data +const VisibilityEncoding = common.EncodingTypeThriftRW + var _ VisibilityManager = (*visibilityManagerImpl)(nil) // NewVisibilityManagerImpl returns new VisibilityManager @@ -72,7 +75,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionStarted( WorkflowTimeout: request.WorkflowTimeout, TaskID: request.TaskID, TaskList: request.TaskList, - Memo: thrift.ToMemo(request.Memo), + Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowId(), request.Execution.GetRunId()), SearchAttributes: request.SearchAttributes, } return v.persistence.RecordWorkflowExecutionStarted(ctx, req) @@ -90,7 +93,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosed( StartTimestamp: request.StartTimestamp, ExecutionTimestamp: request.ExecutionTimestamp, TaskID: request.TaskID, - Memo: thrift.ToMemo(request.Memo), + Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowId(), request.Execution.GetRunId()), TaskList: request.TaskList, SearchAttributes: request.SearchAttributes, CloseTimestamp: request.CloseTimestamp, @@ -113,7 +116,7 @@ func (v *visibilityManagerImpl) UpsertWorkflowExecution( StartTimestamp: request.StartTimestamp, ExecutionTimestamp: request.ExecutionTimestamp, TaskID: request.TaskID, - Memo: thrift.ToMemo(request.Memo), + Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowId(), request.Execution.GetRunId()), TaskList: request.TaskList, SearchAttributes: request.SearchAttributes, } @@ -335,7 +338,13 @@ func (v *visibilityManagerImpl) convertVisibilityWorkflowExecutionInfo(execution execution.ExecutionTime = execution.StartTime } - memo := thrift.FromMemo(execution.Memo) + memo, err := v.serializer.DeserializeVisibilityMemo(execution.Memo) + if err != nil { + v.logger.Error("failed to deserialize memo", + tag.WorkflowID(execution.WorkflowID), + tag.WorkflowRunID(execution.RunID), + tag.Error(err)) + } searchAttributes, err := v.getSearchAttributes(execution.SearchAttributes) if err != nil { v.logger.Error("failed to convert search attributes", @@ -396,3 +405,19 @@ func (v *visibilityManagerImpl) toInternalListWorkflowExecutionsRequest(req *Lis NextPageToken: req.NextPageToken, } } + +func (v *visibilityManagerImpl) serializeMemo(visibilityMemo *shared.Memo, domainID, wID, rID string) *DataBlob { + memo, err := v.serializer.SerializeVisibilityMemo(visibilityMemo, VisibilityEncoding) + if err != nil { + v.logger.WithTags( + tag.WorkflowDomainID(domainID), + tag.WorkflowID(wID), + tag.WorkflowRunID(rID), + tag.Error(err)). + Error("Unable to encode visibility memo") + } + if memo == nil { + return &DataBlob{} + } + return memo +}