Skip to content

Commit

Permalink
[BUG] Datacatalog put errors don't get persisted (#4938)
Browse files Browse the repository at this point in the history
* bubble up catalog put failures with populated catalogmetadata

Signed-off-by: Paul Dittamo <[email protected]>

* tests

Signed-off-by: Paul Dittamo <[email protected]>

* don't error on already exists catalog puts

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Feb 27, 2024
1 parent b43af79 commit 311de70
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 19 deletions.
7 changes: 7 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/catalog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ func (s Status) GetMetadata() *core.CatalogMetadata {
return s.metadata
}

func NewPutFailureStatus(key *Key) Status {
md := &core.CatalogMetadata{
DatasetId: &key.Identifier,
}
return Status{cacheStatus: core.CatalogCacheStatus_CACHE_PUT_FAILURE, metadata: md}
}

func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status {
return Status{cacheStatus: cacheStatus, metadata: md}
}
Expand Down
20 changes: 20 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/catalog/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,28 @@ var (
Name: "artifactName",
},
}
key = &Key{
Identifier: core.Identifier{
Project: "project",
Domain: "domain",
Name: "name",
Version: "1.0.0",
},
CacheVersion: "1.0.0",
TypedInterface: core.TypedInterface{
Inputs: nil,
Outputs: nil,
},
}
)

func TestNewPutFailureStatus(t *testing.T) {
status := NewPutFailureStatus(key)

assert.Equal(t, status.GetCacheStatus(), core.CatalogCacheStatus_CACHE_PUT_FAILURE)
assert.EqualValues(t, status.GetMetadata().GetDatasetId(), &key.Identifier)
}

func TestStatus(t *testing.T) {
status := NewStatus(cacheStatus, &catalogMetadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil
}

// CreateDataset creates a Dataset in datacatalog including the associated metadata.
func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {
// createDataset creates a Dataset in datacatalog including the associated metadata.
func (m *CatalogClient) createDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {
datasetID, err := GenerateDatasetIDForTask(ctx, key)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to generate dataset for ID: %s, err: %s", key.Identifier, err)
Expand Down Expand Up @@ -205,9 +205,9 @@ func (m *CatalogClient) prepareInputsAndOutputs(ctx context.Context, key catalog
return inputs, outputs, nil
}

// CreateArtifact creates an Artifact in datacatalog including its associated ArtifactData and tags it with a hash of
// createArtifact creates an Artifact in datacatalog including its associated ArtifactData and tags it with a hash of
// the provided input values for retrieval.
func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) {
func (m *CatalogClient) createArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) {
logger.Debugf(ctx, "Creating artifact for key %+v, dataset %+v and execution %+v", key, datasetID, metadata)

// Create the artifact for the execution that belongs in the task
Expand Down Expand Up @@ -263,8 +263,8 @@ func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, dat
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil
}

// UpdateArtifact overwrites the ArtifactData of an existing artifact with the provided data in datacatalog.
func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) {
// updateArtifact overwrites the ArtifactData of an existing artifact with the provided data in datacatalog.
func (m *CatalogClient) updateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) {
logger.Debugf(ctx, "Updating artifact for key %+v, dataset %+v and execution %+v", key, datasetID, metadata)

artifactDataList := make([]*datacatalog.ArtifactData, 0, len(outputs.Literals))
Expand Down Expand Up @@ -316,17 +316,21 @@ func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, dat
// Lastly, CatalogClient will create an Artifact tagged with the input value hash and store the provided execution data.
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) {
// Ensure dataset exists, idempotent operations. Populate Metadata for later recovery
datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
datasetID, err := m.createDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return catalog.Status{}, err
return catalog.NewPutFailureStatus(&key), err
}

inputs, outputs, err := m.prepareInputsAndOutputs(ctx, key, reader)
if err != nil {
return catalog.Status{}, err
return catalog.NewPutFailureStatus(&key), err
}

return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
createArtifactStatus, err := m.createArtifact(ctx, key, datasetID, inputs, outputs, metadata)
if err != nil {
return catalog.NewPutFailureStatus(&key), err
}
return createArtifactStatus, err
}

// Update stores the result of a task execution as a cached Artifact, overwriting any already stored data from a previous
Expand All @@ -337,27 +341,31 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
// has of the input values will exist.
func (m *CatalogClient) Update(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) {
// Ensure dataset exists, idempotent operations. Populate Metadata for later recovery
datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
datasetID, err := m.createDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return catalog.Status{}, err
return catalog.NewPutFailureStatus(&key), err
}

