Skip to content

Commit

Permalink
RawDataOutput directory for every task execution (flyteorg#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Mar 25, 2020
1 parent 635416c commit c97d1d5
Show file tree
Hide file tree
Showing 23 changed files with 491 additions and 41 deletions.
15 changes: 14 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,22 @@ type OutputReader interface {
Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error)
}

// RawOutputPaths exposes the location where the raw data produced by a task can be placed, such that the framework
// can directly access it. Using this location is completely optional. The advantage of using it is exactly-once
// semantics: the path is guaranteed to be unique for every new execution of a task (across retries etc.) and is
// constant for a specific execution.
//
// As of 02/20/2020 Flytekit generates this path randomly for S3. This interface proposes migrating that logic to
// FlytePluginMachinery so that it can be used more universally outside of Flytekit.
type RawOutputPaths interface {
// GetRawOutputPrefix returns the prefix (blob store prefix or directory) under which all data produced can be stored.
GetRawOutputPrefix() storage.DataReference
}

// All paths where various meta outputs produced by the task can be placed, such that the framework can directly access them.
// All paths are represented using storage.DataReference -> a URN for the configured storage backend
type OutputFilePaths interface {
// RawOutputPaths are available with OutputFilePaths
RawOutputPaths

// A path to a directory or prefix that contains all execution metadata for this execution
GetOutputPrefixPath() storage.DataReference
// A fully qualified path (URN) to where the framework expects the output to exist in the configured storage backend
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/io/mocks/output_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/io/mocks/raw_output_paths.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/ioutils/data_sharder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ioutils

import "context"

// ShardSelector allows shard selection for OutputSandbox.
type ShardSelector interface {
// GetShardPrefix returns the shard prefix chosen for the input s, or an error if no prefix could be produced.
GetShardPrefix(ctx context.Context, s []byte) (string, error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ioutils

import (
"context"
"hash/fnv"
"strings"

"github.com/pkg/errors"
)

// GenerateAlphabet appends the lowercase Latin letters 'a' through 'z', in order, to b and returns the
// extended slice. Passing a nil slice yields a fresh 26-element slice.
func GenerateAlphabet(b []rune) []rune {
	const lowercaseLetters = "abcdefghijklmnopqrstuvwxyz"
	for _, letter := range lowercaseLetters {
		b = append(b, letter)
	}
	return b
}

// GenerateArabicNumerals appends the digits '0' through '9', in order, to b and returns the extended slice.
// Passing a nil slice yields a fresh 10-element slice.
func GenerateArabicNumerals(b []rune) []rune {
	const decimalDigits = "0123456789"
	for _, digit := range decimalDigits {
		b = append(b, digit)
	}
	return b
}

// createAlphabetAndNumerals returns a new slice holding the letters 'a'-'z' followed by the digits '0'-'9'.
func createAlphabetAndNumerals() []rune {
	// Capacity 36 = 26 letters + 10 digits, so neither helper has to grow the slice.
	runes := GenerateAlphabet(make([]rune, 0, 36))
	runes = GenerateArabicNumerals(runes)
	return runes
}

// PrecomputedShardSelector distributes data into one of the precomputed buckets. The bucket is deterministically
// determined given the input s.
type PrecomputedShardSelector struct {
// precomputedPrefixes holds the candidate shard prefixes; GetShardPrefix returns one of these entries.
precomputedPrefixes []string
// buckets is the modulus applied to the input hash when picking a prefix. NewConstantShardSelector sets it to
// len(precomputedPrefixes).
buckets uint32
}

// GetShardPrefix deterministically selects a shard prefix for s: it hashes s with 32-bit FNV-1a and uses the
// hash modulo the bucket count as an index into the precomputed prefixes. The context argument is unused.
//
// An error is returned (instead of panicking) when the selector has no shards configured, when the bucket
// count points past the available prefixes, or when hashing fails.
func (d *PrecomputedShardSelector) GetShardPrefix(_ context.Context, s []byte) (string, error) {
	// A selector built from an empty shard list has zero buckets; the modulo below would otherwise panic
	// with an integer divide by zero.
	if d.buckets == 0 || len(d.precomputedPrefixes) == 0 {
		return "", errors.New("failed to create shard prefix, reason no shards configured")
	}

	h := fnv.New32a()
	if _, err := h.Write(s); err != nil {
		return "", errors.Wrap(err, "failed to create shard prefix, reason hash failure.")
	}

	idx := h.Sum32() % d.buckets
	// buckets and precomputedPrefixes are independent fields; guard against them disagreeing so a
	// misconfigured selector yields an error rather than an index-out-of-range panic.
	if uint64(idx) >= uint64(len(d.precomputedPrefixes)) {
		return "", errors.New("failed to create shard prefix, reason bucket count exceeds available shards")
	}
	return d.precomputedPrefixes[idx], nil
}

// NewBase36PrefixShardSelector creates a ShardSelector backed by 36*36 unique shards. Every shard is exactly two
// characters long, each character drawn from the lowercase letters a-z and the digits 0-9.
// The ctx argument is currently unused and the returned error is always nil.
func NewBase36PrefixShardSelector(ctx context.Context) (ShardSelector, error) {
	alphabet := createAlphabetAndNumerals()
	shards := make([]string, 0, len(alphabet)*len(alphabet))

	// Enumerate every ordered pair of permitted characters.
	for i := range alphabet {
		for j := range alphabet {
			var prefix strings.Builder
			prefix.WriteRune(alphabet[i])
			prefix.WriteRune(alphabet[j])
			shards = append(shards, prefix.String())
		}
	}

	return NewConstantShardSelector(shards), nil
}

// NewConstantShardSelector returns a ShardSelector that picks among the given shards. The slice is stored
// as-is (not copied) and the bucket count is set to its length.
func NewConstantShardSelector(shards []string) ShardSelector {
	selector := new(PrecomputedShardSelector)
	selector.precomputedPrefixes = shards
	selector.buckets = uint32(len(shards))
	return selector
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ioutils

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

// TestPrecomputedShardSelector_GetShardPrefix checks that fixed inputs map to the expected precomputed prefix
// for selectors with one and with two shards.
func TestPrecomputedShardSelector_GetShardPrefix(t *testing.T) {
	ctx := context.TODO()

	t.Run("single-shard", func(t *testing.T) {
		// With one bucket every input must land on the only prefix.
		selector := PrecomputedShardSelector{precomputedPrefixes: []string{"x"}, buckets: 1}
		prefix, err := selector.GetShardPrefix(ctx, []byte("abc"))
		assert.NoError(t, err)
		assert.Equal(t, "x", prefix)
	})

	t.Run("two-shards", func(t *testing.T) {
		selector := PrecomputedShardSelector{precomputedPrefixes: []string{"x", "y"}, buckets: 2}
		expectations := []struct {
			input string
			want  string
		}{
			{input: "abc", want: "y"},
			{input: "xyz", want: "x"},
		}
		for _, e := range expectations {
			prefix, err := selector.GetShardPrefix(ctx, []byte(e.input))
			assert.NoError(t, err)
			assert.Equal(t, e.want, prefix)
		}
	})
}

// TestGenerateAlphabet verifies that GenerateAlphabet yields 'a'..'z' and that a second call appends another
// full alphabet to the same slice.
func TestGenerateAlphabet(t *testing.T) {
	var letters []rune

	// Each round appends one more alphabet; first/last index the run that was just added.
	rounds := []struct {
		total, first, last int
	}{
		{total: 26, first: 0, last: 25},
		{total: 52, first: 26, last: 51},
	}
	for _, round := range rounds {
		letters = GenerateAlphabet(letters)

		assert.Equal(t, round.total, len(letters))
		assert.Equal(t, 'a', letters[round.first])
		assert.Equal(t, 'z', letters[round.last])
	}
}

// TestGenerateArabicNumerals verifies that GenerateArabicNumerals yields '0'..'9' and that a second call
// appends another run of digits while leaving the first run intact.
func TestGenerateArabicNumerals(t *testing.T) {
	digits := GenerateArabicNumerals(nil)

	assert.Equal(t, 10, len(digits))
	assert.Equal(t, '0', digits[0])
	assert.Equal(t, '9', digits[9])

	// Additive: both the original run and the appended run must start at '0' and end at '9'.
	digits = GenerateArabicNumerals(digits)
	assert.Equal(t, 20, len(digits))
	for _, start := range []int{0, 10} {
		assert.Equal(t, '0', digits[start])
		assert.Equal(t, '9', digits[start+9])
	}
}
Loading

0 comments on commit c97d1d5

Please sign in to comment.