Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into bug/delete-image-pull-backoff-ng
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Apr 18, 2023
2 parents 349ece0 + e4ca252 commit 6ed1729
Show file tree
Hide file tree
Showing 39 changed files with 755 additions and 571 deletions.
16 changes: 8 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ include boilerplate/flyte/docker_build/Makefile
include boilerplate/flyte/golang_test_targets/Makefile
include boilerplate/flyte/end2end/Makefile


.PHONY: update_boilerplate
update_boilerplate:
@curl https://raw.githubusercontent.com/flyteorg/boilerplate/master/boilerplate/update.sh -o boilerplate/update.sh
@boilerplate/update.sh

.PHONY: linux_compile
linux_compile: export CGO_ENABLED ?= 0
linux_compile: export GOOS ?= linux
linux_compile:
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/kubectl-flyte ./cmd/kubectl-flyte/main.go
go build -o /artifacts/flytepropeller ./cmd/controller/main.go
go build -o /artifacts/flytepropeller-manager ./cmd/manager/main.go
go build -o /artifacts/kubectl-flyte ./cmd/kubectl-flyte/main.go

.PHONY: compile
compile:
Expand All @@ -25,9 +26,9 @@ compile:
cross_compile:
@glide install
@mkdir -p ./bin/cross
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/kubectl-flyte ./cmd/kubectl-flyte/main.go
go build -o bin/cross/flytepropeller ./cmd/controller/main.go
go build -o bin/cross/flytepropeller-manager ./cmd/manager/main.go
go build -o bin/cross/kubectl-flyte ./cmd/kubectl-flyte/main.go

op_code_generate:
@RESOURCE_NAME=flyteworkflow OPERATOR_PKG=github.com/flyteorg/flytepropeller ./hack/update-codegen.sh
Expand All @@ -53,4 +54,3 @@ clean:
golden:
go test ./cmd/kubectl-flyte/cmd -update
go test ./pkg/compiler/test -update

8 changes: 8 additions & 0 deletions cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
return err
})

g.Go(func() error {
err := controller.StartControllerManager(childCtx, mgr)
if err != nil {
logger.Fatalf(childCtx, "Failed to start controller manager. Error: %v", err)
}
return err
})

g.Go(func() error {
err := webhook.Run(childCtx, propellerCfg, cfg, defaultNamespace, &webhookScope, mgr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flyteplugins v1.0.49
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
2 changes: 0 additions & 2 deletions pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile

if n.GetTaskNode().Overrides != nil && n.GetTaskNode().Overrides.Resources != nil {
resources = n.GetTaskNode().Overrides.Resources
} else {
resources = getResources(task)
}
}

Expand Down
16 changes: 0 additions & 16 deletions pkg/compiler/transformers/k8s/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,6 @@ func TestBuildNodeSpec(t *testing.T) {
mustBuild(t, n, 1, errs.NewScope())
})

t.Run("Task with resources", func(t *testing.T) {
expectedCPU := resource.MustParse("10Mi")
n.Node.Target = &core.Node_TaskNode{
TaskNode: &core.TaskNode{
Reference: &core.TaskNode_ReferenceId{
ReferenceId: &core.Identifier{Name: "ref_2"},
},
},
}

spec := mustBuild(t, n, 1, errs.NewScope())
assert.NotNil(t, spec.Resources)
assert.NotNil(t, spec.Resources.Requests.Cpu())
assert.Equal(t, expectedCPU.Value(), spec.Resources.Requests.Cpu().Value())
})

t.Run("node with resource overrides", func(t *testing.T) {
expectedCPU := resource.MustParse("20Mi")
n.Node.Target = &core.Node_TaskNode{
Expand Down
12 changes: 0 additions & 12 deletions pkg/compiler/transformers/k8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,6 @@ func computeDeadline(n *core.Node) (*v1.Duration, error) {
return deadline, nil
}

func getResources(task *core.TaskTemplate) *core.Resources {
if task == nil {
return nil
}

if task.GetContainer() == nil {
return nil
}

return task.GetContainer().Resources
}

func toAliasValueArray(aliases []*core.Alias) []v1alpha1.Alias {
if aliases == nil {
return nil
Expand Down
82 changes: 82 additions & 0 deletions pkg/controller/executors/mocks/node_lookup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@ import (
type NodeLookup interface {
GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool)
GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus
// Lookup for upstream edges, find all node ids from which this node can be reached.
ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
// Lookup for downstream edges, find all node ids that can be reached from the given node id.
FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
}

// Implements a contextual NodeLookup that can be composed of a disparate NodeGetter and a NodeStatusGetter
type contextualNodeLookup struct {
v1alpha1.NodeGetter
v1alpha1.NodeStatusGetter
DAGStructure
}

// Returns a Contextual NodeLookup using the given NodeGetter and a separate NodeStatusGetter.
// Very useful in Subworkflows where the Subworkflow is the reservoir of the nodes, but the status for these nodes
// maybe stored int he Top-level workflow node itself.
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter) NodeLookup {
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter, d DAGStructure) NodeLookup {
return contextualNodeLookup{
NodeGetter: n,
NodeStatusGetter: s,
DAGStructure: d,
}
}

Expand All @@ -45,6 +51,14 @@ func (s staticNodeLookup) GetNodeExecutionStatus(_ context.Context, id v1alpha1.
return s.status[id]
}

func (s staticNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

func (s staticNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

// Returns a new NodeLookup useful in Testing. Not recommended to be used in production
func NewTestNodeLookup(nodes map[v1alpha1.NodeID]v1alpha1.ExecutableNode, status map[v1alpha1.NodeID]v1alpha1.ExecutableNodeStatus) NodeLookup {
return staticNodeLookup{
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/executors/node_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ type nsg struct {
v1alpha1.NodeStatusGetter
}

type dag struct {
DAGStructure
}

func TestNewNodeLookup(t *testing.T) {
n := ng{}
ns := nsg{}
nl := NewNodeLookup(n, ns)
d := dag{}
nl := NewNodeLookup(n, ns, d)
assert.NotNil(t, nl)
typed := nl.(contextualNodeLookup)
assert.Equal(t, n, typed.NodeGetter)
assert.Equal(t, ns, typed.NodeStatusGetter)
assert.Equal(t, d, typed.DAGStructure)
}

func TestNewTestNodeLookup(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
childNodeStatus := nl.GetNodeExecutionStatus(ctx, branchTakenNode.GetID())
childNodeStatus.SetDataDir(nodeStatus.GetDataDir())
childNodeStatus.SetOutputDir(nodeStatus.GetOutputDir())
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return handler.UnknownTransition, err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return handler.UnknownTransition, err
Expand Down Expand Up @@ -196,7 +200,11 @@ func (b *branchHandler) Abort(ctx context.Context, nCtx handler.NodeExecutionCon
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +244,11 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 6ed1729

Please sign in to comment.