inputs, outputs, err := m.prepareInputsAndOutputs(ctx, key, reader)
if err != nil {
return catalog.Status{}, err
return catalog.NewPutFailureStatus(&key), err
}

catalogStatus, err := m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
catalogStatus, err := m.updateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
if err != nil {
if status.Code(err) == codes.NotFound {
// No existing artifact found (e.g. initial execution of task with overwrite flag already set),
// silently ignore error and create artifact instead to make overwriting an idempotent operation.
logger.Debugf(ctx, "Artifact %+v for dataset %+v does not exist while updating, creating instead", key, datasetID)
return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata)
createArtifactStatus, err := m.createArtifact(ctx, key, datasetID, inputs, outputs, metadata)
if err != nil {
return catalog.NewPutFailureStatus(&key), err
}
return createArtifactStatus, nil
}

logger.Errorf(ctx, "Failed to update artifact %+v for dataset %+v: %v", key, datasetID, err)
return catalog.Status{}, err
return catalog.NewPutFailureStatus(&key), err
}

logger.Debugf(ctx, "Successfully updated artifact %+v for dataset %+v", key, datasetID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,73 @@ func TestCatalog_Put(t *testing.T) {
assert.Equal(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", s.GetMetadata().ArtifactTag.Name)
})

t.Run("Create dataset fails", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
client: mockClient,
}

mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, errors.New("generic error"))

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
},
TaskExecutionIdentifier: nil,
})
assert.Error(t, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
})

t.Run("Create artifact fails", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
client: mockClient,
}

mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, nil)

mockClient.On("CreateArtifact",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool {
return true
}),
).Return(&datacatalog.CreateArtifactResponse{}, errors.New("generic error"))

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
},
TaskExecutionIdentifier: nil,
})
assert.Error(t, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
})

t.Run("Create new cached execution with no inputs/outputs", func(t *testing.T) {
mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
Expand Down Expand Up @@ -611,6 +678,70 @@ func TestCatalog_Update(t *testing.T) {
assert.Equal(t, taskID.NodeExecutionId.String(), sourceTID.NodeExecutionId.String())
})

t.Run("Overwrite non-existing execution", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
client: mockClient,
}

mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, nil)

mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, status.New(codes.NotFound, "missing entity of type Artifact with identifier id").Err())

mockClient.On("CreateArtifact",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool {
return true
}),
).Return(&datacatalog.CreateArtifactResponse{}, errors.New("generic error"))

taskID := &core.TaskExecutionIdentifier{
TaskId: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Name: sampleKey.Identifier.Name,
Project: sampleKey.Identifier.Project,
Domain: sampleKey.Identifier.Domain,
Version: "version",
},
NodeExecutionId: &core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Name: "wf",
Project: "p1",
Domain: "d1",
},
NodeId: "unknown", // not set in Put request below --> defaults to "unknown"
},
RetryAttempt: 0,
}

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: taskID.NodeExecutionId.ExecutionId.Name,
Domain: taskID.NodeExecutionId.ExecutionId.Domain,
Project: taskID.NodeExecutionId.ExecutionId.Project,
},
TaskExecutionIdentifier: &core.TaskExecutionIdentifier{
TaskId: &sampleKey.Identifier,
NodeExecutionId: taskID.NodeExecutionId,
RetryAttempt: 0,
},
})
assert.Error(t, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
})

t.Run("Overwrite non-existing execution", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)
Expand Down Expand Up @@ -702,6 +833,36 @@ func TestCatalog_Update(t *testing.T) {
assert.True(t, addTagCalled)
})

t.Run("Error while creating dataset", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
client: mockClient,
}

mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, errors.New("generic error"))

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
},
TaskExecutionIdentifier: nil,
})
assert.Error(t, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
})

t.Run("Error while overwriting execution", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)
Expand All @@ -714,7 +875,6 @@ func TestCatalog_Update(t *testing.T) {
mockClient.On("CreateDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool {
assert.True(t, proto.Equal(o.Dataset.Id, datasetID))
return true
}),
).Return(&datacatalog.CreateDatasetResponse{}, nil)
Expand All @@ -733,8 +893,8 @@ func TestCatalog_Update(t *testing.T) {
})
assert.Error(t, err)
assert.Equal(t, genericErr, err)
assert.Equal(t, core.CatalogCacheStatus_CACHE_DISABLED, s.GetCacheStatus())
assert.Nil(t, s.GetMetadata())
assert.Equal(t, core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetCacheStatus())
assert.NotNil(t, s.GetMetadata())
})
}

Expand Down

0 comments on commit 311de70

Please sign in to comment.