Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(venona): report task status to te platform #522

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions charts/cf-runtime/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v2
description: A Helm chart for Codefresh Runner
name: cf-runtime
version: 7.1.8
version: 7.2.0
keywords:
- codefresh
- runner
Expand All @@ -14,7 +14,7 @@ maintainers:
url: https://codefresh-io.github.io/
annotations:
# 💡 Do not forget to update this annotation:
artifacthub.io/containsSecurityUpdates: "true"
artifacthub.io/containsSecurityUpdates: "false"
# Supported kinds: `added`, `changed`, `deprecated`, `removed`, `fixed`, `security`:
artifacthub.io/changes: |
- kind: security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CODEFRESH_TOKEN:
name: {{ include "runner.fullname" . }}
key: agent-codefresh-token
DOCKER_REGISTRY: {{ .Values.global.imageRegistry }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- end }}

{{- define "runner.environment-variables" }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ runtimeScheduler:
CR_6177_FIXER: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.CR_6177_FIXER) | squote }}
GC_BUILDER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.GC_BUILDER_IMAGE) | squote }}
COSIGN_IMAGE_SIGNER_IMAGE: {{ include "runtime.runtimeImageName" (dict "registry" $imageRegistry "imageFullName" $engineContext.runtimeImages.COSIGN_IMAGE_SIGNER_IMAGE) | squote }}
RUNTIME_CHART_VERSION: {{ .Chart.Version }}
{{- with $engineContext.userEnvVars }}
userEnvVars: {{- toYaml . | nindent 4 }}
{{- end }}
Expand Down
2 changes: 1 addition & 1 deletion venona/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.8
2.0.0
2 changes: 1 addition & 1 deletion venona/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func run(options startOptions) {

httpHeaders := http.Header{}
{
httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version))
httpHeaders.Add("User-Agent", fmt.Sprintf("cf-classic-runner/%s,cf-classic-runtime/%s", version, os.Getenv("RUNTIME_CHART_VERSION")))
}

cf = codefresh.New(codefresh.Options{
Expand Down
43 changes: 33 additions & 10 deletions venona/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) {
Monitor: opts.Monitor,
Concurrency: opts.Concurrency,
BufferSize: opts.BufferSize,
Codefresh: opts.Codefresh,
})
return &Agent{
id: id,
Expand Down Expand Up @@ -192,7 +193,7 @@ func (a *Agent) startTaskPullerRoutine(ctx context.Context) {

// perform all agentTasks (in goroutine)
for i := range agentTasks {
a.handleAgentTask(&agentTasks[i])
a.handleAgentTask(ctx, &agentTasks[i])
}

// send all wfTasks to tasksQueue
Expand Down Expand Up @@ -241,6 +242,25 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus)
}
}

func (a *Agent) reportTaskStatus(ctx context.Context, taskDef task.Task, err error) {
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}

statusErr := a.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
a.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", taskDef.Metadata.WorkflowId)
}
}

func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) {
tasks := a.pullTasks(ctx)
return a.splitTasks(tasks)
Expand Down Expand Up @@ -276,18 +296,18 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
t.Timeline.Pulled = pullTime
agentTasks = append(agentTasks, t)
case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC:
wf, ok := wfMap[t.Metadata.Workflow]
wf, ok := wfMap[t.Metadata.WorkflowId]
if !ok {
wf = workflow.New(t.Metadata)
wfMap[t.Metadata.Workflow] = wf
wfMap[t.Metadata.WorkflowId] = wf
}

err := wf.AddTask(&t)
if err != nil {
a.log.Error("failed adding task to workflow", "error", err)
}
default:
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName)
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName)
}
}

Expand All @@ -313,14 +333,14 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
return agentTasks, workflows
}

