Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(venona): report task status to te platform #522

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions charts/cf-runtime/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
description: A Helm chart for Codefresh Runner
name: cf-runtime
version: 7.1.10
version: 7.2.0
keywords:
- codefresh
- runner
Expand All @@ -17,8 +17,8 @@ annotations:
artifacthub.io/containsSecurityUpdates: "false"
# Supported kinds: `added`, `changed`, `deprecated`, `removed`, `fixed`, `security`:
artifacthub.io/changes: |
- kind: fixed
description: "Using cache for multiple tags (Engine 1.174.19)"
- kind: added
description: "Chart version exposed to the `venona` and `engine` pods"
dependencies:
- name: cf-common
repository: oci://quay.io/codefresh/charts
Expand Down
2 changes: 1 addition & 1 deletion charts/cf-runtime/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Codefresh Runner

![Version: 7.1.10](https://img.shields.io/badge/Version-7.1.10-informational?style=flat-square)
![Version: 7.2.0](https://img.shields.io/badge/Version-7.2.0-informational?style=flat-square)

Helm chart for deploying [Codefresh Runner](https://codefresh.io/docs/docs/installation/codefresh-runner/) to Kubernetes.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CODEFRESH_TOKEN:
name: {{ include "runner.fullname" . }}
key: agent-codefresh-token
DOCKER_REGISTRY: {{ .Values.global.imageRegistry }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- end }}

{{- define "runner.environment-variables" }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ runtimeScheduler:
CR_6177_FIXER: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.CR_6177_FIXER) | squote }}
GC_BUILDER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.GC_BUILDER_IMAGE) | squote }}
COSIGN_IMAGE_SIGNER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.COSIGN_IMAGE_SIGNER_IMAGE) | squote }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- with $engineContext.userEnvVars }}
userEnvVars: {{- toYaml . | nindent 4 }}
{{- end }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ tests:
CR_6177_FIXER: 'somedomain.io/alpine:tagoverride'
GC_BUILDER_IMAGE: 'somedomain.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'somedomain.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
workflowLimits:
MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600
MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400
Expand Down
1 change: 1 addition & 0 deletions charts/cf-runtime/tests/runtime/runtime_onprem_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ tests:
CR_6177_FIXER: 'alpine:tagoverride'
GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
workflowLimits:
MAXIMUM_ALLOWED_TIME_BEFORE_PRE_STEPS_SUCCESS: 600
MAXIMUM_ALLOWED_WORKFLOW_AGE_BEFORE_TERMINATION: 86400
Expand Down
1 change: 1 addition & 0 deletions charts/cf-runtime/tests/runtime/runtime_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ tests:
CR_6177_FIXER: 'alpine:tagoverride'
GC_BUILDER_IMAGE: 'quay.io/codefresh/cf-gc-builder:tagoverride'
COSIGN_IMAGE_SIGNER_IMAGE: 'quay.io/codefresh/cf-cosign-image-signer:tagoverride'
RUNTIME_CHART_VERSION: [\w.-]+
userEnvVars:
- name: ALICE
valueFrom:
Expand Down
2 changes: 1 addition & 1 deletion venona/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.8
2.0.0
6 changes: 5 additions & 1 deletion venona/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,13 @@ func run(options startOptions) {

httpClient.Transport = monitor.NewRoundTripper(httpClient.Transport)

userAgent := fmt.Sprintf("cf-classic-runner/%s", version)
if runtimeVersion := os.Getenv("RUNTIME_CHART_VERSION"); runtimeVersion != "" {
userAgent += fmt.Sprintf(" cf-classic-runtime/%s", runtimeVersion)
}
httpHeaders := http.Header{}
{
httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version))
httpHeaders.Add("User-Agent", userAgent)
}

cf = codefresh.New(codefresh.Options{
Expand Down
43 changes: 33 additions & 10 deletions venona/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) {
Monitor: opts.Monitor,
Concurrency: opts.Concurrency,
BufferSize: opts.BufferSize,
Codefresh: opts.Codefresh,
})
return &Agent{
id: id,
Expand Down Expand Up @@ -192,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) {

// perform all agentTasks (in goroutine)
for i := range agentTasks {
a.handleAgentTask(&agentTasks[i])
a.handleAgentTask(ctx, &agentTasks[i])
}

// send all wfTasks to tasksQueue
Expand Down Expand Up @@ -241,6 +242,25 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus)
}
}

func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) {
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}

statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId)
}
}

func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) {
tasks := a.pullTasks(ctx)
return a.splitTasks(tasks)
Expand Down Expand Up @@ -276,18 +296,18 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
t.Timeline.Pulled = pullTime
agentTasks = append(agentTasks, t)
case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC:
wf, ok := wfMap[t.Metadata.Workflow]
wf, ok := wfMap[t.Metadata.WorkflowId]
if !ok {
wf = workflow.New(t.Metadata)
wfMap[t.Metadata.Workflow] = wf
wfMap[t.Metadata.WorkflowId] = wf
}

err := wf.AddTask(&t)
if err != nil {
a.log.Error("failed adding task to workflow", "error", err)
}
default:
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName)
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName)
}
}

Expand All @@ -313,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
return agentTasks, workflows
}

