Skip to content

Commit

Permalink
Merge branch 'master' into roadmap
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 authored May 5, 2020
2 parents 5786a3b + e36e80b commit cb4cacf
Show file tree
Hide file tree
Showing 49 changed files with 2,907 additions and 183 deletions.
52 changes: 48 additions & 4 deletions .gen/go/replicator/replicator.go

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

const (
Expand Down Expand Up @@ -168,13 +169,13 @@ func (h *historyArchiver) Archive(
}

dirPath := URI.Path()
if err = common.MkdirAll(dirPath, h.dirMode); err != nil {
if err = util.MkdirAll(dirPath, h.dirMode); err != nil {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(errMakeDirectory), tag.Error(err))
return err
}

filename := constructHistoryFilename(request.DomainID, request.WorkflowID, request.RunID, request.CloseFailoverVersion)
if err := common.WriteFile(path.Join(dirPath, filename), encodedHistoryBatches, h.fileMode); err != nil {
if err := util.WriteFile(path.Join(dirPath, filename), encodedHistoryBatches, h.fileMode); err != nil {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(errWriteFile), tag.Error(err))
return err
}
Expand All @@ -196,7 +197,7 @@ func (h *historyArchiver) Get(
}

dirPath := URI.Path()
exists, err := common.DirectoryExists(dirPath)
exists, err := util.DirectoryExists(dirPath)
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
Expand Down Expand Up @@ -228,15 +229,15 @@ func (h *historyArchiver) Get(

filename := constructHistoryFilename(request.DomainID, request.WorkflowID, request.RunID, token.CloseFailoverVersion)
filepath := path.Join(dirPath, filename)
exists, err = common.FileExists(filepath)
exists, err = util.FileExists(filepath)
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
if !exists {
return nil, &shared.EntityNotExistsError{Message: archiver.ErrHistoryNotExist.Error()}
}

encodedHistoryBatches, err := common.ReadFile(filepath)
encodedHistoryBatches, err := util.ReadFile(filepath)
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
Expand Down Expand Up @@ -298,7 +299,7 @@ func getNextHistoryBlob(ctx context.Context, historyIterator archiver.HistoryIte
}

func getHighestVersion(dirPath string, request *archiver.GetHistoryRequest) (*int64, error) {
filenames, err := common.ListFilesByPrefix(dirPath, constructHistoryFilenamePrefix(request.DomainID, request.WorkflowID, request.RunID))
filenames, err := util.ListFilesByPrefix(dirPath, constructHistoryFilenamePrefix(request.DomainID, request.WorkflowID, request.RunID))
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions common/archiver/filestore/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

const (
Expand Down Expand Up @@ -578,12 +579,12 @@ func (s *historyArchiverSuite) writeHistoryBatchesForGetTest(historyBatches []*s
data, err := encode(historyBatches)
s.Require().NoError(err)
filename := constructHistoryFilename(testDomainID, testWorkflowID, testRunID, version)
err = common.WriteFile(path.Join(s.testGetDirectory, filename), data, testFileMode)
err = util.WriteFile(path.Join(s.testGetDirectory, filename), data, testFileMode)
s.Require().NoError(err)
}

func (s *historyArchiverSuite) assertFileExists(filepath string) {
exists, err := common.FileExists(filepath)
exists, err := util.FileExists(filepath)
s.NoError(err)
s.True(exists)
}
Expand Down
4 changes: 2 additions & 2 deletions common/archiver/filestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/dgryski/go-farm"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/util"
)

var (
Expand Down Expand Up @@ -116,7 +116,7 @@ func validateDirPath(dirPath string) error {
return err
}
if !info.IsDir() {
return common.ErrDirectoryExpected
return util.ErrDirectoryExpected
}
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions common/archiver/filestore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/util"
)

const (
Expand Down Expand Up @@ -120,7 +121,7 @@ func (s *UtilSuite) TestValidateDirPath() {
},
{
dirPath: fpath,
expectedErr: common.ErrDirectoryExpected,
expectedErr: util.ErrDirectoryExpected,
},
}

Expand Down Expand Up @@ -314,7 +315,7 @@ func (s *UtilSuite) createFile(dir string, filename string) {
}

func (s *UtilSuite) assertDirectoryExists(path string) {
exists, err := common.DirectoryExists(path)
exists, err := util.DirectoryExists(path)
s.NoError(err)
s.True(exists)
}
11 changes: 6 additions & 5 deletions common/archiver/filestore/visibilityArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

const (
Expand Down Expand Up @@ -110,7 +111,7 @@ func (v *visibilityArchiver) Archive(
}

dirPath := path.Join(URI.Path(), request.DomainID)
if err = common.MkdirAll(dirPath, v.dirMode); err != nil {
if err = util.MkdirAll(dirPath, v.dirMode); err != nil {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(errMakeDirectory), tag.Error(err))
return err
}
Expand All @@ -124,7 +125,7 @@ func (v *visibilityArchiver) Archive(
// The filename has the format: closeTimestamp_hash(runID).visibility
// This format allows the archiver to sort all records without reading the file contents
filename := constructVisibilityFilename(request.CloseTimestamp, request.RunID)
if err := common.WriteFile(path.Join(dirPath, filename), encodedVisibilityRecord, v.fileMode); err != nil {
if err := util.WriteFile(path.Join(dirPath, filename), encodedVisibilityRecord, v.fileMode); err != nil {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(errWriteFile), tag.Error(err))
return err
}
Expand Down Expand Up @@ -177,15 +178,15 @@ func (v *visibilityArchiver) query(
}

dirPath := path.Join(URI.Path(), request.domainID)
exists, err := common.DirectoryExists(dirPath)
exists, err := util.DirectoryExists(dirPath)
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
if !exists {
return &archiver.QueryVisibilityResponse{}, nil
}

files, err := common.ListFiles(dirPath)
files, err := util.ListFiles(dirPath)
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
Expand All @@ -200,7 +201,7 @@ func (v *visibilityArchiver) query(

response := &archiver.QueryVisibilityResponse{}
for idx, file := range files {
encodedRecord, err := common.ReadFile(path.Join(dirPath, file))
encodedRecord, err := util.ReadFile(path.Join(dirPath, file))
if err != nil {
return nil, &shared.InternalServiceError{Message: err.Error()}
}
Expand Down
7 changes: 4 additions & 3 deletions common/archiver/filestore/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

const (
Expand Down Expand Up @@ -189,7 +190,7 @@ func (s *visibilityArchiverSuite) TestArchive_Success() {
filepath := path.Join(dir, testDomainID, expectedFilename)
s.assertFileExists(filepath)

data, err := common.ReadFile(filepath)
data, err := util.ReadFile(filepath)
s.NoError(err)

archivedRecord := &archiver.ArchiveVisibilityRequest{}
Expand Down Expand Up @@ -573,12 +574,12 @@ func (s *visibilityArchiverSuite) writeVisibilityRecordForQueryTest(record *visi
s.Require().NoError(err)
filename := constructVisibilityFilename(record.CloseTimestamp, record.RunID)
s.Require().NoError(os.MkdirAll(path.Join(s.testQueryDirectory, record.DomainID), testDirMode))
err = common.WriteFile(path.Join(s.testQueryDirectory, record.DomainID, filename), data, testFileMode)
err = util.WriteFile(path.Join(s.testQueryDirectory, record.DomainID, filename), data, testFileMode)
s.Require().NoError(err)
}

func (s *visibilityArchiverSuite) assertFileExists(filepath string) {
exists, err := common.FileExists(filepath)
exists, err := util.FileExists(filepath)
s.NoError(err)
s.True(exists)
}
12 changes: 6 additions & 6 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"fmt"
"os"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

type (
Expand All @@ -49,12 +49,12 @@ func NewFilestoreClient(cfg *config.FileBlobstore) (blobstore.Client, error) {
return nil, errors.New("output directory not given for file blobstore")
}
outputDirectory := cfg.OutputDirectory
exists, err := common.DirectoryExists(outputDirectory)
exists, err := util.DirectoryExists(outputDirectory)
if err != nil {
return nil, err
}
if !exists {
if err := common.MkdirAll(outputDirectory, os.FileMode(0766)); err != nil {
if err := util.MkdirAll(outputDirectory, os.FileMode(0766)); err != nil {
return nil, err
}
}
Expand All @@ -69,15 +69,15 @@ func (c *client) Put(_ context.Context, request *blobstore.PutRequest) (*blobsto
if err != nil {
return nil, err
}
if err := common.WriteFile(c.filepath(request.Key), data, os.FileMode(0666)); err != nil {
if err := util.WriteFile(c.filepath(request.Key), data, os.FileMode(0666)); err != nil {
return nil, err
}
return &blobstore.PutResponse{}, nil
}

// Get fetches a blob
func (c *client) Get(_ context.Context, request *blobstore.GetRequest) (*blobstore.GetResponse, error) {
data, err := common.ReadFile(c.filepath(request.Key))
data, err := util.ReadFile(c.filepath(request.Key))
if err != nil {
return nil, err
}
Expand All @@ -92,7 +92,7 @@ func (c *client) Get(_ context.Context, request *blobstore.GetRequest) (*blobsto

// Exists determines if a blob exists
func (c *client) Exists(_ context.Context, request *blobstore.ExistsRequest) (*blobstore.ExistsResponse, error) {
exists, err := common.FileExists(c.filepath(request.Key))
exists, err := util.FileExists(c.filepath(request.Key))
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions common/blobstore/filestore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/util"
)

type ClientSuite struct {
Expand Down Expand Up @@ -67,13 +67,13 @@ func (s *ClientSuite) TestNewFilestoreClient_DirectoryAlreadyExists() {
func (s *ClientSuite) TestNewFilestoreClient_DirectoryNotAlreadyExists() {
name := s.createTempDir("TestNewFilestoreClient_DirectoryNotAlreadyExists")
os.RemoveAll(name)
exists, err := common.DirectoryExists(name)
exists, err := util.DirectoryExists(name)
s.NoError(err)
s.False(exists)
c, err := NewFilestoreClient(&config.FileBlobstore{OutputDirectory: name})
s.NoError(err)
s.Equal(name, c.(*client).outputDirectory)
exists, err = common.DirectoryExists(name)
exists, err = util.DirectoryExists(name)
s.NoError(err)
s.True(exists)
os.RemoveAll(name)
Expand Down
21 changes: 20 additions & 1 deletion common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type (
failoverVersion int64
isGlobalDomain bool
failoverNotificationVersion int64
failoverEndTime *int64
notificationVersion int64
initialized bool
}
Expand Down Expand Up @@ -547,6 +548,7 @@ func (c *domainCache) updateIDToDomainCache(
entry.failoverVersion = record.failoverVersion
entry.isGlobalDomain = record.isGlobalDomain
entry.failoverNotificationVersion = record.failoverNotificationVersion
entry.failoverEndTime = record.failoverEndTime
entry.notificationVersion = record.notificationVersion
entry.initialized = record.initialized

Expand Down Expand Up @@ -675,6 +677,7 @@ func (c *domainCache) buildEntryFromRecord(
newEntry.failoverVersion = record.FailoverVersion
newEntry.isGlobalDomain = record.IsGlobalDomain
newEntry.failoverNotificationVersion = record.FailoverNotificationVersion
newEntry.failoverEndTime = record.FailoverEndTime
newEntry.notificationVersion = record.NotificationVersion
newEntry.initialized = true
return newEntry
Expand Down Expand Up @@ -723,6 +726,7 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
result.failoverVersion = entry.failoverVersion
result.isGlobalDomain = entry.isGlobalDomain
result.failoverNotificationVersion = entry.failoverNotificationVersion
result.failoverEndTime = entry.failoverEndTime
result.notificationVersion = entry.notificationVersion
result.initialized = entry.initialized
return result
Expand Down Expand Up @@ -774,7 +778,16 @@ func (entry *DomainCacheEntry) IsDomainActive() bool {
// domain is not a global domain, meaning domain is always "active" within each cluster
return true
}
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName && !entry.IsDomainPendingActive()
}

// IsDomainPendingActive returns whether the domain is in pending active state
func (entry *DomainCacheEntry) IsDomainPendingActive() bool {
if !entry.isGlobalDomain {
// domain is not a global domain, meaning domain is always "active" within each cluster
return true
}
return entry.failoverEndTime != nil
}

// GetReplicationPolicy return the derived workflow replication policy
Expand All @@ -793,6 +806,12 @@ func (entry *DomainCacheEntry) GetDomainNotActiveErr() error {
// domain is consider active
return nil
}
if entry.IsDomainPendingActive() {
return errors.NewDomainPendingActiveError(
entry.info.Name,
entry.clusterMetadata.GetCurrentClusterName(),
)
}
return errors.NewDomainNotActiveError(
entry.info.Name,
entry.clusterMetadata.GetCurrentClusterName(),
Expand Down
13 changes: 13 additions & 0 deletions common/errors/domainNotActiveError.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,16 @@ func NewDomainNotActiveError(domainName string, currentCluster string, activeClu
ActiveCluster: activeCluster,
}
}

// NewDomainPendingActiveError return a domain not active error
func NewDomainPendingActiveError(domainName string, currentCluster string) *workflow.DomainNotActiveError {
return &workflow.DomainNotActiveError{
Message: fmt.Sprintf(
"Domain: %s is pending active in cluster: %s.",
domainName,
currentCluster,
),
DomainName: domainName,
CurrentCluster: currentCluster,
}
}
Loading

0 comments on commit cb4cacf

Please sign in to comment.