-
Notifications
You must be signed in to change notification settings - Fork 680
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change phase to queue on job submit for webapi plugins #5188
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #5188 +/- ##
==========================================
+ Coverage 59.07% 59.08% +0.01%
==========================================
Files 646 646
Lines 55739 55739
==========================================
+ Hits 32928 32935 +7
+ Misses 20215 20208 -7
Partials 2596 2596
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works, and it can also show the message now.
return core.PhaseInfoRunningWithReason(core.DefaultPhaseVersion, resource.Message, taskInfo), nil
However, this is still great, I've tested it in the sensor agent.
return Resource(phase=TaskExecution.RUNNING, outputs=None, log_links=[TaskLog(uri="sensor_test_uri", name="Sensor Console").to_flyte_idl()])
This is my diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go
index 6c80cc4d2..97ea98d41 100644
--- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go
+++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go
@@ -256,6 +256,12 @@ func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseRunning, version, nil, info, false)
}
+func PhaseInfoRunningWithReason(version uint32, reason string, info *TaskInfo) PhaseInfo {
+ pi := phaseInfo(PhaseRunning, version, nil, info, false)
+ pi.reason = reason
+ return pi
+}
+
func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false)
}
diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
index cc7f15bd8..a8ba8513f 100644
--- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
+++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
@@ -249,7 +249,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
case flyteIdl.TaskExecution_INITIALIZING:
return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil
case flyteIdl.TaskExecution_RUNNING:
- return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
+ return core.PhaseInfoRunningWithReason(core.DefaultPhaseVersion, resource.Message, taskInfo), nil
case flyteIdl.TaskExecution_SUCCEEDED:
err = writeOutput(ctx, taskCtx, resource.Outputs)
if err != nil {
:...skipping...
diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go
index 6c80cc4d2..97ea98d41 100644
--- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go
+++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go
@@ -256,6 +256,12 @@ func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseRunning, version, nil, info, false)
}
+func PhaseInfoRunningWithReason(version uint32, reason string, info *TaskInfo) PhaseInfo {
+ pi := phaseInfo(PhaseRunning, version, nil, info, false)
+ pi.reason = reason
+ return pi
+}
+
func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false)
}
diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
index cc7f15bd8..a8ba8513f 100644
--- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
+++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
@@ -249,7 +249,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
case flyteIdl.TaskExecution_INITIALIZING:
return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, resource.Message, taskInfo), nil
case flyteIdl.TaskExecution_RUNNING:
- return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
+ return core.PhaseInfoRunningWithReason(core.DefaultPhaseVersion, resource.Message, taskInfo), nil
case flyteIdl.TaskExecution_SUCCEEDED:
err = writeOutput(ctx, taskCtx, resource.Outputs)
if err != nil {
~
~
~
~
~
~ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: Kevin Su <[email protected]>
I changed the initial phase to queue; it will fix the Databricks issue as well. |
Signed-off-by: Kevin Su <[email protected]>
Tracking issue
#3936
Why are the changes needed?
log links are not shown up on FlyteConsole when the task status is running.
It will also fix other webAPI plugins.
propeller only sends the event when the current event is not equal to the previous event, so if
ResourceCache
sets the phase to running here without info, the log link will never show up on flyteconsole when the status is running.What changes were proposed in this pull request?
The phase should be
PhaseQueue
when there is no resource in the cacheItem.The async ResourceCache is empty if the
Get
request is not yet sent to the agent.How was this patch tested?
Local
Setup process
Screenshots
Related PRs
NA
Docs link
NA