From 0f404b6bd3b944ca10771c61baa627c6893cfd5d Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Fri, 16 Jun 2023 12:16:02 +0200 Subject: [PATCH 1/5] Add input hash to workItemID Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- go.mod | 1 + go.sum | 2 + .../catalog/async_client_impl.go | 23 +- go/tasks/pluginmachinery/catalog/hashing.go | 75 +++ .../pluginmachinery/catalog/hashing_test.go | 595 ++++++++++++++++++ 5 files changed, 694 insertions(+), 2 deletions(-) create mode 100644 go/tasks/pluginmachinery/catalog/hashing.go create mode 100644 go/tasks/pluginmachinery/catalog/hashing_test.go diff --git a/go.mod b/go.mod index 919ed121f..cc36e109b 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.0.0 // indirect github.com/aws/smithy-go v1.1.0 // indirect + github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect diff --git a/go.sum b/go.sum index 7c1295f2c..d0286e09e 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.0.0/go.mod h1:5f+cELGATgill5Pu3/vK3E github.com/aws/smithy-go v1.0.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw= github.com/aws/smithy-go v1.1.0 h1:D6CSsM3gdxaGaqXnPgOBCeL6Mophqzu7KJOu7zW78sU= github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw= +github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 h1:VRtJdDi2lqc3MFwmouppm2jlm6icF+7H3WYKpLENMTo= +github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1/go.mod h1:jvdWlw8vowVGnZqSDC7yhPd7AifQeQbRDkZcQXV2nRg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl.go b/go/tasks/pluginmachinery/catalog/async_client_impl.go index 24deca5a8..58edae021 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl.go @@ -14,11 +14,14 @@ import ( "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) const specialEncoderKey = "abcdefghijklmnopqrstuvwxyz123456" var base32Encoder = base32.NewEncoding(specialEncoderKey).WithPadding(base32.NoPadding) +var emptyLiteralMap = core.LiteralMap{Literals: map[string]*core.Literal{}} // An async-client for catalog that can queue download and upload requests on workqueues. type AsyncClientImpl struct { @@ -41,6 +44,18 @@ func consistentHash(str string) (string, error) { return base32Encoder.EncodeToString(b), nil } +func hashInputs(ctx context.Context, key Key) (string, error) { + inputs := &core.LiteralMap{} + if key.TypedInterface.Inputs != nil { + retInputs, err := key.InputReader.Get(ctx) + if err != nil { + return "", err + } + inputs = retInputs + } + return HashLiteralMap(ctx, inputs) +} + func (c AsyncClientImpl) Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error) { status := ResponseStatusReady cachedResults := bitarray.NewBitSet(uint(len(requests))) @@ -95,8 +110,12 @@ func (c AsyncClientImpl) Upload(ctx context.Context, requests ...UploadRequest) status := ResponseStatusReady var respErr error for idx, request := range requests { - workItemID := formatWorkItemID(request.Key, idx, "") - err := c.Writer.Queue(ctx, workItemID, NewWriterWorkItem( + inputHash, err := hashInputs(ctx, request.Key) + if err != nil { + return nil, errors.Wrapf(ErrSystemError, err, "Failed to hash inputs for item: %v", request.Key) + } + workItemID := formatWorkItemID(request.Key, idx, inputHash) + err = c.Writer.Queue(ctx, workItemID, NewWriterWorkItem( request.Key, request.ArtifactData, request.ArtifactMetadata)) diff --git a/go/tasks/pluginmachinery/catalog/hashing.go b/go/tasks/pluginmachinery/catalog/hashing.go new file mode 100644 index 000000000..9ef06d9c5 --- /dev/null +++ b/go/tasks/pluginmachinery/catalog/hashing.go @@ -0,0 +1,75 @@ +package catalog + +import ( + "context" + "encoding/base64" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/pbhash" +) + +// Hashify a literal, in other words, produce a new literal where the corresponding value is removed in case +// the literal hash is set. +func hashify(literal *core.Literal) *core.Literal { + // Two recursive cases: + // 1. A collection of literals or + // 2. A map of literals + + if literal.GetCollection() != nil { + literals := literal.GetCollection().Literals + literalsHash := make([]*core.Literal, 0) + for _, lit := range literals { + literalsHash = append(literalsHash, hashify(lit)) + } + return &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: literalsHash, + }, + }, + } + } + if literal.GetMap() != nil { + literalsMap := make(map[string]*core.Literal) + for key, lit := range literal.GetMap().Literals { + literalsMap[key] = hashify(lit) + } + return &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: literalsMap, + }, + }, + } + } + + // And a base case that consists of a scalar, where the hash might be set + if literal.GetHash() != "" { + return &core.Literal{ + Hash: literal.GetHash(), + } + } + return literal +} + +func HashLiteralMap(ctx context.Context, literalMap *core.LiteralMap) (string, error) { + if literalMap == nil || len(literalMap.Literals) == 0 { + literalMap = &emptyLiteralMap + } + + // Hashify, i.e. generate a copy of the literal map where each literal value is removed + // in case the corresponding hash is set. + hashifiedLiteralMap := make(map[string]*core.Literal, len(literalMap.Literals)) + for name, literal := range literalMap.Literals { + hashifiedLiteralMap[name] = hashify(literal) + } + hashifiedInputs := &core.LiteralMap{ + Literals: hashifiedLiteralMap, + } + + inputsHash, err := pbhash.ComputeHash(ctx, hashifiedInputs) + if err != nil { + return "", err + } + + return base64.RawURLEncoding.EncodeToString(inputsHash), nil +} diff --git a/go/tasks/pluginmachinery/catalog/hashing_test.go b/go/tasks/pluginmachinery/catalog/hashing_test.go new file mode 100644 index 000000000..fae0606d0 --- /dev/null +++ b/go/tasks/pluginmachinery/catalog/hashing_test.go @@ -0,0 +1,595 @@ +package catalog + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/coreutils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" +) + +func TestHashLiteralMap_LiteralsWithHashSet(t *testing.T) { + tests := []struct { + name string + literal *core.Literal + expectedLiteral *core.Literal + }{ + { + name: "single literal where hash is not set", + literal: coreutils.MustMakeLiteral(42), + expectedLiteral: coreutils.MustMakeLiteral(42), + }, + { + name: "single literal containing hash", + literal: &core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "abcde", + }, + expectedLiteral: &core.Literal{ + Value: nil, + Hash: "abcde", + }, + }, + { + name: "list of literals containing a single item where literal sets its hash", + literal: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "hash1", + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: nil, + Hash: "hash1", + }, + }, + }, + }, + }, + }, + { + name: "list of literals containing two items where each literal sets its hash", + literal: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "hash1", + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://another-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "hash2", + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: nil, + Hash: "hash1", + }, + { + Value: nil, + Hash: "hash2", + }, + }, + }, + }, + }, + }, + { + name: "list of literals containing two items where only one literal has its hash set", + literal: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "hash1", + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://another-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: nil, + Hash: "hash1", + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://another-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "map of literals containing a single item where literal sets its hash", + literal: &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "hash-42", + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: nil, + Hash: "hash-42", + }, + }, + }, + }, + }, + }, + { + name: "map of literals containing a three items where only one literal sets its hash", + literal: &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + "literal2-set-its-hash": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-2", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "literal-2-hash", + }, + "literal3": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-3", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + "literal2-set-its-hash": { + Value: nil, + Hash: "literal-2-hash", + }, + "literal3": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-3", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "list of map of literals containing a mixture of literals have their hashes set or not set", + literal: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + "literal2-set-its-hash": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-2", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "literal-2-hash", + }, + "literal3": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-3", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "another-literal-1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-another-literal-1", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + Hash: "another-literal-1-hash", + }, + "another-literal2-set-its-hash": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-2", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedLiteral: &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "literal1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + "literal2-set-its-hash": { + Value: nil, + Hash: "literal-2-hash", + }, + "literal3": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-3", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "another-literal-1": { + Value: nil, + Hash: "another-literal-1-hash", + }, + "another-literal2-set-its-hash": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_StructuredDataset{ + StructuredDataset: &core.StructuredDataset{ + Uri: "my-blob-stora://some-address-for-literal-2", + Metadata: &core.StructuredDatasetMetadata{ + StructuredDatasetType: &core.StructuredDatasetType{ + Format: "my-columnar-data-format", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expectedLiteral, hashify(tt.literal)) + + // Double-check that generating a tag is successful + literalMap := &core.LiteralMap{Literals: map[string]*core.Literal{"o0": tt.literal}} + hash, err := HashLiteralMap(context.TODO(), literalMap) + assert.NoError(t, err) + assert.NotEmpty(t, hash) + }) + } +} + +// Ensure the key order on the inputs generates the same hash +func TestInputValueSorted(t *testing.T) { + literalMap, err := coreutils.MakeLiteralMap(map[string]interface{}{"1": 1, "2": 2}) + assert.NoError(t, err) + + hash, err := HashLiteralMap(context.TODO(), literalMap) + assert.NoError(t, err) + assert.Equal(t, "GQid5LjHbakcW68DS3P2jp80QLbiF0olFHF2hTh5bg8", hash) + + literalMap, err = coreutils.MakeLiteralMap(map[string]interface{}{"2": 2, "1": 1}) + assert.NoError(t, err) + + hashDupe, err := HashLiteralMap(context.TODO(), literalMap) + assert.NoError(t, err) + assert.Equal(t, hashDupe, hash) +} + +// Ensure that empty inputs are hashed the same way +func TestNoInputValues(t *testing.T) { + hash, err := HashLiteralMap(context.TODO(), nil) + assert.NoError(t, err) + assert.Equal(t, "GKw-c0PwFokMUQ6T-TUmEWnZ4_VlQ2Qpgw-vCTT0-OQ", hash) + + hashDupe, err := HashLiteralMap(context.TODO(), &core.LiteralMap{Literals: nil}) + assert.NoError(t, err) + assert.Equal(t, "GKw-c0PwFokMUQ6T-TUmEWnZ4_VlQ2Qpgw-vCTT0-OQ", hashDupe) + assert.Equal(t, hashDupe, hash) +} From 15eb5a0dead692734936f89cc6274f51da5f238e Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Sat, 17 Jun 2023 14:38:56 +0200 Subject: [PATCH 2/5] Add test to ensure different IDs Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- .../catalog/async_client_impl_test.go | 95 ++++++++++++++++++- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go index edbf99c99..2cbf1e775 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go @@ -2,6 +2,7 @@ package catalog import ( "context" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "reflect" "testing" @@ -13,6 +14,54 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" ) +var exampleInterface = &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "a": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + }, +} +var input1 = &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "a": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 1, + }, + }, + }, + }, + }, + }, + }, +} +var input2 = &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "a": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 2, + }, + }, + }, + }, + }, + }, + }, +} + func TestAsyncClientImpl_Download(t *testing.T) { ctx := context.Background() @@ -61,24 +110,50 @@ func TestAsyncClientImpl_Download(t *testing.T) { func TestAsyncClientImpl_Upload(t *testing.T) { ctx := context.Background() + inputHash1 := "{UNSPECIFIED {} [] 0}:-0-DNhkpTTPC5YDtRGb4yT-PFxgMSgHzHrKAQKgQGEfGRY" + inputHash2 := "{UNSPECIFIED {} [] 0}:-1-26M4dwarvBVJqJSUC4JC1GtRYgVBIAmQfsFSdLVMlAc" + q := &mocks.IndexedWorkQueue{} info := &mocks.WorkItemInfo{} info.OnItem().Return(NewReaderWorkItem(Key{}, &mocks2.OutputWriter{})) info.OnStatus().Return(workqueue.WorkStatusSucceeded) - q.OnGet("{UNSPECIFIED {} [] 0}:-0-").Return(info, true, nil) + q.OnGet(inputHash1).Return(info, true, nil) + q.OnGet(inputHash2).Return(info, true, nil) q.OnQueueMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + inputReader1 := &mocks2.InputReader{} + inputReader1.OnGetMatch(mock.Anything).Return(input1, nil) + inputReader2 := &mocks2.InputReader{} + inputReader2.OnGetMatch(mock.Anything).Return(input2, nil) + tests := []struct { name string requests []UploadRequest wantPutFuture UploadFuture wantErr bool }{ - {"UploadSucceeded", []UploadRequest{ - { - Key: Key{}, + { + "UploadSucceeded", + // The second request has the same Key.Identifier and Key.Cache version but a different + // Key.InputReader. This should lead to a different WorkItemID in the queue. + // See https://github.com/flyteorg/flyte/issues/3787 for more details + []UploadRequest{ + { + Key: Key{ + TypedInterface: *exampleInterface, + InputReader: inputReader1, + }, + }, + { + Key: Key{ + TypedInterface: *exampleInterface, + InputReader: inputReader2, + }, + }, }, - }, newUploadFuture(ResponseStatusReady, nil), false}, + newUploadFuture(ResponseStatusReady, nil), + false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -93,6 +168,16 @@ func TestAsyncClientImpl_Upload(t *testing.T) { if !reflect.DeepEqual(gotPutFuture, tt.wantPutFuture) { t.Errorf("AsyncClientImpl.Sidecar() = %v, want %v", gotPutFuture, tt.wantPutFuture) } + expectedWorkItemIDs := []string{inputHash1, inputHash2} + gottenWorkItemIDs := make([]string, 0) + for _, mockCall := range q.Calls { + if mockCall.Method == "Get" { + gottenWorkItemIDs = append(gottenWorkItemIDs, mockCall.Arguments[0].(string)) + } + } + if !reflect.DeepEqual(gottenWorkItemIDs, expectedWorkItemIDs) { + t.Errorf("Retrieved workitem IDs = %v, want %v", gottenWorkItemIDs, expectedWorkItemIDs) + } }) } } From 4ce80187caaa9e4d818460fde970d03dfdfaf5a0 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Sat, 17 Jun 2023 16:39:01 +0200 Subject: [PATCH 3/5] Cleanup `make lint` Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- go/tasks/pluginmachinery/catalog/async_client_impl_test.go | 3 ++- go/tasks/pluginmachinery/catalog/hashing.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go index 2cbf1e775..7ffbdb9f8 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go @@ -2,10 +2,11 @@ package catalog import ( "context" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "reflect" "testing" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks" "github.com/flyteorg/flytestdlib/bitarray" diff --git a/go/tasks/pluginmachinery/catalog/hashing.go b/go/tasks/pluginmachinery/catalog/hashing.go index 9ef06d9c5..84ca0f63d 100644 --- a/go/tasks/pluginmachinery/catalog/hashing.go +++ b/go/tasks/pluginmachinery/catalog/hashing.go @@ -3,6 +3,7 @@ package catalog import ( "context" "encoding/base64" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/pbhash" ) From 30ca1642ce7fe834781ebad7916bed5d90a84ecf Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Tue, 27 Jun 2023 09:46:13 +0200 Subject: [PATCH 4/5] Move `emptyLiteralMap` to `hashing.go` Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- go/tasks/pluginmachinery/catalog/async_client_impl.go | 1 - go/tasks/pluginmachinery/catalog/hashing.go | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl.go b/go/tasks/pluginmachinery/catalog/async_client_impl.go index 58edae021..834cc8b55 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl.go @@ -21,7 +21,6 @@ import ( const specialEncoderKey = "abcdefghijklmnopqrstuvwxyz123456" var base32Encoder = base32.NewEncoding(specialEncoderKey).WithPadding(base32.NoPadding) -var emptyLiteralMap = core.LiteralMap{Literals: map[string]*core.Literal{}} // An async-client for catalog that can queue download and upload requests on workqueues. type AsyncClientImpl struct { diff --git a/go/tasks/pluginmachinery/catalog/hashing.go b/go/tasks/pluginmachinery/catalog/hashing.go index 84ca0f63d..79ba05a2b 100644 --- a/go/tasks/pluginmachinery/catalog/hashing.go +++ b/go/tasks/pluginmachinery/catalog/hashing.go @@ -8,6 +8,8 @@ import ( "github.com/flyteorg/flytestdlib/pbhash" ) +var emptyLiteralMap = core.LiteralMap{Literals: map[string]*core.Literal{}} + // Hashify a literal, in other words, produce a new literal where the corresponding value is removed in case // the literal hash is set. func hashify(literal *core.Literal) *core.Literal { From 7833836e396cd899dabc6827b52bd6709e13c17e Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Tue, 27 Jun 2023 09:57:32 +0200 Subject: [PATCH 5/5] Fix import ordering Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- go/tasks/pluginmachinery/catalog/async_client_impl.go | 10 +++------- .../pluginmachinery/catalog/async_client_impl_test.go | 4 +--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl.go b/go/tasks/pluginmachinery/catalog/async_client_impl.go index 834cc8b55..886e0fabc 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl.go @@ -7,15 +7,11 @@ import ( "hash/fnv" "reflect" - "github.com/flyteorg/flytestdlib/promutils" - + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" "github.com/flyteorg/flytestdlib/bitarray" - "github.com/flyteorg/flytestdlib/errors" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/promutils" ) const specialEncoderKey = "abcdefghijklmnopqrstuvwxyz123456" diff --git a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go index 7ffbdb9f8..4255a97a4 100644 --- a/go/tasks/pluginmachinery/catalog/async_client_impl_test.go +++ b/go/tasks/pluginmachinery/catalog/async_client_impl_test.go @@ -6,13 +6,11 @@ import ( "testing" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - mocks2 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks" "github.com/flyteorg/flytestdlib/bitarray" "github.com/stretchr/testify/mock" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/workqueue" ) var exampleInterface = &core.TypedInterface{