Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
using consts for start-node and end-node
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Mar 27, 2023
1 parent 047f18f commit a4a2330
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions pkg/manager/impl/metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/golang/protobuf/ptypes/duration"
Expand Down Expand Up @@ -254,7 +256,7 @@ func (m *MetricsManager) parseDynamicNodeExecution(ctx context.Context, nodeExec
}
} else {
// between task execution(s) and node execution(s) overhead
startNode := nodeExecutions["start-node"]
startNode := nodeExecutions[v1alpha1.StartNodeID]
*spans = append(*spans, createOperationSpan(taskExecutions[len(taskExecutions)-1].Closure.UpdatedAt,
startNode.Closure.UpdatedAt, nodeReset))

Expand All @@ -270,7 +272,7 @@ func (m *MetricsManager) parseDynamicNodeExecution(ctx context.Context, nodeExec
}

// backened overhead
latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node",
latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID,
nodeExecutionData.DynamicWorkflow.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions)
if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) {
*spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown))
Expand Down Expand Up @@ -302,7 +304,7 @@ func (m *MetricsManager) parseExecution(ctx context.Context, execution *admin.Ex
}

// check if workflow has started
startNode := nodeExecutions["start-node"]
startNode := nodeExecutions[v1alpha1.StartNodeID]
if startNode.Closure.UpdatedAt == nil || reflect.DeepEqual(startNode.Closure.UpdatedAt, emptyTimestamp) {
spans = append(spans, createOperationSpan(execution.Closure.CreatedAt, execution.Closure.UpdatedAt, workflowSetup))
} else {
Expand All @@ -315,7 +317,7 @@ func (m *MetricsManager) parseExecution(ctx context.Context, execution *admin.Ex
}

// compute backend overhead
latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node",
latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID,
workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions)
if latestUpstreamNode != nil && !execution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) {
spans = append(spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt,
Expand Down Expand Up @@ -466,7 +468,7 @@ func (m *MetricsManager) parseNodeExecutions(ctx context.Context, nodeExecutions
// iterate over sorted node executions
for _, nodeExecution := range sortedNodeExecutions {
specNodeID := nodeExecution.Metadata.SpecNodeId
if specNodeID == "start-node" || specNodeID == "end-node" {
if specNodeID == v1alpha1.StartNodeID || specNodeID == v1alpha1.EndNodeID {
continue
}

Expand Down Expand Up @@ -523,7 +525,7 @@ func (m *MetricsManager) parseSubworkflowNodeExecution(ctx context.Context,
*spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup))
} else {
// frontend overhead
startNode := nodeExecutions["start-node"]
startNode := nodeExecutions[v1alpha1.StartNodeID]
*spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, startNode.Closure.UpdatedAt, nodeSetup))

// retrieve workflow
Expand All @@ -539,7 +541,7 @@ func (m *MetricsManager) parseSubworkflowNodeExecution(ctx context.Context,
}

// backened overhead
latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node",
latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID,
workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions)
if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) {
*spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown))
Expand Down

0 comments on commit a4a2330

Please sign in to comment.