Skip to content

Commit

Permalink
Skip the structured dataset format check when the expected format is …
Browse files Browse the repository at this point in the history
…empty (flyteorg#483)

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Sep 20, 2022
1 parent b2c0b97 commit 0d961dc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
26 changes: 13 additions & 13 deletions flytepropeller/pkg/compiler/validators/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,12 @@ type schemaTypeChecker struct {
// Schemas are more complex types in the Flyte ecosystem. A schema is considered castable in the following
// cases.
//
// 1. The downstream schema has no column types specified. In such a case, it accepts all schema input since it is
// generic.
// 1. The downstream schema has no column types specified. In such a case, it accepts all schema input since it is
// generic.
//
// 2. The downstream schema has a subset of the upstream columns and they match perfectly.
//
// 3. The upstream type can be Schema type or structured dataset type
// 2. The downstream schema has a subset of the upstream columns and they match perfectly.
//
// 3. The upstream type can be Schema type or structured dataset type
func (t schemaTypeChecker) CastsFrom(upstreamType *flyte.LiteralType) bool {
schemaType := upstreamType.GetSchema()
structuredDatasetType := upstreamType.GetStructuredDatasetType()
Expand All @@ -130,13 +129,12 @@ type structuredDatasetChecker struct {
// CastsFrom for Structured dataset are more complex types in the Flyte ecosystem. A structured dataset is considered
// castable in the following cases:
//
// 1. The downstream structured dataset has no column types specified. In such a case, it accepts all structured dataset input since it is
// generic.
//
// 2. The downstream structured dataset has a subset of the upstream structured dataset columns and they match perfectly.
// 1. The downstream structured dataset has no column types specified. In such a case, it accepts all structured dataset input since it is
// generic.
//
// 3. The upstream type can be Schema type or structured dataset type
// 2. The downstream structured dataset has a subset of the upstream structured dataset columns and they match perfectly.
//
// 3. The upstream type can be Schema type or structured dataset type
func (t structuredDatasetChecker) CastsFrom(upstreamType *flyte.LiteralType) bool {
// structured datasets are nullable
if isNoneType(upstreamType) {
Expand All @@ -154,9 +152,6 @@ func (t structuredDatasetChecker) CastsFrom(upstreamType *flyte.LiteralType) boo
}
return structuredDatasetCastFromSchema(schemaType, t.literalType.GetStructuredDatasetType())
}
if !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
return false
}
return structuredDatasetCastFromStructuredDataset(structuredDatasetType, t.literalType.GetStructuredDatasetType())
}

Expand Down Expand Up @@ -226,6 +221,11 @@ func (t unionTypeChecker) CastsFrom(upstreamType *flyte.LiteralType) bool {

// Upstream (structuredDatasetType) -> downstream (structuredDatasetType)
func structuredDatasetCastFromStructuredDataset(upstream *flyte.StructuredDatasetType, downstream *flyte.StructuredDatasetType) bool {
// Skip the format check here when format is empty. https://github.com/flyteorg/flyte/issues/2864
if len(upstream.Format) != 0 && len(downstream.Format) != 0 && !strings.EqualFold(upstream.Format, downstream.Format) {
return false
}

if len(upstream.Columns) == 0 || len(downstream.Columns) == 0 {
return true
}
Expand Down
18 changes: 18 additions & 0 deletions flytepropeller/pkg/compiler/validators/typing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,14 @@ func TestSchemaCasting(t *testing.T) {
}

func TestStructuredDatasetCasting(t *testing.T) {
emptyStructuredDataset := &core.LiteralType{
Type: &core.LiteralType_StructuredDatasetType{
StructuredDatasetType: &core.StructuredDatasetType{
Columns: []*core.StructuredDatasetType_DatasetColumn{},
Format: "",
},
},
}
genericStructuredDataset := &core.LiteralType{
Type: &core.LiteralType_StructuredDatasetType{
StructuredDatasetType: &core.StructuredDatasetType{
Expand Down Expand Up @@ -819,6 +827,16 @@ func TestStructuredDatasetCasting(t *testing.T) {
assert.False(t, castable, "StructuredDataset(a=Float) should not be castable to StructuredDataset(a=Integer, b=Collection)")
})

t.Run("GenericToEmptyFormat", func(t *testing.T) {
castable := AreTypesCastable(genericStructuredDataset, emptyStructuredDataset)
assert.True(t, castable, "StructuredDataset(format='Parquet') should be castable to StructuredDataset()")
})

t.Run("EmptyFormatToGeneric", func(t *testing.T) {
castable := AreTypesCastable(genericStructuredDataset, emptyStructuredDataset)
assert.True(t, castable, "StructuredDataset() should be castable to StructuredDataset(format='Parquet')")
})

t.Run("StructuredDatasetsAreNullable", func(t *testing.T) {
castable := AreTypesCastable(
&core.LiteralType{
Expand Down

0 comments on commit 0d961dc

Please sign in to comment.