Skip to content

Commit

Permalink
feat(venona): report task status to te platform
Browse files Browse the repository at this point in the history
  • Loading branch information
masontikhonov committed Nov 13, 2024
1 parent 47eb2a8 commit bf2cab0
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 31 deletions.
2 changes: 1 addition & 1 deletion venona/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.7
2.0.0
2 changes: 1 addition & 1 deletion venona/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func run(options startOptions) {

httpHeaders := http.Header{}
{
httpHeaders.Add("User-Agent", fmt.Sprintf("codefresh-runner-%s", version))
httpHeaders.Add("User-Agent", fmt.Sprintf("cf-classic-runner/%s", version))
}

cf = codefresh.New(codefresh.Options{
Expand Down
20 changes: 14 additions & 6 deletions venona/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func New(opts *Options) (*Agent, error) {
Monitor: opts.Monitor,
Concurrency: opts.Concurrency,
BufferSize: opts.BufferSize,
Codefresh: opts.Codefresh,
})
return &Agent{
id: id,
Expand Down Expand Up @@ -241,6 +242,13 @@ func (a *Agent) reportStatus(ctx context.Context, status codefresh.AgentStatus)
}
}

func (a *Agent) reportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) {
err := a.cf.ReportTaskStatus(ctx, id, status)
if err != nil {
a.log.Error("Failed reporting task status", "error", err)
}
}

func (a *Agent) getTasks(ctx context.Context) (task.Tasks, []*workflow.Workflow) {
tasks := a.pullTasks(ctx)
return a.splitTasks(tasks)
Expand Down Expand Up @@ -276,18 +284,18 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
t.Timeline.Pulled = pullTime
agentTasks = append(agentTasks, t)
case task.TypeCreatePod, task.TypeCreatePVC, task.TypeDeletePod, task.TypeDeletePVC:
wf, ok := wfMap[t.Metadata.Workflow]
wf, ok := wfMap[t.Metadata.WorkflowId]
if !ok {
wf = workflow.New(t.Metadata)
wfMap[t.Metadata.Workflow] = wf
wfMap[t.Metadata.WorkflowId] = wf
}

err := wf.AddTask(&t)
if err != nil {
a.log.Error("failed adding task to workflow", "error", err)
}
default:
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.Workflow, "runtime", t.Metadata.ReName)
a.log.Error("unrecognized task type", "type", t.Type, "tid", t.Metadata.WorkflowId, "runtime", t.Metadata.ReName)
}
}

Expand All @@ -314,7 +322,7 @@ func (a *Agent) splitTasks(tasks task.Tasks) (task.Tasks, []*workflow.Workflow)
}

