diff --git a/venona/VERSION b/venona/VERSION index ccff224b7..227cea215 100644 --- a/venona/VERSION +++ b/venona/VERSION @@ -1 +1 @@ -1.10.7 +2.0.0 diff --git a/venona/cmd/start.go b/venona/cmd/start.go index 566b8f190..d7a061f15 100644 --- a/venona/cmd/start.go +++ b/venona/cmd/start.go @@ -232,7 +232,7 @@ func run(options startOptions) { httpHeaders := http.Header{} { - httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version)) + httpHeaders.Add("User-Agent", fmt.Sprintf("cf-classic-runner/%s", version)) } cf = codefresh.New(codefresh.Options{ diff --git a/venona/pkg/agent/agent.go b/venona/pkg/agent/agent.go index 398d0b128..cbf88c570 100644 --- a/venona/pkg/agent/agent.go +++ b/venona/pkg/agent/agent.go @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) { Monitor: opts.Monitor, Concurrency: opts.Concurrency, BufferSize: opts.BufferSize, + Codefresh: opts.Codefresh, }) return &Agent{ id: id, @@ -241,6 +242,13 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus) } } +func (a *Agent) reportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) { + err := a.cf.ReportTaskStatus(ctx, id, status) + if err != nil { + a.log.Error("Failed reporting task status", "error", err) + } +} + func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) { tasks := a.pullTasks(ctx) return a.splitTasks(tasks) @@ -276,10 +284,10 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) t.Timeline.Pulled = pullTime agentTasks = append(agentTasks, t) case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC: - wf, ok := wfMap[t.Metadata.Workflow] + wf, ok := wfMap[t.Metadata.WorkflowId] if !ok { wf = workflow.New(t.Metadata) - wfMap[t.Metadata.Workflow] = wf + wfMap[t.Metadata.WorkflowId] = wf } err := wf.AddTask(&t) @@ -287,7 +295,7 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) a.log.Error("failed adding task to workflow", "error", err) } default: - a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName) + a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName) } } @@ -314,7 +322,7 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow) } func (a *Agent) handleAgentTask(t *task.Task) { - a.log.Info("executing agent task", "tid", t.Metadata.Workflow) + a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId) a.wg.Add(1) go func() { defer a.wg.Done() @@ -350,7 +358,7 @@ func (a *Agent) executeAgentTask(t *task.Task) error { err = e(&spec, a.log) sinceCreation, inRunner, processed := t.GetLatency() a.log.Info("Done handling agent task", - "tid", t.Metadata.Workflow, + "tid", t.Metadata.WorkflowId, "time since creation", sinceCreation, "time in runner", inRunner, "processing time", processed, @@ -410,7 +418,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error { func groupTasks(tasks task.Tasks) map[string]task.Tasks { candidates := map[string]task.Tasks{} for _, task := range tasks { - name := task.Metadata.Workflow + name := task.Metadata.WorkflowId if name == "" { // If for some reason the task is not related to any workflow // Might heppen in older versions on Codefresh diff --git a/venona/pkg/agent/agent_test.go b/venona/pkg/agent/agent_test.go index 51576efe0..2d6a14380 100644 --- a/venona/pkg/agent/agent_test.go +++ b/venona/pkg/agent/agent_test.go @@ -38,17 +38,17 @@ func Test_groupTasks(t *testing.T) { tasks: task.Tasks{ { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, { Metadata: task.Metadata{ - Workflow: "2", + WorkflowId: "2", }, }, { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, }, @@ -56,19 +56,19 @@ func Test_groupTasks(t *testing.T) { "1": { { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, { Metadata: task.Metadata{ - Workflow: "1", + WorkflowId: "1", }, }, }, "2": { { Metadata: task.Metadata{ - Workflow: "2", + WorkflowId: "2", }, }, }, diff --git a/venona/pkg/codefresh/codefresh.go b/venona/pkg/codefresh/codefresh.go index be30c7876..a794ab9b2 100644 --- a/venona/pkg/codefresh/codefresh.go +++ b/venona/pkg/codefresh/codefresh.go @@ -34,6 +34,7 @@ type ( // Codefresh API client Codefresh interface { Tasks(ctx context.Context) (task.Tasks, error) + ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error ReportStatus(ctx context.Context, status AgentStatus) error Host() string } @@ -92,6 +93,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) { return tasks, nil } +func (c cf) ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error { + s, err := status.Marshal() + if err != nil { + return fmt.Errorf("failed marshalling when reporting task status: %w", err) + } + + _, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), "api", "agent", c.agentID, "tasks", string(id), "statuses") + if err != nil { + return fmt.Errorf("failed sending request when reporting task status: %w", err) + } + + return nil +} + // Host returns the host func (c cf) Host() string { return c.host diff --git a/venona/pkg/queue/queue.go b/venona/pkg/queue/queue.go index 5386f9aaa..046cba6dc 100644 --- a/venona/pkg/queue/queue.go +++ b/venona/pkg/queue/queue.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/codefresh-io/go/venona/pkg/codefresh" "github.com/codefresh-io/go/venona/pkg/logger" "github.com/codefresh-io/go/venona/pkg/metrics" "github.com/codefresh-io/go/venona/pkg/monitoring" @@ -45,6 +46,7 @@ type ( Monitor monitoring.Monitor Concurrency int BufferSize int + Codefresh codefresh.Codefresh } wfQueueImpl struct { @@ -57,6 +59,7 @@ type ( stop []chan bool activeWorkflows map[string]struct{} mutex sync.Mutex + cf codefresh.Codefresh } ) @@ -73,6 +76,7 @@ func New(opts *Options) WorkflowQueue { concurrency: opts.Concurrency, stop: make([]chan bool, opts.Concurrency), activeWorkflows: make(map[string]struct{}), + cf: opts.Codefresh, } } @@ -116,22 +120,22 @@ func (wfq *wfQueueImpl) handleChannel(ctx context.Context, stopChan chan bool, i ctxCancelled = true case wf := <-wfq.queue: wfq.mutex.Lock() - if _, ok := wfq.activeWorkflows[wf.Metadata.Workflow]; ok { + if _, ok := wfq.activeWorkflows[wf.Metadata.WorkflowId]; ok { // Workflow is already being handled, enqueue it again and skip processing wfq.mutex.Unlock() - wfq.log.Info("Workflow", wf.Metadata.Workflow, " is already being handled, enqueue it again and skip processing") + wfq.log.Info("Workflow", wf.Metadata.WorkflowId, " is already being handled, enqueue it again and skip processing") time.Sleep(100 * time.Millisecond) wfq.Enqueue(wf) continue } // Mark the workflow as active - wfq.activeWorkflows[wf.Metadata.Workflow] = struct{}{} + wfq.activeWorkflows[wf.Metadata.WorkflowId] = struct{}{} wfq.mutex.Unlock() - wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.Workflow) + wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.WorkflowId) wfq.handleWorkflow(ctx, wf) wfq.mutex.Lock() - delete(wfq.activeWorkflows, wf.Metadata.Workflow) + delete(wfq.activeWorkflows, wf.Metadata.WorkflowId) wfq.mutex.Unlock() default: if ctxCancelled { @@ -149,7 +153,7 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo txn := task.NewTaskTransaction(wfq.monitor, wf.Metadata) defer txn.End() - workflow := wf.Metadata.Workflow + workflow := wf.Metadata.WorkflowId reName := wf.Metadata.ReName runtime, ok := wfq.runtimes[reName] if !ok { @@ -159,16 +163,31 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo } for i := range wf.Tasks { - err := runtime.HandleTask(ctx, wf.Tasks[i]) + taskDef := wf.Tasks[i] + err := runtime.HandleTask(ctx, taskDef) + status := task.TaskStatus{ + OccurredAt: time.Now(), + StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1, + } if err != nil { wfq.log.Error("failed handling task", "error", err, "workflow", workflow) txn.NoticeError(errRuntimeNotFound) + status.Status = task.StatusError + status.Reason = err.Error() + status.IsRetriable = true // TODO: make this configurable depending on the error + } else { + status.Status = task.StatusSuccess + } + statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status) + if statusErr != nil { + wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", workflow) + txn.NoticeError(statusErr) } } sinceCreation, inRunner, processed := wf.GetLatency() wfq.log.Info("Done handling workflow", - "workflow", wf.Metadata.Workflow, + "workflow", wf.Metadata.WorkflowId, "runtime", wf.Metadata.ReName, "time since creation", sinceCreation, "time in runner", inRunner, diff --git a/venona/pkg/queue/queue_test.go b/venona/pkg/queue/queue_test.go index ff0cf2aa8..b8e8f420d 100644 --- a/venona/pkg/queue/queue_test.go +++ b/venona/pkg/queue/queue_test.go @@ -33,8 +33,8 @@ import ( func makeWorkflow(wfID string, numOfTasks int) *workflow.Workflow { metadata := task.Metadata{ - Workflow: wfID, - ReName: "some-rt", + WorkflowId: wfID, + ReName: "some-rt", } wf := workflow.New(metadata) for i := 0; i < numOfTasks; i++ { diff --git a/venona/pkg/task/task.go b/venona/pkg/task/task.go index 1780d6df5..ed9073d39 100644 --- a/venona/pkg/task/task.go +++ b/venona/pkg/task/task.go @@ -31,6 +31,11 @@ const ( TypeAgentTask Type = "AgentTask" ) +const ( + StatusSuccess Status = "Success" + StatusError Status = "Error" +) + type ( // Tasks array Tasks []Task @@ -38,8 +43,15 @@ type ( // Type of the task Type string + // Task Id + Id string + + // Task status + Status string + // Task options Task struct { + Id Id `json:"_id"` Type Type `json:"type"` Metadata Metadata `json:"metadata"` Spec interface{} `json:"spec"` @@ -50,9 +62,10 @@ type ( // Metadata options Metadata struct { - CreatedAt string `json:"createdAt"` - ReName string `json:"reName"` - Workflow string `json:"workflow"` + CreatedAt string `json:"createdAt"` + ReName string `json:"reName"` + WorkflowId string `json:"workflowId"` + CurrentStatusRevision int `json:"currentStatusRevision"` } // Timeline values @@ -66,6 +79,14 @@ type ( Type string `json:"type"` Params map[string]interface{} `json:"params"` } + + TaskStatus struct { + Status Status `json:"status"` + OccurredAt time.Time `json:"occurredAt"` + StatusRevision int `json:"statusRevision"` + IsRetriable bool `json:"isRetriabe"` + Reason string `json:"reason"` + } ) // UnmarshalTasks with json @@ -80,6 +101,10 @@ func (r *Tasks) Marshal() ([]byte, error) { return json.Marshal(r) } +func (r *TaskStatus) Marshal() ([]byte, error) { + return json.Marshal(r) +} + // Less compares two tasks by their CreatedAt values func Less(task1 Task, task2 Task) bool { return task1.Metadata.CreatedAt < task2.Metadata.CreatedAt @@ -88,7 +113,7 @@ func Less(task1 Task, task2 Task) bool { // NewTaskTransaction creates a new transaction with task-specific attributes func NewTaskTransaction(monitor monitoring.Monitor, m Metadata) monitoring.Transaction { txn := monitor.NewTransaction("runner-tasks-execution") - txn.AddAttribute("tid", m.Workflow) + txn.AddAttribute("tid", m.WorkflowId) txn.AddAttribute("runtime-environment", m.ReName) return txn } diff --git a/venona/pkg/workflow/workflow.go b/venona/pkg/workflow/workflow.go index b6e14e46a..ea83c337d 100644 --- a/venona/pkg/workflow/workflow.go +++ b/venona/pkg/workflow/workflow.go @@ -52,8 +52,8 @@ func New(metadata task.Metadata) *Workflow { // AddTask adds a specific task to its matching parent worklow func (wf *Workflow) AddTask(t *task.Task) error { - if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.Workflow != t.Metadata.Workflow { - return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.Workflow, t.Metadata.ReName, t.Metadata.Workflow) + if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.WorkflowId != t.Metadata.WorkflowId { + return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.WorkflowId, t.Metadata.ReName, t.Metadata.WorkflowId) } if wf.Metadata.CreatedAt > t.Metadata.CreatedAt { diff --git a/venonactl/VERSION b/venonactl/VERSION index ccff224b7..227cea215 100644 --- a/venonactl/VERSION +++ b/venonactl/VERSION @@ -1 +1 @@ -1.10.7 +2.0.0