Skip to content

Commit

Permalink
feat(venona): report task status to te platform (#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
masontikhonov authored Dec 2, 2024
1 parent ab1f720 commit 56f3db4
Show file tree
Hide file tree
Showing 23 changed files with 389 additions and 67 deletions.
6 changes: 3 additions & 3 deletions charts/cf-runtime/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
description: A Helm chart for Codefresh Runner
name: cf-runtime
version: 7.1.10
version: 7.2.0
keywords:
- codefresh
- runner
Expand All @@ -17,8 +17,8 @@ annotations:
artifacthub.io/containsSecurityUpdates: "false"
# Supported kinds: `added`, `changed`, `deprecated`, `removed`, `fixed`, `security`:
artifacthub.io/changes: |
- kind: fixed
description: "Using cache for multiple tags (Engine 1.174.19)"
- kind: added
description: "Chart version exposed to the `venona` and `engine` pods"
dependencies:
- name: cf-common
repository: oci://quay.io/codefresh/charts
Expand Down
2 changes: 1 addition & 1 deletion charts/cf-runtime/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Codefresh Runner

![Version: 7.1.10](https://img.shields.io/badge/Version-7.1.10-informational?style=flat-square)
![Version: 7.2.0](https://img.shields.io/badge/Version-7.2.0-informational?style=flat-square)

Helm chart for deploying [Codefresh Runner](https://codefresh.io/docs/docs/installation/codefresh-runner/) to Kubernetes.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CODEFRESH_TOKEN:
name: {{ include "runner.fullname" . }}
key: agent-codefresh-token
DOCKER_REGISTRY: {{ .Values.global.imageRegistry }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- end }}

{{- define "runner.environment-variables" }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ runtimeScheduler:
CR_6177_FIXER: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.CR_6177_FIXER) | squote }}
GC_BUILDER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.GC_BUILDER_IMAGE) | squote }}
COSIGN_IMAGE_SIGNER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.COSIGN_IMAGE_SIGNER_IMAGE) | squote }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- with $engineContext.userEnvVars }}
userEnvVars: {{- toYaml . | nindent 4 }}
{{- end }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tests:
CR_6177_FIXER: 'somedomain.io/alpine:tagoverride'
GC_BUILDER_IMAGE: 'somedomain.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'somedomain.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
workflowLimits:
MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600
MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400
Expand Down
1 change: 1 addition & 0 deletions charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ tests:
CR_6177_FIXER: 'alpine:tagoverride'
GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
workflowLimits:
MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600
MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400
Expand Down
1 change: 1 addition & 0 deletions charts/cf-runtime/tests/runtime/runtime_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ tests:
CR_6177_FIXER: 'alpine:tagoverride'
GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
userEnvVars:
- name: ALICE
valueFrom:
Expand Down
2 changes: 1 addition & 1 deletion venona/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.8
2.0.0
6 changes: 5 additions & 1 deletion venona/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,13 @@ func run(options startOptions) {

httpClient.Transport = monitor.NewRoundTripper(httpClient.Transport)

userAgent := fmt.Sprintf("cf-classic-runner/%s", version)
if runtimeVersion := os.Getenv("RUNTIME_CHART_VERSION"); runtimeVersion != "" {
userAgent += fmt.Sprintf(" cf-classic-runtime/%s", runtimeVersion)
}
httpHeaders := http.Header{}
{
httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version))
httpHeaders.Add("User-Agent", userAgent)
}

cf = codefresh.New(codefresh.Options{
Expand Down
43 changes: 33 additions & 10 deletions venona/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) {
Monitor: opts.Monitor,
Concurrency: opts.Concurrency,
BufferSize: opts.BufferSize,
Codefresh: opts.Codefresh,
})
return &Agent{
id: id,
Expand Down Expand Up @@ -192,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) {

// perform all agentTasks (in goroutine)
for i := range agentTasks {
a.handleAgentTask(&agentTasks[i])
a.handleAgentTask(ctx, &agentTasks[i])
}

// send all wfTasks to tasksQueue
Expand Down Expand Up @@ -241,6 +242,25 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus)
}
}

func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) {
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}

statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId)
}
}

func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) {
tasks := a.pullTasks(ctx)
return a.splitTasks(tasks)
Expand Down Expand Up @@ -276,18 +296,18 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
t.Timeline.Pulled = pullTime
agentTasks = append(agentTasks, t)
case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC:
wf, ok := wfMap[t.Metadata.Workflow]
wf, ok := wfMap[t.Metadata.WorkflowId]
if !ok {
wf = workflow.New(t.Metadata)
wfMap[t.Metadata.Workflow] = wf
wfMap[t.Metadata.WorkflowId] = wf
}

err := wf.AddTask(&t)
if err != nil {
a.log.Error("failed adding task to workflow", "error", err)
}
default:
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName)
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName)
}
}

Expand All @@ -313,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
return agentTasks, workflows
}