func (a *Agent) handleAgentTask(t *task.Task) {
a.log.Info("executing agent task", "tid", t.Metadata.Workflow)
a.log.Info("executing agent task", "tid", t.Metadata.WorkflowId)
a.wg.Add(1)
go func() {
defer a.wg.Done()
Expand Down Expand Up @@ -350,7 +358,7 @@ func (a *Agent) executeAgentTask(t *task.Task) error {
err = e(&spec, a.log)
sinceCreation, inRunner, processed := t.GetLatency()
a.log.Info("Done handling agent task",
"tid", t.Metadata.Workflow,
"tid", t.Metadata.WorkflowId,
"time since creation", sinceCreation,
"time in runner", inRunner,
"processing time", processed,
Expand Down Expand Up @@ -410,7 +418,7 @@ func proxyRequest(t *task.AgentTask, log logger.Logger) error {
func groupTasks(tasks task.Tasks) map[string]task.Tasks {
candidates := map[string]task.Tasks{}
for _, task := range tasks {
name := task.Metadata.Workflow
name := task.Metadata.WorkflowId
if name == "" {
// If for some reason the task is not related to any workflow
// Might heppen in older versions on Codefresh
Expand Down
12 changes: 6 additions & 6 deletions venona/pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,37 @@ func Test_groupTasks(t *testing.T) {
tasks: task.Tasks{
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
want: map[string]task.Tasks{
"1": {
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
{
Metadata: task.Metadata{
Workflow: "1",
WorkflowId: "1",
},
},
},
"2": {
{
Metadata: task.Metadata{
Workflow: "2",
WorkflowId: "2",
},
},
},
Expand Down
15 changes: 15 additions & 0 deletions venona/pkg/codefresh/codefresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
// Codefresh API client
Codefresh interface {
Tasks(ctx context.Context) (task.Tasks, error)
ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error
ReportStatus(ctx context.Context, status AgentStatus) error
Host() string
}
Expand Down Expand Up @@ -92,6 +93,20 @@ func (c cf) Tasks(ctx context.Context) (task.Tasks, error) {
return tasks, nil
}

func (c cf) ReportTaskStatus(ctx context.Context, id task.Id, status task.TaskStatus) error {
s, err := status.Marshal()
if err != nil {
return fmt.Errorf("failed marshalling when reporting task status: %w", err)
}

_, err = c.doRequest(ctx, "POST", bytes.NewBuffer(s), "api", "agent", c.agentID, "tasks", string(id), "statuses")
if err != nil {
return fmt.Errorf("failed sending request when reporting task status: %w", err)
}

return nil
}

// Host returns the host
func (c cf) Host() string {
return c.host
Expand Down
35 changes: 27 additions & 8 deletions venona/pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/codefresh-io/go/venona/pkg/codefresh"
"github.com/codefresh-io/go/venona/pkg/logger"
"github.com/codefresh-io/go/venona/pkg/metrics"
"github.com/codefresh-io/go/venona/pkg/monitoring"
Expand All @@ -45,6 +46,7 @@ type (
Monitor monitoring.Monitor
Concurrency int
BufferSize int
Codefresh codefresh.Codefresh
}

wfQueueImpl struct {
Expand All @@ -57,6 +59,7 @@ type (
stop []chan bool
activeWorkflows map[string]struct{}
mutex sync.Mutex
cf codefresh.Codefresh
}
)

Expand All @@ -73,6 +76,7 @@ func New(opts *Options) WorkflowQueue {
concurrency: opts.Concurrency,
stop: make([]chan bool, opts.Concurrency),
activeWorkflows: make(map[string]struct{}),
cf: opts.Codefresh,
}
}

Expand Down Expand Up @@ -116,22 +120,22 @@ func (wfq *wfQueueImpl) handleChannel(ctx context.Context, stopChan chan bool, i
ctxCancelled = true
case wf := <-wfq.queue:
wfq.mutex.Lock()
if _, ok := wfq.activeWorkflows[wf.Metadata.Workflow]; ok {
if _, ok := wfq.activeWorkflows[wf.Metadata.WorkflowId]; ok {
// Workflow is already being handled, enqueue it again and skip processing
wfq.mutex.Unlock()
wfq.log.Info("Workflow", wf.Metadata.Workflow, " is already being handled, enqueue it again and skip processing")
wfq.log.Info("Workflow", wf.Metadata.WorkflowId, " is already being handled, enqueue it again and skip processing")
time.Sleep(100 * time.Millisecond)
wfq.Enqueue(wf)
continue
}
// Mark the workflow as active
wfq.activeWorkflows[wf.Metadata.Workflow] = struct{}{}
wfq.activeWorkflows[wf.Metadata.WorkflowId] = struct{}{}
wfq.mutex.Unlock()

wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.Workflow)
wfq.log.Info("handling workflow", "handlerId", id, "workflow", wf.Metadata.WorkflowId)
wfq.handleWorkflow(ctx, wf)
wfq.mutex.Lock()
delete(wfq.activeWorkflows, wf.Metadata.Workflow)
delete(wfq.activeWorkflows, wf.Metadata.WorkflowId)
wfq.mutex.Unlock()
default:
if ctxCancelled {
Expand All @@ -149,7 +153,7 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo
txn := task.NewTaskTransaction(wfq.monitor, wf.Metadata)
defer txn.End()

workflow := wf.Metadata.Workflow
workflow := wf.Metadata.WorkflowId
reName := wf.Metadata.ReName
runtime, ok := wfq.runtimes[reName]
if !ok {
Expand All @@ -159,16 +163,31 @@ func (wfq *wfQueueImpl) handleWorkflow(ctx context.Context, wf *workflow.Workflo
}

for i := range wf.Tasks {
err := runtime.HandleTask(ctx, wf.Tasks[i])
taskDef := wf.Tasks[i]
err := runtime.HandleTask(ctx, taskDef)
status := task.TaskStatus{
OccurredAt: time.Now(),
StatusRevision: taskDef.Metadata.CurrentStatusRevision + 1,
}
if err != nil {
wfq.log.Error("failed handling task", "error", err, "workflow", workflow)
txn.NoticeError(errRuntimeNotFound)
status.Status = task.StatusError
status.Reason = err.Error()
status.IsRetriable = true // TODO: make this configurable depending on the error
} else {
status.Status = task.StatusSuccess
}
statusErr := wfq.cf.ReportTaskStatus(ctx, taskDef.Id, status)
if statusErr != nil {
wfq.log.Error("failed reporting task status", "error", statusErr, "task", taskDef.Id, "workflow", workflow)
txn.NoticeError(statusErr)
}
}

sinceCreation, inRunner, processed := wf.GetLatency()
wfq.log.Info("Done handling workflow",
"workflow", wf.Metadata.Workflow,
"workflow", wf.Metadata.WorkflowId,
"runtime", wf.Metadata.ReName,
"time since creation", sinceCreation,
"time in runner", inRunner,
Expand Down
4 changes: 2 additions & 2 deletions venona/pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (

func makeWorkflow(wfID string, numOfTasks int) *workflow.Workflow {
metadata := task.Metadata{
Workflow: wfID,
ReName: "some-rt",
WorkflowId: wfID,
ReName: "some-rt",
}
wf := workflow.New(metadata)
for i := 0; i < numOfTasks; i++ {
Expand Down
33 changes: 29 additions & 4 deletions venona/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,27 @@ const (
TypeAgentTask Type = "AgentTask"
)

const (
StatusSuccess Status = "Success"
StatusError Status = "Error"
)

type (
// Tasks array
Tasks []Task

// Type of the task
Type string

// Task Id
Id string

// Task status
Status string

// Task options
Task struct {
Id Id `json:"_id"`
Type Type `json:"type"`
Metadata Metadata `json:"metadata"`
Spec interface{} `json:"spec"`
Expand All @@ -50,9 +62,10 @@ type (

// Metadata options
Metadata struct {
CreatedAt string `json:"createdAt"`
ReName string `json:"reName"`
Workflow string `json:"workflow"`
CreatedAt string `json:"createdAt"`
ReName string `json:"reName"`
WorkflowId string `json:"workflowId"`
CurrentStatusRevision int `json:"currentStatusRevision"`
}

// Timeline values
Expand All @@ -66,6 +79,14 @@ type (
Type string `json:"type"`
Params map[string]interface{} `json:"params"`
}

TaskStatus struct {
Status Status `json:"status"`
OccurredAt time.Time `json:"occurredAt"`
StatusRevision int `json:"statusRevision"`
IsRetriable bool `json:"isRetriabe"`
Reason string `json:"reason"`
}
)

// UnmarshalTasks with json
Expand All @@ -80,6 +101,10 @@ func (r *Tasks) Marshal() ([]byte, error) {
return json.Marshal(r)
}

func (r *TaskStatus) Marshal() ([]byte, error) {
return json.Marshal(r)
}

// Less compares two tasks by their CreatedAt values
func Less(task1 Task, task2 Task) bool {
return task1.Metadata.CreatedAt < task2.Metadata.CreatedAt
Expand All @@ -88,7 +113,7 @@ func Less(task1 Task, task2 Task) bool {
// NewTaskTransaction creates a new transaction with task-specific attributes
func NewTaskTransaction(monitor monitoring.Monitor, m Metadata) monitoring.Transaction {
txn := monitor.NewTransaction("runner-tasks-execution")
txn.AddAttribute("tid", m.Workflow)
txn.AddAttribute("tid", m.WorkflowId)
txn.AddAttribute("runtime-environment", m.ReName)
return txn
}
Expand Down
4 changes: 2 additions & 2 deletions venona/pkg/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func New(metadata task.Metadata) *Workflow {

// AddTask adds a specific task to its matching parent worklow
func (wf *Workflow) AddTask(t *task.Task) error {
if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.Workflow != t.Metadata.Workflow {
return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.Workflow, t.Metadata.ReName, t.Metadata.Workflow)
if wf.Metadata.ReName != t.Metadata.ReName || wf.Metadata.WorkflowId != t.Metadata.WorkflowId {
return fmt.Errorf("mismatch runtime or workflow id, %s/%s is different from %s/%s", wf.Metadata.ReName, wf.Metadata.WorkflowId, t.Metadata.ReName, t.Metadata.WorkflowId)
}

if wf.Metadata.CreatedAt > t.Metadata.CreatedAt {
Expand Down
2 changes: 1 addition & 1 deletion venonactl/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.7
2.0.0

0 comments on commit bf2cab0

Please sign in to comment.