Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Validate multiple input values in map task (#334)
Browse files Browse the repository at this point in the history
* validating multiple input values in map task and correctly populating for cache lookup

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 4, 2023
1 parent 38e8a07 commit e84d27a
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 104 deletions.
46 changes: 30 additions & 16 deletions go/tasks/plugins/array/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,20 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex

size := -1
var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
literals := make([][]*idlCore.Literal, 0)
discoveredInputNames := make([]string, 0)
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
size = len(literal.GetCollection().Literals)
discoveredInputName = inputName
break
// validate length of input list
if size != -1 && size != len(literalCollection.Literals) {
state = state.SetPhase(arrayCore.PhasePermanentFailure, 0).SetReason("all maptask input lists must be the same length")
return state, nil
}

literals = append(literals, literalCollection.Literals)
discoveredInputNames = append(discoveredInputNames, inputName)

size = len(literalCollection.Literals)
}
}

Expand All @@ -105,7 +113,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex
arrayJobSize = int64(size)

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literals, discoveredInputNames)
}

if arrayJobSize > maxArrayJobSize {
Expand Down Expand Up @@ -242,16 +250,17 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state
}

var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
literals := make([][]*idlCore.Literal, 0)
discoveredInputNames := make([]string, 0)
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
discoveredInputName = inputName
break
literals = append(literals, literalCollection.Literals)
discoveredInputNames = append(discoveredInputNames, inputName)
}
}

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literals, discoveredInputNames)
}

// output reader
Expand Down Expand Up @@ -470,14 +479,19 @@ func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskRe

// ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their
// inputs already populated.
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs []*idlCore.Literal, inputName string) []io.InputReader {
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs [][]*idlCore.Literal, inputNames []string) []io.InputReader {
inputReaders := make([]io.InputReader, 0, len(inputs))
for i := 0; i < len(inputs); i++ {
inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{
Literals: map[string]*idlCore.Literal{
inputName: inputs[i],
},
}))
if len(inputs) == 0 {
return inputReaders
}

for i := 0; i < len(inputs[0]); i++ {
literals := make(map[string]*idlCore.Literal)
for j := 0; j < len(inputNames); j++ {
literals[inputNames[j]] = inputs[j][i]
}

inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{Literals: literals}))
}

return inputReaders
Expand Down
Loading

0 comments on commit e84d27a

Please sign in to comment.