func (a *Agent) handleAgentTask(t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.Workflow)
func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId)
a.wg.Add(1)
go func() {
defer a.wg.Done()
txn := task.NewTaskTransaction(a.monitor, t.Metadata)
defer txn.End()
err := a.executeAgentTask(t)
err := a.executeAgentTask(ctx, t)

if err != nil {
a.log.Error(err.Error())
Expand All @@ -330,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) {
}()
}

func (a *Agent) executeAgentTask(t *task.Task) error {
func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error {
t.Timeline.Started = time.Now()
specJSON, err := json.Marshal(t.Spec)
if err != nil {
Expand All @@ -348,9 +368,12 @@ func (a *Agent) executeAgentTask(t *task.Task) error {
}

err = e(&spec, a.log)
if t.Metadata.ShouldReportStatus {
a.reportTaskStatus(ctx, *t, err)
}
sinceCreation, inRunner, processed := t.GetLatency()
a.log.Info("Done handling agent task",
"tid", t.Metadata.Workflow,
"tid", t.Metadata.WorkflowId,
"time since creation", sinceCreation,
"time in runner", inRunner,
"processing time", processed,
Expand Down Expand Up @@ -410,7 +433,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error {
func groupTasks(tasks task.Tasks) map[string]task.Tasks {
candidates := map[string]task.Tasks{}
for _, task := range tasks {
name := task.Metadata.Workflow
name := task.Metadata.WorkflowId
if name == "" {
// If for some reason the task is not related to any workflow
// Might heppen in older versions on Codefresh
Expand Down
14 changes: 7 additions & 7 deletions venona/pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,37 @@ func Test_groupTasks(t *testing.T) {
tasks: task.Tasks{
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
want: map[string]task.Tasks{
"1": {
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
"2": {
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
},
Expand Down Expand Up @@ -259,7 +259,7 @@ func Test_executeAgentTask(t *testing.T) {
a := &Agent{
log: logger.New(logger.Options{}),
}
err := a.executeAgentTask(tt.task)
err := a.executeAgentTask(context.Background(), tt.task)
if err != nil || tt.wantErr != "" {
assert.EqualError(t, err, tt.wantErr)
}
Expand Down
39 changes: 32 additions & 7 deletions venona/pkg/codefresh/codefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
// Codefresh API client
Codefresh interface {
Tasks(ctx context.Context) (task.Tasks, error)
ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error
ReportStatus(ctx context.Context, status AgentStatus) error
Host() string
}
Expand Down Expand Up @@ -79,7 +80,10 @@ func New(opts Options) Codefresh {

// Tasks get from Codefresh all latest tasks
func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
res, err := c.doRequest(ctx, "GET", nil, "api", "agent", c.agentID, "tasks")
query := map[string]string{
"waitForStatusReport": "true",
}
res, err := c.doRequest(ctx, "GET", nil, query, "api", "agent", c.agentID, "tasks")
if err != nil {
return nil, err
}
Expand All @@ -92,6 +96,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
return tasks, nil
}

func (c cf) ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error {
s, err := status.Marshal()
if err != nil {
return fmt.Errorf("failed marshalling when reporting task status: %w", err)
}

_, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "tasks", string(id), "statuses")
if err != nil {
return fmt.Errorf("failed sending request when reporting task status: %w", err)
}

return nil
}

// Host returns the host
func (c cf) Host() string {
return c.host
Expand All @@ -104,7 +122,7 @@ func (c cf) ReportStatus(ctx context.Context, status AgentStatus) error {
return fmt.Errorf("failed marshalling when reporting status: %w", err)
}

_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), "api", "agent", c.agentID, "status")
_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "status")
if err != nil {
return fmt.Errorf("failed sending request when reporting status: %w", err)
}
Expand All @@ -119,7 +137,7 @@ func (c cf) buildErrorFromResponse(status int, body []byte) error {
}
}

func (c cf) prepareURL(paths ...string) (*url.URL, error) {
func (c cf) prepareURL(query map[string]string, paths ...string) (*url.URL, error) {
u, err := url.Parse(c.host)
if err != nil {
return nil, err
Expand All @@ -135,11 +153,18 @@ func (c cf) prepareURL(paths ...string) (*url.URL, error) {

u.Path = path.Join(accPath...)
u.RawPath = path.Join(accRawPath...)

rawQuery := url.Values{}
for k, v := range query {
rawQuery.Set(k, v)
}
u.RawQuery = rawQuery.Encode()

return u, nil
}

func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(apis...)
func (c cf) prepareRequest(method string, data io.Reader, query map[string]string, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(query, apis...)
if err != nil {
return nil, err
}
Expand All @@ -158,8 +183,8 @@ func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http
return req, nil
}

func (c cf) doRequest(ctx context.Context, method string, body io.Reader, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, apis...)
func (c cf) doRequest(ctx context.Context, method string, body io.Reader, query map[string]string, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, query, apis...)
if err != nil {
return nil, err
}
Expand Down
44 changes: 44 additions & 0 deletions venona/pkg/codefresh/codefresh_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 56f3db4

Please sign in to comment.