Skip to content

Commit

Permalink
Revert "Proto changes (#4778)"
Browse files Browse the repository at this point in the history
This reverts commit 8f516e8.
  • Loading branch information
squiishyy committed Feb 14, 2024
1 parent f991096 commit 5bea88d
Show file tree
Hide file tree
Showing 20 changed files with 852 additions and 199 deletions.
3 changes: 3 additions & 0 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ admin:
# and _also_, admin to talk to artifacts
endpoint: localhost:30080
insecure: true
flyteadmin:
featureGates:
enableArtifacts: true

catalog-cache:
endpoint: localhost:8081
Expand Down
29 changes: 29 additions & 0 deletions flyteadmin/pkg/artifacts/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package artifacts

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestRegistryNoClient(t *testing.T) {
r := NewArtifactRegistry(context.Background(), nil)
assert.Nil(t, r.GetClient())
}

type Parent struct {
R *ArtifactRegistry
}

func TestPointerReceivers(t *testing.T) {
p := Parent{}
nilClient := p.R.GetClient()
assert.Nil(t, nilClient)
}

func TestNilCheck(t *testing.T) {
r := NewArtifactRegistry(context.Background(), nil)
err := r.RegisterTrigger(context.Background(), nil)
assert.NotNil(t, err)
}
69 changes: 69 additions & 0 deletions flyteadmin/pkg/manager/impl/exec_manager_other_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package impl

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
mockScope "github.com/flyteorg/flyte/flytestdlib/promutils"
)

func TestResolveNotWorking(t *testing.T) {
mockConfig := getMockExecutionsConfigProvider()

execManager := NewExecutionManager(nil, nil, mockConfig, nil, mockScope.NewTestScope(), mockScope.NewTestScope(), nil, nil, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)).(*ExecutionManager)

pm, artifactIDs, err := execManager.ResolveParameterMapArtifacts(context.Background(), nil, nil)
assert.Nil(t, err)
fmt.Println(pm, artifactIDs)

}

func TestTrackingBitExtract(t *testing.T) {
mockConfig := getMockExecutionsConfigProvider()

execManager := NewExecutionManager(nil, nil, mockConfig, nil, mockScope.NewTestScope(), mockScope.NewTestScope(), nil, nil, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)).(*ExecutionManager)

lit := core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_Integer{
Integer: 1,
},
},
},
},
},
Metadata: map[string]string{"_ua": "proj/domain/name@version"},
}
inputMap := core.LiteralMap{
Literals: map[string]*core.Literal{
"a": &lit,
},
}
inputColl := core.LiteralCollection{
Literals: []*core.Literal{
&lit,
},
}

var trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &lit)
assert.Equal(t, 1, len(trackers))

trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Map{Map: &inputMap}})
assert.Equal(t, 1, len(trackers))

trackers = make(map[string]string)
execManager.ExtractArtifactTrackers(trackers, &core.Literal{Value: &core.Literal_Collection{Collection: &inputColl}})
assert.Equal(t, 1, len(trackers))
assert.Equal(t, "", trackers["proj/domain/name@version"])
}
116 changes: 114 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/flyteorg/flyte/flyteadmin/auth"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
cloudeventInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications"
Expand All @@ -43,9 +44,11 @@ import (
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteadmin/plugins"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
)

const childContainerQueueKey = "child_queue"
const artifactTrackerKey = "_ua"

// Map of [project] -> map of [domain] -> stop watch
type projectDomainScopedStopWatchMap = map[string]map[string]*promutils.StopWatch
Expand Down Expand Up @@ -93,6 +96,7 @@ type ExecutionManager struct {
cloudEventPublisher notificationInterfaces.Publisher
dbEventWriter eventWriter.WorkflowExecutionEventWriter
pluginRegistry *plugins.Registry
artifactRegistry *artifacts.ArtifactRegistry
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -684,6 +688,28 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
}
}

// ExtractArtifactTrackers pulls out artifact tracker strings from Literals for lineage
func (m *ExecutionManager) ExtractArtifactTrackers(artifactTrackers map[string]string, input *core.Literal) {

if input == nil {
return
}
if input.GetMetadata() != nil {
if tracker, ok := input.GetMetadata()[artifactTrackerKey]; ok {
artifactTrackers[tracker] = ""
}
}
if input.GetCollection() != nil {
for _, v := range input.GetCollection().Literals {
m.ExtractArtifactTrackers(artifactTrackers, v)
}
} else if input.GetMap() != nil {
for _, v := range input.GetMap().Literals {
m.ExtractArtifactTrackers(artifactTrackers, v)
}
}
}