func (a *Agent) handleAgentTask(t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.Workflow)
func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId)
a.wg.Add(1)
go func() {
defer a.wg.Done()
txn := task.NewTaskTransaction(a.monitor, t.Metadata)
defer txn.End()
err := a.executeAgentTask(t)
err := a.executeAgentTask(ctx, t)

if err != nil {
a.log.Error(err.Error())
Expand All @@ -330,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) {
}()
}

func (a *Agent) executeAgentTask(t *task.Task) error {
func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error {
t.Timeline.Started = time.Now()
specJSON, err := json.Marshal(t.Spec)
if err != nil {
Expand All @@ -348,9 +368,12 @@ func (a *Agent) executeAgentTask(t *task.Task) error {
}

err = e(&spec, a.log)
if t.Metadata.ShouldReportStatus {
a.reportTaskStatus(ctx, *t, err)
}
sinceCreation, inRunner, processed := t.GetLatency()
a.log.Info("Done handling agent task",
"tid", t.Metadata.Workflow,
"tid", t.Metadata.WorkflowId,
"time since creation", sinceCreation,
"time in runner", inRunner,
"processing time", processed,
Expand Down Expand Up @@ -410,7 +433,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error {
func groupTasks(tasks task.Tasks) map[string]task.Tasks {
candidates := map[string]task.Tasks{}
for _, task := range tasks {
name := task.Metadata.Workflow
name := task.Metadata.WorkflowId
if name == "" {
// If for some reason the task is not related to any workflow
// Might heppen in older versions on Codefresh
Expand Down
14 changes: 7 additions & 7 deletions venona/pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,37 @@ func Test_groupTasks(t *testing.T) {
tasks: task.Tasks{
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
want: map[string]task.Tasks{
"1": {
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
"2": {
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
},
Expand Down Expand Up @@ -259,7 +259,7 @@ func Test_executeAgentTask(t *testing.T) {
a := &Agent{
log: logger.New(logger.Options{}),
}
err := a.executeAgentTask(tt.task)
err := a.executeAgentTask(context.Background(), tt.task)
if err != nil || tt.wantErr != "" {
assert.EqualError(t, err, tt.wantErr)
}
Expand Down
39 changes: 32 additions & 7 deletions venona/pkg/codefresh/codefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
// Codefresh API client
Codefresh interface {
Tasks(ctx context.Context) (task.Tasks, error)
ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error
ReportStatus(ctx context.Context, status AgentStatus) error
Host() string
}
Expand Down Expand Up @@ -79,7 +80,10 @@ func New(opts Options) Codefresh {

// Tasks get from Codefresh all latest tasks
func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
res, err := c.doRequest(ctx, "GET", nil, "api", "agent", c.agentID, "tasks")
query := map[string]string{
"waitForStatusReport": "true",
}
res, err := c.doRequest(ctx, "GET", nil, query, "api", "agent", c.agentID, "tasks")
if err != nil {
return nil, err
}
Expand All @@ -92,6 +96,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
return tasks, nil
}

func (c cf) ReportTaskStatus(ctx context.Context, id string, status task.TaskStatus) error {
s, err := status.Marshal()
if err != nil {
return fmt.Errorf("failed marshalling when reporting task status: %w", err)
}

_, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "tasks", string(id), "statuses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is bytes.NewBuffer(s) needed, when s is already []byte?
maybe it is, i am not sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It requires io.Reader interface, so it seems that it's needed

if err != nil {
return fmt.Errorf("failed sending request when reporting task status: %w", err)
}

return nil
}

// Host returns the host
func (c cf) Host() string {
return c.host
Expand All @@ -104,7 +122,7 @@ func (c cf) ReportStatus(ctx context.Context, status AgentStatus) error {
return fmt.Errorf("failed marshalling when reporting status: %w", err)
}

_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), "api", "agent", c.agentID, "status")
_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "status")
if err != nil {
return fmt.Errorf("failed sending request when reporting status: %w", err)
}
Expand All @@ -119,7 +137,7 @@ func (c cf) buildErrorFromResponse(status int, body []byte) error {
}
}

func (c cf) prepareURL(paths ...string) (*url.URL, error) {
func (c cf) prepareURL(query map[string]string, paths ...string) (*url.URL, error) {
u, err := url.Parse(c.host)
if err != nil {
return nil, err
Expand All @@ -135,11 +153,18 @@ func (c cf) prepareURL(paths ...string) (*url.URL, error) {

u.Path = path.Join(accPath...)
u.RawPath = path.Join(accRawPath...)

rawQuery := url.Values{}
for k, v := range query {
rawQuery.Set(k, v)
}
u.RawQuery = rawQuery.Encode()

return u, nil
}

func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(apis...)
func (c cf) prepareRequest(method string, data io.Reader, query map[string]string, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(query, apis...)
if err != nil {
return nil, err
}
Expand All @@ -158,8 +183,8 @@ func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http
return req, nil
}

func (c cf) doRequest(ctx context.Context, method string, body io.Reader, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, apis...)
func (c cf) doRequest(ctx context.Context, method string, body io.Reader, query map[string]string, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, query, apis...)
if err != nil {
return nil, err
}
Expand Down
44 changes: 44 additions & 0 deletions venona/pkg/codefresh/codefresh_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading