Skip to content

Commit

Permalink
fix: do not send setStatut + add memory cache on commit status list (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 18, 2020
1 parent 268c001 commit 06bdf35
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 170 deletions.
14 changes: 0 additions & 14 deletions engine/api/event/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ func publishEvent(ctx context.Context, e sdk.Event) error {
if err := store.Enqueue("events", e); err != nil {
return err
}

// send to cache for cds repositories manager
var toSkipSendReposManager bool
// the StatusWaiting is not useful to be sent on repomanager.
// the building status (or success / failed) is already sent just after
if e.EventType == fmt.Sprintf("%T", sdk.EventRunWorkflowNode{}) && e.Status == sdk.StatusWaiting {
toSkipSendReposManager = true
}
if !toSkipSendReposManager {
if err := store.Enqueue("events_repositoriesmanager", e); err != nil {
return err
}
}

b, err := json.Marshal(e)
if err != nil {
return sdk.WrapError(err, "Cannot marshal event %+v", e)
Expand Down
218 changes: 105 additions & 113 deletions engine/api/workflow/workflow_run_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/ovh/cds/sdk/log"
)

type VCSEventMessenger struct {
commitsStatuses map[string][]sdk.VCSCommitStatus
vcsClient sdk.VCSAuthorizedClient
}

// ResyncCommitStatus resync commit status for a workflow run
func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun) error {
_, end := observability.Span(ctx, "workflow.resyncCommitStatus",
Expand All @@ -24,91 +29,108 @@ func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
)
defer end()

for nodeID, nodeRuns := range wr.WorkflowNodeRuns {
eventMessenger := &VCSEventMessenger{commitsStatuses: make(map[string][]sdk.VCSCommitStatus)}
for _, nodeRuns := range wr.WorkflowNodeRuns {
sort.Slice(nodeRuns, func(i, j int) bool {
return nodeRuns[i].SubNumber >= nodeRuns[j].SubNumber
})
nodeRun := nodeRuns[0]

if !sdk.StatusIsTerminated(nodeRun.Status) {
continue
if err := eventMessenger.SendVCSEvent(ctx, db, store, proj, *wr, nodeRun); err != nil {
log.Error(ctx, "resyncCommitStatus > unable to send vcs event: %v", err)
}
}

return nil
}

func (e *VCSEventMessenger) SendVCSEvent(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, wr sdk.WorkflowRun, nodeRun sdk.WorkflowNodeRun) error {
if nodeRun.Status == sdk.StatusWaiting {
return nil
}

if e.commitsStatuses == nil {
e.commitsStatuses = make(map[string][]sdk.VCSCommitStatus)
}

node := wr.Workflow.WorkflowData.NodeByID(nodeID)
if !node.IsLinkedToRepo(&wr.Workflow) {
node := wr.Workflow.WorkflowData.NodeByID(nodeRun.WorkflowNodeID)
if !node.IsLinkedToRepo(&wr.Workflow) {
return nil
}

var notif *sdk.WorkflowNotification
// browse notification to find vcs one
loopNotif:
for _, n := range wr.Workflow.Notifications {
if n.Type != sdk.VCSUserNotification {
continue
}

var notif *sdk.WorkflowNotification
// browse notification to find vcs one
loopNotif:
for _, n := range wr.Workflow.Notifications {
if n.Type != sdk.VCSUserNotification {
continue
}
// If list of node is nill, send notification to all of them
if len(n.NodeIDs) == 0 {
// If list of node is nill, send notification to all of them
if len(n.NodeIDs) == 0 {
notif = &n
break
}
// browser source node id
for _, src := range n.NodeIDs {
if src == node.ID {
notif = &n
break
}
// browser source node id
for _, src := range n.NodeIDs {
if src == node.ID {
notif = &n
break loopNotif
}
break loopNotif
}
}
}

vcsServerName := wr.Workflow.Applications[node.Context.ApplicationID].VCSServer
repoFullName := wr.Workflow.Applications[node.Context.ApplicationID].RepositoryFullname
if notif == nil {
return nil
}

vcsServer := repositoriesmanager.GetProjectVCSServer(proj, vcsServerName)
if vcsServer == nil {
return nil
}
vcsServerName := wr.Workflow.Applications[node.Context.ApplicationID].VCSServer
repoFullName := wr.Workflow.Applications[node.Context.ApplicationID].RepositoryFullname

details := fmt.Sprintf("on project:%s workflow:%s node:%s num:%d sub:%d vcs:%s", proj.Name, wr.Workflow.Name, nodeRun.WorkflowNodeName, nodeRun.Number, nodeRun.SubNumber, vcsServer.Name)
vcsServer := repositoriesmanager.GetProjectVCSServer(proj, vcsServerName)
if vcsServer == nil {
return nil
}

//Get the RepositoriesManager Client
client, errClient := repositoriesmanager.AuthorizedClient(ctx, db, store, proj.Key, vcsServer)
if errClient != nil {
return sdk.WrapError(errClient, "resyncCommitStatus> Cannot get client %s", details)
//Get the RepositoriesManager Client
if e.vcsClient == nil {
var err error
e.vcsClient, err = repositoriesmanager.AuthorizedClient(ctx, db, store, proj.Key, vcsServer)
if err != nil {
return err
}
}

ref := nodeRun.VCSHash
if nodeRun.VCSTag != "" {
ref = nodeRun.VCSTag
}
ref := nodeRun.VCSHash
if nodeRun.VCSTag != "" {
ref = nodeRun.VCSTag
}

statuses, errStatuses := client.ListStatuses(ctx, repoFullName, ref)
if errStatuses != nil {
return sdk.WrapError(errStatuses, "resyncCommitStatus> Cannot get statuses %s", details)
statuses, ok := e.commitsStatuses[ref]
if !ok {
var err error
statuses, err = e.vcsClient.ListStatuses(ctx, repoFullName, ref)
if err != nil {
return err
}

var statusFound *sdk.VCSCommitStatus
expected := sdk.VCSCommitStatusDescription(proj.Key, wr.Workflow.Name, sdk.EventRunWorkflowNode{
NodeName: nodeRun.WorkflowNodeName,
})

for i, status := range statuses {
if status.Decription == expected {
statusFound = &statuses[i]
break
}
e.commitsStatuses[ref] = statuses
}
expected := sdk.VCSCommitStatusDescription(proj.Key, wr.Workflow.Name, sdk.EventRunWorkflowNode{
NodeName: nodeRun.WorkflowNodeName,
})

var statusFound *sdk.VCSCommitStatus
for i, status := range statuses {
if status.Decription == expected {
statusFound = &statuses[i]
break
}
}

if statusFound == nil || statusFound.State == "" {
if err := sendVCSEventStatus(ctx, db, store, proj, wr, &nodeRun, notif); err != nil {
log.Error(ctx, "resyncCommitStatus> Error sending status %s err: %v", details, err)
}

if err := sendVCSPullRequestComment(ctx, db, store, proj, wr, &nodeRun, notif); err != nil {
log.Error(ctx, "resyncCommitStatus> Error sending pr comments %s %s err:%v", statusFound.State, details, err)
}
continue
if statusFound == nil || statusFound.State == "" {
if err := e.sendVCSEventStatus(ctx, db, store, proj.Key, wr, &nodeRun, notif, vcsServer.Name); err != nil {
return err
}

} else {
skipStatus := false
switch statusFound.State {
case sdk.StatusSuccess:
Expand All @@ -130,21 +152,20 @@ func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
}

if !skipStatus {
if err := sendVCSEventStatus(ctx, db, store, proj, wr, &nodeRun, notif); err != nil {
log.Error(ctx, "resyncCommitStatus> Error sending status %s %s err:%v", statusFound.State, details, err)
if err := e.sendVCSEventStatus(ctx, db, store, proj.Key, wr, &nodeRun, notif, vcsServer.Name); err != nil {
return err
}
}
}

if err := sendVCSPullRequestComment(ctx, db, store, proj, wr, &nodeRun, notif); err != nil {
log.Error(ctx, "resyncCommitStatus> Error sending pr comments %s %s err:%v", statusFound.State, details, err)
}

if err := e.sendVCSPullRequestComment(ctx, db, wr, &nodeRun, notif, vcsServer.Name); err != nil {
return err
}
return nil
}

// sendVCSEventStatus send status
func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun, nodeRun *sdk.WorkflowNodeRun, notif *sdk.WorkflowNotification) error {
func (e *VCSEventMessenger) sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.Store, projectKey string, wr sdk.WorkflowRun, nodeRun *sdk.WorkflowNodeRun, notif *sdk.WorkflowNotification, vcsServerName string) error {
if notif == nil || notif.Settings.Template == nil || (notif.Settings.Template.DisableStatus != nil && *notif.Settings.Template.DisableStatus) {
return nil
}
Expand All @@ -166,17 +187,6 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
env = wr.Workflow.Environments[node.Context.EnvironmentID]
}

vcsServer := repositoriesmanager.GetProjectVCSServer(proj, app.VCSServer)
if vcsServer == nil {
return nil
}

//Get the RepositoriesManager Client
client, errClient := repositoriesmanager.AuthorizedClient(ctx, db, store, proj.Key, vcsServer)
if errClient != nil {
return sdk.WrapError(errClient, "sendVCSEventStatus> Cannot get client")
}

var eventWNR = sdk.EventRunWorkflowNode{
ID: nodeRun.ID,
Number: nodeRun.Number,
Expand Down Expand Up @@ -214,12 +224,11 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St

report, err := nodeRun.Report()
if err != nil {
log.Error(ctx, "sendVCSEventStatus> unable to compute node run report%v", err)
return nil
return err
}

// Check if it's a gerrit or not
vcsConf, err := repositoriesmanager.LoadByName(ctx, db, vcsServer.Name)
vcsConf, err := repositoriesmanager.LoadByName(ctx, db, vcsServerName)
if err != nil {
return err
}
Expand Down Expand Up @@ -266,24 +275,24 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
EventType: fmt.Sprintf("%T", eventWNR),
Payload: payload,
Timestamp: time.Now(),
ProjectKey: proj.Key,
ProjectKey: projectKey,
WorkflowName: wr.Workflow.Name,
PipelineName: pipName,
ApplicationName: appName,
EnvironmentName: envName,
}

if err := client.SetStatus(ctx, evt); err != nil {
if err := e.vcsClient.SetStatus(ctx, evt); err != nil {
if err2 := repositoriesmanager.RetryEvent(&evt, err, store); err2 != nil {
log.Error(ctx, "sendEvent>processEvent> err while retry event: %v", err2)
return err2
}
log.Error(ctx, "sendEvent> err:%v", err)
return err
}

return nil
}

func sendVCSPullRequestComment(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, wr *sdk.WorkflowRun, nodeRun *sdk.WorkflowNodeRun, notif *sdk.WorkflowNotification) error {
func (e *VCSEventMessenger) sendVCSPullRequestComment(ctx context.Context, db gorp.SqlExecutor, wr sdk.WorkflowRun, nodeRun *sdk.WorkflowNodeRun, notif *sdk.WorkflowNotification, vcsServerName string) error {
if notif == nil || notif.Settings.Template == nil || (notif.Settings.Template.DisableComment != nil && *notif.Settings.Template.DisableComment) {
return nil
}
Expand All @@ -308,23 +317,11 @@ func sendVCSPullRequestComment(ctx context.Context, db gorp.SqlExecutor, store c

report, err := nodeRun.Report()
if err != nil {
log.Error(ctx, "sendVCSPullRequestComment> unable to compute node run report%v", err)
return nil
}

vcsServer := repositoriesmanager.GetProjectVCSServer(proj, app.VCSServer)
if vcsServer == nil {
return nil
}

//Get the RepositoriesManager Client
client, errClient := repositoriesmanager.AuthorizedClient(ctx, db, store, proj.Key, vcsServer)
if errClient != nil {
return errClient
return err
}

// Check if it's a gerrit or not
vcsConf, err := repositoriesmanager.LoadByName(ctx, db, vcsServer.Name)
vcsConf, err := repositoriesmanager.LoadByName(ctx, db, vcsServerName)
if err != nil {
return err
}
Expand All @@ -347,31 +344,26 @@ func sendVCSPullRequestComment(ctx context.Context, db gorp.SqlExecutor, store c
// If we are on Gerrit
if changeID != "" && vcsConf.Type == "gerrit" {
reqComment.ChangeID = changeID
if err := client.PullRequestComment(ctx, app.RepositoryFullname, reqComment); err != nil {
log.Error(ctx, "sendVCSPullRequestComment> unable to send PR report:%v", err)
return nil
if err := e.vcsClient.PullRequestComment(ctx, app.RepositoryFullname, reqComment); err != nil {
return err
}
} else if vcsConf.Type != "gerrit" {
//Check if this branch and this commit is a pullrequest
prs, err := client.PullRequests(ctx, app.RepositoryFullname)
prs, err := e.vcsClient.PullRequests(ctx, app.RepositoryFullname)
if err != nil {
log.Error(ctx, "sendVCSPullRequestComment> unable to get pull requests on repo %s: %v", app.RepositoryFullname, err)
return nil
return err
}

//Send comment on pull request
for _, pr := range prs {
if pr.Head.Branch.DisplayID == nodeRun.VCSBranch && pr.Head.Branch.LatestCommit == nodeRun.VCSHash && !pr.Merged && !pr.Closed {
reqComment.ID = pr.ID
if err := client.PullRequestComment(ctx, app.RepositoryFullname, reqComment); err != nil {
log.Error(ctx, "sendVCSPullRequestComment> unable to send PR report:%v", err)
return nil
if err := e.vcsClient.PullRequestComment(ctx, app.RepositoryFullname, reqComment); err != nil {
return err
}
// if we found the pull request for head branch we can break (only one PR for the branch should exist)
break
}
}
}

return nil
}
Loading

0 comments on commit 06bdf35

Please sign in to comment.