diff --git a/flytepropeller/pkg/compiler/validators/typing.go b/flytepropeller/pkg/compiler/validators/typing.go index 0d941f795d..9f9c1321c7 100644 --- a/flytepropeller/pkg/compiler/validators/typing.go +++ b/flytepropeller/pkg/compiler/validators/typing.go @@ -97,13 +97,12 @@ type schemaTypeChecker struct { // Schemas are more complex types in the Flyte ecosystem. A schema is considered castable in the following // cases. // -// 1. The downstream schema has no column types specified. In such a case, it accepts all schema input since it is -// generic. +// 1. The downstream schema has no column types specified. In such a case, it accepts all schema input since it is +// generic. // -// 2. The downstream schema has a subset of the upstream columns and they match perfectly. -// -// 3. The upstream type can be Schema type or structured dataset type +// 2. The downstream schema has a subset of the upstream columns and they match perfectly. // +// 3. The upstream type can be Schema type or structured dataset type func (t schemaTypeChecker) CastsFrom(upstreamType *flyte.LiteralType) bool { schemaType := upstreamType.GetSchema() structuredDatasetType := upstreamType.GetStructuredDatasetType() @@ -130,13 +129,12 @@ type structuredDatasetChecker struct { // CastsFrom for Structured dataset are more complex types in the Flyte ecosystem. A structured dataset is considered // castable in the following cases: // -// 1. The downstream structured dataset has no column types specified. In such a case, it accepts all structured dataset input since it is -// generic. -// -// 2. The downstream structured dataset has a subset of the upstream structured dataset columns and they match perfectly. +// 1. The downstream structured dataset has no column types specified. In such a case, it accepts all structured dataset input since it is +// generic. // -// 3. The upstream type can be Schema type or structured dataset type +// 2. The downstream structured dataset has a subset of the upstream structured dataset columns and they match perfectly. // +// 3. The upstream type can be Schema type or structured dataset type func (t structuredDatasetChecker) CastsFrom(upstreamType *flyte.LiteralType) bool { // structured datasets are nullable if isNoneType(upstreamType) { @@ -154,9 +152,6 @@ func (t structuredDatasetChecker) CastsFrom(upstreamType *flyte.LiteralType) boo } return structuredDatasetCastFromSchema(schemaType, t.literalType.GetStructuredDatasetType()) } - if !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) { - return false - } return structuredDatasetCastFromStructuredDataset(structuredDatasetType, t.literalType.GetStructuredDatasetType()) } @@ -226,6 +221,11 @@ func (t unionTypeChecker) CastsFrom(upstreamType *flyte.LiteralType) bool { // Upstream (structuredDatasetType) -> downstream (structuredDatasetType) func structuredDatasetCastFromStructuredDataset(upstream *flyte.StructuredDatasetType, downstream *flyte.StructuredDatasetType) bool { + // Skip the format check here when format is empty. https://github.com/flyteorg/flyte/issues/2864 + if len(upstream.Format) != 0 && len(downstream.Format) != 0 && !strings.EqualFold(upstream.Format, downstream.Format) { + return false + } + if len(upstream.Columns) == 0 || len(downstream.Columns) == 0 { return true } diff --git a/flytepropeller/pkg/compiler/validators/typing_test.go b/flytepropeller/pkg/compiler/validators/typing_test.go index 132ee0ecdd..a4b6a7a3a7 100644 --- a/flytepropeller/pkg/compiler/validators/typing_test.go +++ b/flytepropeller/pkg/compiler/validators/typing_test.go @@ -690,6 +690,14 @@ func TestSchemaCasting(t *testing.T) { } func TestStructuredDatasetCasting(t *testing.T) { + emptyStructuredDataset := &core.LiteralType{ + Type: &core.LiteralType_StructuredDatasetType{ + StructuredDatasetType: &core.StructuredDatasetType{ + Columns: []*core.StructuredDatasetType_DatasetColumn{}, + Format: "", + }, + }, + } genericStructuredDataset := &core.LiteralType{ Type: &core.LiteralType_StructuredDatasetType{ StructuredDatasetType: &core.StructuredDatasetType{ @@ -819,6 +827,16 @@ func TestStructuredDatasetCasting(t *testing.T) { assert.False(t, castable, "StructuredDataset(a=Float) should not be castable to StructuredDataset(a=Integer, b=Collection)") }) + t.Run("GenericToEmptyFormat", func(t *testing.T) { + castable := AreTypesCastable(genericStructuredDataset, emptyStructuredDataset) + assert.True(t, castable, "StructuredDataset(format='Parquet') should be castable to StructuredDataset()") + }) + + t.Run("EmptyFormatToGeneric", func(t *testing.T) { + castable := AreTypesCastable(genericStructuredDataset, emptyStructuredDataset) + assert.True(t, castable, "StructuredDataset() should be castable to StructuredDataset(format='Parquet')") + }) + t.Run("StructuredDatasetsAreNullable", func(t *testing.T) { castable := AreTypesCastable( &core.LiteralType{