// getStringFromInput should be called when a tag or partition value is a binding to an input. the input is looked up
// from the input map and the binding, and an error is returned if the input key is not in the map.
func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding core.InputBindingData, inputs map[string]*core.Literal) (string, error) {
Expand Down Expand Up @@ -896,6 +922,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {

ctxPD := contextutils.WithProjectDomain(ctx, request.Project, request.Domain)

err := validation.ValidateExecutionRequest(ctx, request, m.db, m.config.ApplicationConfiguration())
if err != nil {
logger.Debugf(ctx, "Failed to validate ExecutionCreateRequest %+v with err %v", request, err)
Expand All @@ -919,9 +947,55 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

// TODO: Artifact feature gate, remove when ready
var lpExpectedInputs *core.ParameterMap
var artifactTrackers = make(map[string]string)
var usedArtifactIDs []*core.ArtifactID
lpExpectedInputs = launchPlan.Closure.ExpectedInputs
if m.artifactRegistry.GetClient() != nil {
// Literals may have an artifact key in the metadata field. This is something the artifact service should have
// added. Pull these back out so we can keep track of them for lineage purposes. Use a dummy wrapper object for
// easier recursion.
requestInputMap := &core.Literal{
Value: &core.Literal_Map{Map: request.Inputs},
}
fixedInputMap := &core.Literal{
Value: &core.Literal_Map{Map: launchPlan.Spec.FixedInputs},
}
m.ExtractArtifactTrackers(artifactTrackers, requestInputMap)
m.ExtractArtifactTrackers(artifactTrackers, fixedInputMap)

// Put together the inputs that we've already resolved so that the artifact querying bit can fill them in.
// This is to support artifact queries that depend on other inputs using the {{ .inputs.var }} construct.
var inputsForQueryTemplating = make(map[string]*core.Literal)
if request.Inputs != nil {
for k, v := range request.Inputs.Literals {
inputsForQueryTemplating[k] = v
}
}
for k, v := range launchPlan.Spec.FixedInputs.Literals {
inputsForQueryTemplating[k] = v
}
logger.Debugf(ctx, "Inputs for query templating: [%+v]", inputsForQueryTemplating)

// Resolve artifact queries
// Within the launch plan, the artifact will be in the Parameter map, and can come in form of an ArtifactID,
// or as an ArtifactQuery.
// Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to
// and so we can fill in template args.
// ArtifactIDs are also returned for lineage purposes.
lpExpectedInputs, usedArtifactIDs, err = m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating)
if err != nil {
logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err)
return nil, nil, err
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact trackers: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

} else {
lpExpectedInputs = launchPlan.Closure.ExpectedInputs
}

// Artifacts retrieved will need to be stored somewhere to ensure that we can re-emit events if necessary
// in the future, and also to make sure that relaunch and recover can use it if necessary.
Expand Down Expand Up @@ -1069,6 +1143,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
notificationsSettings = make([]*admin.Notification, 0)
}

// Publish of event is also gated on the artifact client being available, even though it's not directly required.
// TODO: Artifact feature gate, remove when ready
if m.artifactRegistry.GetClient() != nil {
// TODO: Add principal
m.publishExecutionStart(ctx, workflowExecutionID, request.Spec.LaunchPlan, workflow.Id, artifactTrackers, usedArtifactIDs)
}

createExecModelInput := transformers.CreateExecutionModelInput{
WorkflowExecutionID: workflowExecutionID,
RequestSpec: requestSpec,
Expand Down Expand Up @@ -1118,6 +1199,36 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return ctx, executionModel, nil
}

// publishExecutionStart is an event that Admin publishes for artifact lineage.
func (m *ExecutionManager) publishExecutionStart(ctx context.Context, executionID core.WorkflowExecutionIdentifier,
launchPlanID *core.Identifier, workflowID *core.Identifier, artifactTrackers map[string]string, usedArtifactIDs []*core.ArtifactID) {

var artifactTrackerList []string
// Use a list instead of the fake set
for k := range artifactTrackers {
artifactTrackerList = append(artifactTrackerList, k)
}

if len(artifactTrackerList) > 0 || len(usedArtifactIDs) > 0 {
logger.Debugf(ctx, "Sending execution start event for execution [%+v] with trackers [%+v] and artifact ids [%+v]", executionID, artifactTrackerList, usedArtifactIDs)

request := event.CloudEventExecutionStart{
ExecutionId: &executionID,
LaunchPlanId: launchPlanID,
WorkflowId: workflowID,
ArtifactIds: usedArtifactIDs,
ArtifactTrackers: artifactTrackerList,
}
go func() {
ceCtx := context.TODO()
if err := m.cloudEventPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
m.systemMetrics.PublishEventError.Inc()
logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request, err)
}
}()
}
}

// Inserts an execution model into the database store and emits platform metrics.
func (m *ExecutionManager) createExecutionModel(
ctx context.Context, executionModel *models.Execution) (*core.WorkflowExecutionIdentifier, error) {
Expand Down Expand Up @@ -1842,7 +1953,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
eventWriter eventWriter.WorkflowExecutionEventWriter) interfaces.ExecutionInterface {
eventWriter eventWriter.WorkflowExecutionEventWriter, artifactRegistry *artifacts.ArtifactRegistry) interfaces.ExecutionInterface {

queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)
Expand Down Expand Up @@ -1876,6 +1987,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
cloudEventPublisher: cloudEventPublisher,
dbEventWriter: eventWriter,
pluginRegistry: pluginRegistry,
artifactRegistry: artifactRegistry,
}
}

Expand Down
Loading

0 comments on commit 5bea88d

Please sign in to comment.