Skip to content

Commit

Permalink
Convert PayloadSerializer to internal types (#3799)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Dec 1, 2020
1 parent 94c063a commit 2069ee4
Show file tree
Hide file tree
Showing 28 changed files with 295 additions and 255 deletions.
2 changes: 1 addition & 1 deletion common/ndc/history_resender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (s *historyResenderSuite) TestCurrentExecutionCheck() {
}

func (s *historyResenderSuite) serializeEvents(events []*shared.HistoryEvent) *shared.DataBlob {
blob, err := s.serializer.SerializeBatchEvents(events, common.EncodingTypeThriftRW)
blob, err := s.serializer.SerializeBatchEvents(thrift.ToHistoryEventArray(events), common.EncodingTypeThriftRW)
s.Nil(err)
return &shared.DataBlob{
EncodingType: shared.EncodingTypeThriftRW.Ptr(),
Expand Down
39 changes: 20 additions & 19 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

Expand Down Expand Up @@ -131,7 +132,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
}

newInfo := &WorkflowExecutionInfo{
CompletionEvent: completionEvent,
CompletionEvent: thrift.FromHistoryEvent(completionEvent),

DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
Expand Down Expand Up @@ -183,7 +184,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
BranchToken: info.BranchToken,
CronSchedule: info.CronSchedule,
ExpirationSeconds: int32(info.ExpirationSeconds.Seconds()),
AutoResetPoints: autoResetPoints,
AutoResetPoints: thrift.FromResetPoints(autoResetPoints),
SearchAttributes: info.SearchAttributes,
Memo: info.Memo,
}
Expand All @@ -197,15 +198,15 @@ func (m *executionManagerImpl) DeserializeBufferedEvents(
blobs []*DataBlob,
) ([]*workflow.HistoryEvent, error) {

events := make([]*workflow.HistoryEvent, 0)
events := make([]*types.HistoryEvent, 0)
for _, b := range blobs {
history, err := m.serializer.DeserializeBatchEvents(b)
if err != nil {
return nil, err
}
events = append(events, history...)
}
return events, nil
return thrift.FromHistoryEventArray(events), nil
}

func (m *executionManagerImpl) DeserializeChildExecutionInfos(
Expand All @@ -223,8 +224,8 @@ func (m *executionManagerImpl) DeserializeChildExecutionInfos(
return nil, err
}
c := &ChildExecutionInfo{
InitiatedEvent: initiatedEvent,
StartedEvent: startedEvent,
InitiatedEvent: thrift.FromHistoryEvent(initiatedEvent),
StartedEvent: thrift.FromHistoryEvent(startedEvent),

Version: v.Version,
InitiatedID: v.InitiatedID,
Expand All @@ -246,8 +247,8 @@ func (m *executionManagerImpl) DeserializeChildExecutionInfos(
if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
c.StartedWorkflowID = startedExecution.GetWorkflowId()
c.StartedRunID = startedExecution.GetRunId()
c.StartedWorkflowID = startedExecution.GetWorkflowID()
c.StartedRunID = startedExecution.GetRunID()
}
newInfos[k] = c
}
Expand All @@ -269,8 +270,8 @@ func (m *executionManagerImpl) DeserializeActivityInfos(
return nil, err
}
a := &ActivityInfo{
ScheduledEvent: scheduledEvent,
StartedEvent: startedEvent,
ScheduledEvent: thrift.FromHistoryEvent(scheduledEvent),
StartedEvent: thrift.FromHistoryEvent(startedEvent),

Version: v.Version,
ScheduleID: v.ScheduleID,
Expand Down Expand Up @@ -347,11 +348,11 @@ func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(

newInfos := make([]*InternalChildExecutionInfo, 0)
for _, v := range infos {
initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
initiatedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.InitiatedEvent), encoding)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
startedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.StartedEvent), encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,11 +383,11 @@ func (m *executionManagerImpl) SerializeUpsertActivityInfos(

newInfos := make([]*InternalActivityInfo, 0)
for _, v := range infos {
scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
scheduledEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.ScheduledEvent), encoding)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
startedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.StartedEvent), encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -440,12 +441,12 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
if info == nil {
return &InternalWorkflowExecutionInfo{}, nil
}
completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
completionEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(info.CompletionEvent), encoding)
if err != nil {
return nil, err
}

resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
resetPoints, err := m.serializer.SerializeResetPoints(thrift.ToResetPoints(info.AutoResetPoints), encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -635,7 +636,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
}
var serializedNewBufferedEvents *DataBlob
if input.NewBufferedEvents != nil {
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(thrift.ToHistoryEventArray(input.NewBufferedEvents), encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -745,7 +746,7 @@ func (m *executionManagerImpl) SerializeVersionHistories(
if versionHistories == nil {
return nil, nil
}
return m.serializer.SerializeVersionHistories(versionHistories.ToThrift(), encoding)
return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
}

func (m *executionManagerImpl) DeserializeVersionHistories(
Expand All @@ -759,7 +760,7 @@ func (m *executionManagerImpl) DeserializeVersionHistories(
if err != nil {
return nil, err
}
return NewVersionHistoriesFromThrift(versionHistories), nil
return NewVersionHistoriesFromInternalType(versionHistories), nil
}

func (m *executionManagerImpl) DeleteWorkflowExecution(
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
}

// nodeID will be the first eventID
blob, err := m.historySerializer.SerializeBatchEvents(request.Events, request.Encoding)
blob, err := m.historySerializer.SerializeBatchEvents(thrift.ToHistoryEventArray(request.Events), request.Encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -457,10 +457,11 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
lastFirstEventID := common.EmptyEventID

for _, batch := range dataBlobs {
events, err := m.historySerializer.DeserializeBatchEvents(batch)
thriftEvents, err := m.historySerializer.DeserializeBatchEvents(batch)
if err != nil {
return nil, nil, nil, 0, 0, err
}
events := thrift.FromHistoryEventArray(thriftEvents)
if len(events) == 0 {
logger.Error("Empty events in a batch")
return nil, nil, nil, 0, 0, &types.InternalServiceError{
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/metadataStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *metadataManagerImpl) toInternalDomainConfig(c *DomainConfig) (InternalD
if c.BadBinaries.Binaries == nil {
c.BadBinaries.Binaries = map[string]*shared.BadBinaryInfo{}
}
badBinaries, err := m.serializer.SerializeBadBinaries(&c.BadBinaries, common.EncodingTypeThriftRW)
badBinaries, err := m.serializer.SerializeBadBinaries(thrift.ToBadBinaries(&c.BadBinaries), common.EncodingTypeThriftRW)
if err != nil {
return InternalDomainConfig{}, err
}
Expand All @@ -208,10 +208,11 @@ func (m *metadataManagerImpl) fromInternalDomainConfig(ic *InternalDomainConfig)
if ic == nil {
return DomainConfig{}, nil
}
badBinaries, err := m.serializer.DeserializeBadBinaries(ic.BadBinaries)
internalBadBinaries, err := m.serializer.DeserializeBadBinaries(ic.BadBinaries)
if err != nil {
return DomainConfig{}, err
}
badBinaries := thrift.FromBadBinaries(internalBadBinaries)
if badBinaries.Binaries == nil {
badBinaries.Binaries = map[string]*shared.BadBinaryInfo{}
}
Expand Down
Loading

0 comments on commit 2069ee4

Please sign in to comment.