Commit

Merge branch 'master' of https://github.com/mao3267/flyte into fix/flyteorg#5489-dataclass-mismatch
mao3267 committed Nov 8, 2024
2 parents aa4d98e + b5f23a6 commit b282e5f
Showing 114 changed files with 2,090 additions and 799 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -6,10 +6,10 @@ define PIP_COMPILE
pip-compile $(1) --upgrade --verbose --resolver=backtracking --annotation-style=line
endef

GIT_VERSION := $(shell git describe --always --tags)
GIT_VERSION := $(shell git describe --tags --long --match "v*" --first-parent)
GIT_HASH := $(shell git rev-parse --short HEAD)
TIMESTAMP := $(shell date '+%Y-%m-%d')
PACKAGE ?=github.com/flyteorg/flytestdlib
PACKAGE ?=github.com/flyteorg/flyte/flytestdlib
LD_FLAGS="-s -w -X $(PACKAGE)/version.Version=$(GIT_VERSION) -X $(PACKAGE)/version.Build=$(GIT_HASH) -X $(PACKAGE)/version.BuildTime=$(TIMESTAMP)"
TMP_BUILD_DIR := .tmp_build

18 changes: 9 additions & 9 deletions docs/deployment/configuration/auth_setup.rst
@@ -254,15 +254,15 @@ Apply OIDC Configuration
httpPort: 8088
grpc:
port: 8089
security:
secure: false
useAuth: true
allowCors: true
allowedOrigins:
# Accepting all domains for Sandbox installation
- "*"
allowedHeaders:
- "Content-Type"
security:
secure: false
useAuth: true
allowCors: true
allowedOrigins:
# Accepting all domains for Sandbox installation
- "*"
allowedHeaders:
- "Content-Type"
auth:
appAuth:
thirdPartyConfig:
6 changes: 3 additions & 3 deletions docs/deployment/configuration/performance.rst
@@ -270,7 +270,7 @@ The hash shard Strategy, denoted by ``type: Hash`` in the configuration below, u
type: Hash # use the "hash" shard strategy
shard-count: 4 # the total number of shards
The project and domain shard strategies, denoted by ``type: project`` and ``type: domain`` respectively, use the Flyte workflow project and domain metadata to shard Flyte workflows. These shard strategies are configured using a ``per-shard-mapping`` option, which is a list of IDs. Each element in the ``per-shard-mapping`` list defines a new shard, and the ID list assigns responsibility for the specified IDs to that shard. A shard configured as a single wildcard ID (i.e. ``*``) is responsible for all IDs that are not covered by other shards. Only a single shard may be configured with a wildcard ID and, on that shard, there must be only one ID, namely the wildcard.
The project and domain shard strategies, denoted by ``type: Project`` and ``type: Domain`` respectively, use the Flyte workflow project and domain metadata to shard Flyte workflows. These shard strategies are configured using a ``per-shard-mapping`` option, which is a list of IDs. Each element in the ``per-shard-mapping`` list defines a new shard, and the ID list assigns responsibility for the specified IDs to that shard. A shard configured as a single wildcard ID (i.e. ``*``) is responsible for all IDs that are not covered by other shards. Only a single shard may be configured with a wildcard ID and, on that shard, there must be only one ID, namely the wildcard.

.. code-block:: yaml
@@ -281,7 +281,7 @@ The project and domain shard strategies, denoted by ``type: project`` and ``type
# pod and scanning configuration redacted
# ...
shard:
type: project # use the "project" shard strategy
type: Project # use the "Project" shard strategy
per-shard-mapping: # a list of per shard mappings - one shard is created for each element
- ids: # the list of ids to be managed by the first shard
- flytesnacks
@@ -298,7 +298,7 @@ The project and domain shard strategies, denoted by ``type: project`` and ``type
# pod and scanning configuration redacted
# ...
shard:
type: domain # use the "domain" shard strategy
type: Domain # use the "Domain" shard strategy
per-shard-mapping: # a list of per shard mappings - one shard is created for each element
- ids: # the list of ids to be managed by the first shard
- production
4 changes: 2 additions & 2 deletions docs/user_guide/basics/tasks.md
@@ -51,9 +51,9 @@ We create a task that computes the slope of a regression line:
```

:::{note}
Flytekit will assign a default name to the output variable like `out0`.
Flytekit will assign a default name to the output variable like `o0`.
In case of multiple outputs, each output will be numbered in the order
starting with 0, e.g., -> `out0, out1, out2, ...`.
starting with 0, e.g., `o0`, `o1`, `o2`, etc.
:::

You can execute a Flyte task just like any regular Python function:
20 changes: 20 additions & 0 deletions docs/user_guide/customizing_dependencies/imagespec.md
@@ -175,6 +175,26 @@ image_spec = ImageSpec(
)
```

## Copy additional files or directories
You can specify files or directories to be copied into the container `/root`, allowing users to access the required files. The directory structure will match the relative path. Since Docker only supports relative paths, absolute paths and paths outside the current working directory (e.g., paths with "../") are not allowed.

```py
from flytekit.image_spec import ImageSpec
from flytekit import task, workflow

image_spec = ImageSpec(
name="image_with_copy",
registry="localhost:30000",
builder="default",
copy=["files/input.txt"],
)

@task(container_image=image_spec)
def my_task() -> str:
with open("/root/files/input.txt", "r") as f:
return f.read()
```

## Define ImageSpec in a YAML File

You can override the container image by providing an ImageSpec YAML file to the `pyflyte run` or `pyflyte register` command.
4 changes: 2 additions & 2 deletions docs/user_guide/data_types_and_io/flytefile.md
@@ -8,9 +8,9 @@

Files are one of the most fundamental entities that users of Python work with,
and they are fully supported by Flyte. In the IDL, they are known as
[Blob](https://github.com/flyteorg/flyteidl/blob/master/protos/flyteidl/core/literals.proto#L33)
[Blob](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/core/literals.proto#L33)
literals which are backed by the
[blob type](https://github.com/flyteorg/flyteidl/blob/master/protos/flyteidl/core/types.proto#L47).
[blob type](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/core/types.proto#L73)

Let's assume our mission here is pretty simple. We download a few CSV file
links, read them with the python built-in {py:class}`csv.DictReader` function,
4 changes: 2 additions & 2 deletions flyteadmin/Makefile
@@ -5,11 +5,11 @@ include ../boilerplate/flyte/docker_build/Makefile
include ../boilerplate/flyte/golang_test_targets/Makefile
include ../boilerplate/flyte/end2end/Makefile

GIT_VERSION := $(shell git describe --always --tags)
GIT_VERSION := $(shell git describe --tags --long --match "v*" --first-parent)
GIT_HASH := $(shell git rev-parse --short HEAD)
TIMESTAMP := $(shell date '+%Y-%m-%d')
# TODO(monorepo): Do we need to change this? This is used in the service that provides a version.
PACKAGE ?=github.com/flyteorg/flytestdlib
PACKAGE ?=github.com/flyteorg/flyte/flytestdlib

LD_FLAGS="-s -w -X $(PACKAGE)/version.Version=$(GIT_VERSION) -X $(PACKAGE)/version.Build=$(GIT_HASH) -X $(PACKAGE)/version.BuildTime=$(TIMESTAMP)"

13 changes: 13 additions & 0 deletions flyteadmin/pkg/common/filters.go
@@ -22,6 +22,7 @@ type GormQueryExpr struct {
// Complete set of filters available for database queries.
const (
Contains FilterExpression = iota
NotLike
GreaterThan
GreaterThanOrEqual
LessThan
@@ -37,6 +38,7 @@ const (
joinArgsFormat = "%s.%s"
containsQuery = "%s LIKE ?"
containsArgs = "%%%s%%"
notLikeQuery = "%s NOT LIKE ?"
greaterThanQuery = "%s > ?"
greaterThanOrEqualQuery = "%s >= ?"
lessThanQuery = "%s < ?"
@@ -50,6 +52,7 @@ const (
// Set of available filters which exclusively accept a single argument value.
var singleValueFilters = map[FilterExpression]bool{
Contains: true,
NotLike: true,
GreaterThan: true,
GreaterThanOrEqual: true,
LessThan: true,
@@ -68,6 +71,7 @@ const EqualExpression = "eq"

var filterNameMappings = map[string]FilterExpression{
"contains": Contains,
"not_like": NotLike,
"gt": GreaterThan,
"gte": GreaterThanOrEqual,
"lt": LessThan,
@@ -80,6 +84,7 @@ var filterNameMappings = map[string]FilterExpression{

var filterQueryMappings = map[FilterExpression]string{
Contains: containsQuery,
NotLike: notLikeQuery,
GreaterThan: greaterThanQuery,
GreaterThanOrEqual: greaterThanOrEqualQuery,
LessThan: lessThanQuery,
@@ -117,6 +122,8 @@ func getFilterExpressionName(expression FilterExpression) string {
switch expression {
case Contains:
return "contains"
case NotLike:
return "not like"
case GreaterThan:
return "greater than"
case GreaterThanOrEqual:
@@ -208,6 +215,12 @@ func (f *inlineFilterImpl) getGormQueryExpr(formattedField string) (GormQueryExp
// args renders to something like: "%value%"
Args: fmt.Sprintf(containsArgs, f.value),
}, nil
case NotLike:
return GormQueryExpr{
// WHERE field NOT LIKE value
Query: fmt.Sprintf(notLikeQuery, formattedField),
Args: f.value,
}, nil
case GreaterThan:
return GormQueryExpr{
// WHERE field > value
16 changes: 16 additions & 0 deletions flyteadmin/pkg/common/filters_test.go
@@ -107,6 +107,7 @@ func TestGetGormJoinTableQueryExpr(t *testing.T) {

var expectedArgsForFilters = map[FilterExpression]string{
Contains: "%value%",
NotLike: "value",
GreaterThan: "value",
GreaterThanOrEqual: "value",
LessThan: "value",
@@ -169,3 +170,18 @@ func TestWithDefaultValueFilter(t *testing.T) {
assert.Equal(t, "COALESCE(named_entity_metadata.state, 0) = ?", queryExpression.Query)
assert.Equal(t, 1, queryExpression.Args)
}

func TestNotLikeFilter(t *testing.T) {
filter, err := NewSingleValueFilter(NamedEntityMetadata, NotLike, "name", ".flytegen%")
assert.NoError(t, err)

queryExpression, err := filter.GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "name NOT LIKE ?", queryExpression.Query)
assert.Equal(t, ".flytegen%", queryExpression.Args)

queryExpression, err = filter.GetGormJoinTableQueryExpr("named_entity_metadata")
assert.NoError(t, err)
assert.Equal(t, "named_entity_metadata.name NOT LIKE ?", queryExpression.Query)
assert.Equal(t, ".flytegen%", queryExpression.Args)
}
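
The `NotLike` change above can be illustrated with a minimal, self-contained sketch. It is not the flyteadmin implementation: `QueryExpr` and `BuildQueryExpr` are hypothetical stand-ins for `GormQueryExpr` and `getGormQueryExpr`, and only the two filter kinds relevant here are modeled. The point it shows is that `Contains` wraps the value in `%` wildcards, whereas `NotLike` passes the caller's pattern through unchanged, so the caller supplies any wildcards.

```go
package main

import "fmt"

// FilterExpression mirrors the enum in the diff; only two members are sketched.
type FilterExpression int

const (
	Contains FilterExpression = iota
	NotLike
)

// QueryExpr is a simplified, hypothetical stand-in for GormQueryExpr.
type QueryExpr struct {
	Query string
	Args  string
}

// Format strings copied from the diff.
const (
	containsQuery = "%s LIKE ?"
	containsArgs  = "%%%s%%"
	notLikeQuery  = "%s NOT LIKE ?"
)

// BuildQueryExpr renders a filter for the given field and value.
// This helper is an assumption made for illustration only.
func BuildQueryExpr(expr FilterExpression, field, value string) (QueryExpr, error) {
	switch expr {
	case Contains:
		// Contains adds the wildcards itself: value -> %value%
		return QueryExpr{
			Query: fmt.Sprintf(containsQuery, field),
			Args:  fmt.Sprintf(containsArgs, value),
		}, nil
	case NotLike:
		// NotLike uses the value as-is; wildcards come from the caller.
		return QueryExpr{
			Query: fmt.Sprintf(notLikeQuery, field),
			Args:  value,
		}, nil
	}
	return QueryExpr{}, fmt.Errorf("unsupported filter expression: %d", expr)
}

func main() {
	q, err := BuildQueryExpr(NotLike, "name", ".flytegen%")
	if err != nil {
		panic(err)
	}
	fmt.Println(q.Query, q.Args)
}
```

Passing the pattern through untouched lets a caller express a prefix match such as `.flytegen%`, which the `%value%` wrapping used by `Contains` cannot express.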
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
@@ -480,7 +480,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, m.namedEntityManager, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
if err != nil {
return nil, nil, err
21 changes: 7 additions & 14 deletions flyteadmin/pkg/manager/impl/named_entity_manager.go
@@ -2,6 +2,7 @@ package impl

import (
"context"
"fmt"
"strconv"
"strings"

@@ -17,21 +18,13 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

const state = "state"

// System-generated workflows are meant to be hidden from the user by default. Therefore we always only show
// workflow-type named entities that have been user generated only.
var nonSystemGeneratedWorkflowsFilter, _ = common.NewSingleValueFilter(
common.NamedEntityMetadata, common.NotEqual, state, admin.NamedEntityState_SYSTEM_GENERATED)
var defaultWorkflowsFilter, _ = common.NewWithDefaultValueFilter(
strconv.Itoa(int(admin.NamedEntityState_NAMED_ENTITY_ACTIVE)), nonSystemGeneratedWorkflowsFilter)

type NamedEntityMetrics struct {
Scope promutils.Scope
}
@@ -75,12 +68,8 @@ func (m *NamedEntityManager) GetNamedEntity(ctx context.Context, request *admin.
return util.GetNamedEntity(ctx, m.db, request.ResourceType, request.Id)
}

func (m *NamedEntityManager) getQueryFilters(referenceEntity core.ResourceType, requestFilters string) ([]common.InlineFilter, error) {
func (m *NamedEntityManager) getQueryFilters(requestFilters string) ([]common.InlineFilter, error) {
filters := make([]common.InlineFilter, 0)
if referenceEntity == core.ResourceType_WORKFLOW {
filters = append(filters, defaultWorkflowsFilter)
}

if len(requestFilters) == 0 {
return filters, nil
}
Expand Down Expand Up @@ -111,10 +100,14 @@ func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request *adm
}
ctx = contextutils.WithProjectDomain(ctx, request.Project, request.Domain)

if len(request.Filters) == 0 {
// Add implicit filter to exclude system generated workflows
request.Filters = fmt.Sprintf("not_like(name,%s)", ".flytegen%")
}
// HACK: In order to filter by state (if requested) - we need to amend the filter to use COALESCE
// e.g. eq(state, 1) becomes 'WHERE (COALESCE(state, 0) = '1')' since not every NamedEntity necessarily
// has an entry, and therefore the default state value '0' (active), should be assumed.
filters, err := m.getQueryFilters(request.ResourceType, request.Filters)
filters, err := m.getQueryFilters(request.Filters)
if err != nil {
return nil, err
}
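
The implicit-filter logic in the `ListNamedEntities` hunk can be sketched in isolation as follows. `EffectiveFilters` is a hypothetical helper introduced only for illustration; the real code assigns to `request.Filters` in place. As the hunk reads, the `not_like` filter is applied only when the request carries no filters at all, so a request that supplies its own filters does not get the system-generated exclusion added.

```go
package main

import "fmt"

// systemGeneratedPattern is the name pattern used in the diff to identify
// system-generated entities.
const systemGeneratedPattern = ".flytegen%"

// EffectiveFilters returns the filter string that would be parsed for a
// list request. Hypothetical helper: it mirrors the branch in the diff.
func EffectiveFilters(requestFilters string) string {
	if len(requestFilters) == 0 {
		// No explicit filters: exclude system-generated names by default.
		return fmt.Sprintf("not_like(name,%s)", systemGeneratedPattern)
	}
	// Explicit filters are used unchanged; no implicit filter is appended.
	return requestFilters
}

func main() {
	fmt.Println(EffectiveFilters(""))
	fmt.Println(EffectiveFilters("eq(state, 0)"))
}
```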
10 changes: 3 additions & 7 deletions flyteadmin/pkg/manager/impl/named_entity_manager_test.go
@@ -87,7 +87,7 @@ func TestNamedEntityManager_Get_BadRequest(t *testing.T) {
func TestNamedEntityManager_getQueryFilters(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
updatedFilters, err := manager.(*NamedEntityManager).getQueryFilters(core.ResourceType_TASK, "eq(state, 0)")
updatedFilters, err := manager.(*NamedEntityManager).getQueryFilters("eq(state, 0)")
assert.NoError(t, err)
assert.Len(t, updatedFilters, 1)

@@ -97,13 +97,9 @@ func TestNamedEntityManager_getQueryFilters(t *testing.T) {
assert.Equal(t, "COALESCE(state, 0) = ?", queryExp.Query)
assert.Equal(t, "0", queryExp.Args)

updatedFilters, err = manager.(*NamedEntityManager).getQueryFilters(core.ResourceType_WORKFLOW, "")
updatedFilters, err = manager.(*NamedEntityManager).getQueryFilters("")
assert.NoError(t, err)
assert.Len(t, updatedFilters, 1)
queryExp, err = updatedFilters[0].GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "COALESCE(state, 0) <> ?", queryExp.Query)
assert.Equal(t, admin.NamedEntityState_SYSTEM_GENERATED, queryExp.Args)
assert.Len(t, updatedFilters, 0)
}

func TestNamedEntityManager_Update(t *testing.T) {
15 changes: 14 additions & 1 deletion flyteadmin/pkg/manager/impl/util/single_task_execution.go
@@ -165,7 +165,7 @@ func CreateOrGetWorkflowModel(
}

func CreateOrGetLaunchPlan(ctx context.Context,
db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, taskIdentifier *core.Identifier,
db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, namedEntityManager interfaces.NamedEntityInterface, taskIdentifier *core.Identifier,
workflowInterface *core.TypedInterface, workflowID uint, spec *admin.ExecutionSpec) (*admin.LaunchPlan, error) {
var launchPlan *admin.LaunchPlan
var err error
@@ -226,6 +226,19 @@ func CreateOrGetLaunchPlan(ctx context.Context,
logger.Errorf(ctx, "Failed to save launch plan model [%+v] with err: %v", launchPlanIdentifier, err)
return nil, err
}
_, err = namedEntityManager.UpdateNamedEntity(ctx, &admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Id: &admin.NamedEntityIdentifier{
Project: launchPlan.GetId().GetProject(),
Domain: launchPlan.GetId().GetDomain(),
Name: launchPlan.GetId().GetName(),
},
Metadata: &admin.NamedEntityMetadata{State: admin.NamedEntityState_SYSTEM_GENERATED},
})
if err != nil {
logger.Warningf(ctx, "Failed to set launch plan state to system-generated: %v", err)
return nil, err
}
}

return launchPlan, nil
17 changes: 16 additions & 1 deletion flyteadmin/pkg/manager/impl/util/single_task_execution_test.go
@@ -217,6 +217,21 @@ func TestCreateOrGetLaunchPlan(t *testing.T) {
},
}
workflowID := uint(12)

mockNamedEntityManager := managerMocks.NamedEntityManager{}
mockNamedEntityManager.UpdateNamedEntityFunc = func(ctx context.Context, request *admin.NamedEntityUpdateRequest) (*admin.NamedEntityUpdateResponse, error) {
assert.Equal(t, request.ResourceType, core.ResourceType_LAUNCH_PLAN)
assert.True(t, proto.Equal(request.Id, &admin.NamedEntityIdentifier{
Project: "flytekit",
Domain: "production",
Name: ".flytegen.app.workflows.MyWorkflow.my_task",
}), fmt.Sprintf("%+v", request.Id))
assert.True(t, proto.Equal(request.Metadata, &admin.NamedEntityMetadata{
State: admin.NamedEntityState_SYSTEM_GENERATED,
}))
return &admin.NamedEntityUpdateResponse{}, nil
}

taskIdentifier := &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "flytekit",
@@ -233,7 +248,7 @@ func TestCreateOrGetLaunchPlan(t *testing.T) {
},
}
launchPlan, err := CreateOrGetLaunchPlan(
context.Background(), repository, config, taskIdentifier, workflowInterface, workflowID, &spec)
context.Background(), repository, config, &mockNamedEntityManager, taskIdentifier, workflowInterface, workflowID, &spec)
assert.NoError(t, err)
assert.True(t, proto.Equal(&core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
@@ -10,7 +10,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

var archivableResourceTypes = sets.NewInt32(int32(core.ResourceType_WORKFLOW), int32(core.ResourceType_TASK))
var archivableResourceTypes = sets.NewInt32(int32(core.ResourceType_WORKFLOW), int32(core.ResourceType_TASK), int32(core.ResourceType_LAUNCH_PLAN))

func ValidateNamedEntityGetRequest(request *admin.NamedEntityGetRequest) error {
if err := ValidateResourceType(request.ResourceType); err != nil {