From a4a2330224ebc1feb78a8d3f05327ad83ae354a0 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 27 Mar 2023 11:57:27 -0500 Subject: [PATCH] using consts for start-node and end-node Signed-off-by: Daniel Rammer --- pkg/manager/impl/metrics_manager.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/manager/impl/metrics_manager.go b/pkg/manager/impl/metrics_manager.go index 1b9a0be18..a6d010b1e 100644 --- a/pkg/manager/impl/metrics_manager.go +++ b/pkg/manager/impl/metrics_manager.go @@ -12,6 +12,8 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytestdlib/promutils" "github.com/golang/protobuf/ptypes/duration" @@ -254,7 +256,7 @@ func (m *MetricsManager) parseDynamicNodeExecution(ctx context.Context, nodeExec } } else { // between task execution(s) and node execution(s) overhead - startNode := nodeExecutions["start-node"] + startNode := nodeExecutions[v1alpha1.StartNodeID] *spans = append(*spans, createOperationSpan(taskExecutions[len(taskExecutions)-1].Closure.UpdatedAt, startNode.Closure.UpdatedAt, nodeReset)) @@ -270,7 +272,7 @@ func (m *MetricsManager) parseDynamicNodeExecution(ctx context.Context, nodeExec } // backened overhead - latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node", + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, nodeExecutionData.DynamicWorkflow.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { *spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown)) @@ -302,7 +304,7 @@ func (m *MetricsManager) parseExecution(ctx context.Context, execution *admin.Ex } // check if workflow has started - startNode := nodeExecutions["start-node"] + startNode := nodeExecutions[v1alpha1.StartNodeID] if startNode.Closure.UpdatedAt == nil || reflect.DeepEqual(startNode.Closure.UpdatedAt, emptyTimestamp) { spans = append(spans, createOperationSpan(execution.Closure.CreatedAt, execution.Closure.UpdatedAt, workflowSetup)) } else { @@ -315,7 +317,7 @@ func (m *MetricsManager) parseExecution(ctx context.Context, execution *admin.Ex } // compute backend overhead - latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node", + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) if latestUpstreamNode != nil && !execution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { spans = append(spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, @@ -466,7 +468,7 @@ func (m *MetricsManager) parseNodeExecutions(ctx context.Context, nodeExecutions // iterate over sorted node executions for _, nodeExecution := range sortedNodeExecutions { specNodeID := nodeExecution.Metadata.SpecNodeId - if specNodeID == "start-node" || specNodeID == "end-node" { + if specNodeID == v1alpha1.StartNodeID || specNodeID == v1alpha1.EndNodeID { continue } @@ -523,7 +525,7 @@ func (m *MetricsManager) parseSubworkflowNodeExecution(ctx context.Context, *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, nodeExecution.Closure.UpdatedAt, nodeSetup)) } else { // frontend overhead - startNode := nodeExecutions["start-node"] + startNode := nodeExecutions[v1alpha1.StartNodeID] *spans = append(*spans, createOperationSpan(nodeExecution.Closure.CreatedAt, startNode.Closure.UpdatedAt, nodeSetup)) // retrieve workflow @@ -539,7 +541,7 @@ func (m *MetricsManager) parseSubworkflowNodeExecution(ctx context.Context, } // backened overhead - latestUpstreamNode := m.getLatestUpstreamNodeExecution("end-node", + latestUpstreamNode := m.getLatestUpstreamNodeExecution(v1alpha1.EndNodeID, workflow.Closure.CompiledWorkflow.Primary.Connections.Upstream, nodeExecutions) if latestUpstreamNode != nil && !nodeExecution.Closure.UpdatedAt.AsTime().Before(latestUpstreamNode.Closure.UpdatedAt.AsTime()) { *spans = append(*spans, createOperationSpan(latestUpstreamNode.Closure.UpdatedAt, nodeExecution.Closure.UpdatedAt, nodeTeardown))