func (a *Agent) handleAgentTask(t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.Workflow)
func (a *Agent) handleAgentTask(ctx context.Context, t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId)
a.wg.Add(1)
go func() {
defer a.wg.Done()
txn := task.NewTaskTransaction(a.monitor, t.Metadata)
defer txn.End()
err := a.executeAgentTask(t)
err := a.executeAgentTask(ctx, t)

if err != nil {
a.log.Error(err.Error())
Expand All @@ -330,7 +350,7 @@ func (a *Agent) handleAgentTask(t *task.Task) {
}()
}

func (a *Agent) executeAgentTask(t *task.Task) error {
func (a *Agent) executeAgentTask(ctx context.Context, t *task.Task) error {
t.Timeline.Started = time.Now()
specJSON, err := json.Marshal(t.Spec)
if err != nil {
Expand All @@ -348,9 +368,12 @@ func (a *Agent) executeAgentTask(t *task.Task) error {
}

err = e(&spec, a.log)
if t.Metadata.ShouldReportStatus {
a.reportTaskStatus(ctx, *t, err)
}
sinceCreation, inRunner, processed := t.GetLatency()
a.log.Info("Done handling agent task",
"tid", t.Metadata.Workflow,
"tid", t.Metadata.WorkflowId,
"time since creation", sinceCreation,
"time in runner", inRunner,
"processing time", processed,
Expand Down Expand Up @@ -410,7 +433,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error {
func groupTasks(tasks task.Tasks) map[string]task.Tasks {
candidates := map[string]task.Tasks{}
for _, task := range tasks {
name := task.Metadata.Workflow
name := task.Metadata.WorkflowId
if name == "" {
// If for some reason the task is not related to any workflow
// Might heppen in older versions on Codefresh
Expand Down
12 changes: 6 additions & 6 deletions venona/pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,37 @@ func Test_groupTasks(t *testing.T) {
tasks: task.Tasks{
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
want: map[string]task.Tasks{
"1": {
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
"2": {
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
},
Expand Down
39 changes: 32 additions & 7 deletions venona/pkg/codefresh/codefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
// Codefresh API client
Codefresh interface {
Tasks(ctx context.Context) (task.Tasks, error)
ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error
ReportStatus(ctx context.Context, status AgentStatus) error
Host() string
}
Expand Down Expand Up @@ -79,7 +80,10 @@ func New(opts Options) Codefresh {

// Tasks get from Codefresh all latest tasks
func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
res, err := c.doRequest(ctx, "GET", nil, "api", "agent", c.agentID, "tasks")
query := map[string]string{
"waitForStatusReport": "true",
}
res, err := c.doRequest(ctx, "GET", nil, query, "api", "agent", c.agentID, "tasks")
if err != nil {
return nil, err
}
Expand All @@ -92,6 +96,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
return tasks, nil
}

func (c cf) ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error {
s, err := status.Marshal()
if err != nil {
return fmt.Errorf("failed marshalling when reporting task status: %w", err)
}

_, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "tasks", string(id), "statuses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is bytes.NewBuffer(s) needed, when s is already []byte?
maybe it is, i am not sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It requires io.Reader interface, so it seems that it's needed

if err != nil {
return fmt.Errorf("failed sending request when reporting task status: %w", err)
}

return nil
}

// Host returns the host
func (c cf) Host() string {
return c.host
Expand All @@ -104,7 +122,7 @@ func (c cf) ReportStatus(ctx context.Context, status AgentStatus) error {
return fmt.Errorf("failed marshalling when reporting status: %w", err)
}

_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), "api", "agent", c.agentID, "status")
_, err = c.doRequest(ctx, "PUT", bytes.NewBuffer(s), nil, "api", "agent", c.agentID, "status")
if err != nil {
return fmt.Errorf("failed sending request when reporting status: %w", err)
}
Expand All @@ -119,7 +137,7 @@ func (c cf) buildErrorFromResponse(status int, body []byte) error {
}
}

func (c cf) prepareURL(paths ...string) (*url.URL, error) {
func (c cf) prepareURL(query map[string]string, paths ...string) (*url.URL, error) {
u, err := url.Parse(c.host)
if err != nil {
return nil, err
Expand All @@ -135,11 +153,18 @@ func (c cf) prepareURL(paths ...string) (*url.URL, error) {

u.Path = path.Join(accPath...)
u.RawPath = path.Join(accRawPath...)

rawQuery := url.Values{}
for k, v := range query {
rawQuery.Set(k, v)
}
u.RawQuery = rawQuery.Encode()

return u, nil
}

func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(apis...)
func (c cf) prepareRequest(method string, data io.Reader, query map[string]string, apis ...string) (*http.Request, error) {
u, err := c.prepareURL(query, apis...)
if err != nil {
return nil, err
}
Expand All @@ -158,8 +183,8 @@ func (c cf) prepareRequest(method string, data io.Reader, apis ...string) (*http
return req, nil
}

func (c cf) doRequest(ctx context.Context, method string, body io.Reader, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, apis...)
func (c cf) doRequest(ctx context.Context, method string, body io.Reader, query map[string]string, apis ...string) ([]byte, error) {
req, err := c.prepareRequest(method, body, query, apis...)
if err != nil {
return nil, err
}
Expand Down
26 changes: 25 additions & 1 deletion venona/pkg/codefresh/codefresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func Test_cf_prepareURL(t *testing.T) {
tests := map[string]struct {
fields args
paths []string
query map[string]string
want *url.URL
wantErr bool
}{
Expand All @@ -94,6 +95,29 @@ func Test_cf_prepareURL(t *testing.T) {
wantErr: false,
want: mustURL("http://url/docker:desktop%2Fserver"),
},
"Append query": {
query: map[string]string{
"key": "value",
"keyTwo": "valueTwo",
},
paths: []string{"docker:desktop/server"},
fields: args{
host: "http://url",
},
wantErr: false,
want: mustURL("http://url/docker:desktop%2Fserver?key=value&keyTwo=valueTwo"),
},
"Escape query": {
query: map[string]string{
"ke+y": "va+lu=e",
},
paths: []string{"docker:desktop/server"},
fields: args{
host: "http://url",
},
wantErr: false,
want: mustURL("http://url/docker:desktop%2Fserver?ke%2By=va%2Blu%3De"),
},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
Expand All @@ -103,7 +127,7 @@ func Test_cf_prepareURL(t *testing.T) {
agentID: tt.fields.agentID,
httpClient: tt.fields.httpClient,
}
url, err := c.prepareURL(tt.paths...)
url, err := c.prepareURL(tt.query, tt.paths...)
if tt.wantErr {
assert.Error(t, err)
}
Expand Down
Loading