Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api, worker): auto build info #6264

Merged
merged 18 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions contrib/integrations/artifactory/artifactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/jfrog/jfrog-client-go/config"
"github.com/jfrog/jfrog-client-go/distribution"
authdistrib "github.com/jfrog/jfrog-client-go/distribution/auth"
"github.com/jfrog/jfrog-client-go/utils/log"
"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/api/integration/artifact_manager"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -236,7 +236,7 @@ type BuildInfoRequest struct {

func PrepareBuildInfo(ctx context.Context, artiClient artifact_manager.ArtifactManager, r BuildInfoRequest) (*buildinfo.BuildInfo, error) {
buildInfoName := fmt.Sprintf("%s/%s/%s", r.BuildInfoPrefix, r.ProjectKey, r.WorkflowName)
log.Debug(ctx, "PrepareBuildInfo %q", buildInfoName)
log.Debug(ctx, "PrepareBuildInfo %q maturity:%q", buildInfoName, r.LowMaturitySuffix)

buildInfoRequest := &buildinfo.BuildInfo{
Properties: map[string]string{},
Expand Down Expand Up @@ -272,6 +272,7 @@ func PrepareBuildInfo(ctx context.Context, artiClient artifact_manager.ArtifactM
version: r.Version,
projectKey: r.ProjectKey,
}
log.Info(ctx, "compute build infos for %+v", execContext)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug ?

modules, err := computeBuildInfoModules(ctx, artiClient, execContext, r.RunResults)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func (api *API) InitRouter() {
r.Handle("/queue/workflows/{permJobID}/coverage", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobCoverageResultsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/run/results", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowRunResultsHandler))
r.Handle("/queue/workflows/{permJobID}/run/results/check", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.workflowRunResultCheckUploadHandler))
r.Handle("/queue/workflows/{permJobID}/run/results/promote", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.workflowRunResultPromoteHandler))
r.Handle("/queue/workflows/{permJobID}/run/results/release", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.workflowRunResultReleaseHandler))
r.Handle("/queue/workflows/{permJobID}/test", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTestsResultsHandler, MaintenanceAware()))
r.Handle("/queue/workflows/{permJobID}/tag", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobTagsHandler, MaintenanceAware()))
Expand Down
126 changes: 79 additions & 47 deletions engine/api/workflow/workflow_run_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-gorp/gorp"
"github.com/lib/pq"
"github.com/rockbears/log"

art "github.com/ovh/cds/contrib/integrations/artifactory"
Expand Down Expand Up @@ -369,6 +370,11 @@ func getAll(ctx context.Context, db gorp.SqlExecutor, query gorpmapping.Query) (
return results, nil
}

func LoadRunResultsByRunIDFilterByIDs(ctx context.Context, db gorp.SqlExecutor, runID int64, resultIDs ...string) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 AND id = ANY($2) ORDER BY sub_num DESC").Args(runID, pq.StringArray(resultIDs))
return getAll(ctx, db, query)
}

func LoadRunResultsByRunID(ctx context.Context, db gorp.SqlExecutor, runID int64) (sdk.WorkflowRunResults, error) {
query := gorpmapping.NewQuery("SELECT * FROM workflow_run_result WHERE workflow_run_id = $1 ORDER BY sub_num DESC").Args(runID)
return getAll(ctx, db, query)
Expand Down Expand Up @@ -413,8 +419,19 @@ func ResyncWorkflowRunResultsRoutine(ctx context.Context, DBFunc func() *gorp.Db
continue
}
for _, id := range ids {
if err := SyncRunResultArtifactManagerByRunID(ctx, DBFunc(), id); err != nil {
tx, err := DBFunc().Begin()
if err != nil {
log.ErrorWithStackTrace(ctx, err)
continue
}
if err := SyncRunResultArtifactManagerByRunID(ctx, tx, id); err != nil {
log.ErrorWithStackTrace(ctx, err)
tx.Rollback()
continue
}
if err := tx.Commit(); err != nil {
log.ErrorWithStackTrace(ctx, err)
tx.Rollback()
continue
}
}
Expand All @@ -432,14 +449,17 @@ func FindOldestWorkflowRunsWithResultToSync(ctx context.Context, dbmap *gorp.DbM
return results, nil
}

func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap, id int64) error {
log.Info(ctx, "sync run result for workflow run id %d", id)
db, err := dbmap.Begin()
if err != nil {
return sdk.WithStack(err)
func UpdateRunResult(ctx context.Context, db gorp.SqlExecutor, result *sdk.WorkflowRunResult) error {
dbResult := dbRunResult(*result)
log.Debug(ctx, "updating run result %s: %v", dbResult.ID, result)
if err := gorpmapping.Update(db, &dbResult); err != nil {
return err
}
defer db.Rollback() // nolint
return nil
}

func SyncRunResultArtifactManagerByRunID(ctx context.Context, db gorp.SqlExecutor, id int64) error {
log.Info(ctx, "sync run result for workflow run id %d", id)
wr, err := LoadAndLockRunByID(ctx, db, id, LoadRunOptions{})
if err != nil {
return err
Expand All @@ -459,12 +479,12 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,
result := allRunResults[i]
// If the result is not an artifact manager, we do nothing but we consider it as synchronized
if result.Type != sdk.WorkflowRunResultTypeArtifactManager {
dbResult := dbRunResult(result)
dbResult.DataSync = new(sdk.WorkflowRunResultSync)
dbResult.DataSync.Link = ""
dbResult.DataSync.Sync = true
log.Debug(ctx, "updating run result %s", dbResult.ID)
if err := gorpmapping.Update(db, &dbResult); err != nil {
if result.DataSync == nil {
result.DataSync = new(sdk.WorkflowRunResultSync)
}
result.DataSync.Link = ""
result.DataSync.Sync = true
if err := UpdateRunResult(ctx, db, &result); err != nil {
return err
}
} else {
Expand All @@ -474,9 +494,6 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,

// Nothing more to do with artifact manager
if len(runResults) == 0 {
if err := db.Commit(); err != nil {
return err
}
return nil
}

Expand All @@ -495,30 +512,27 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,
for i := range runResults {
result := runResults[i]
// If the result is not an artifact manager, we do nothing but we consider it as synchronized
dbResult := dbRunResult(result)
dbResult.DataSync = new(sdk.WorkflowRunResultSync)
dbResult.DataSync.Sync = false
dbResult.DataSync.Error = err.Error()
log.Debug(ctx, "updating run result %s", dbResult.ID)
if err := gorpmapping.Update(db, &dbResult); err != nil {
if result.DataSync == nil {
result.DataSync = new(sdk.WorkflowRunResultSync)
}
result.DataSync.Sync = false
result.DataSync.Error = err.Error()
if err := UpdateRunResult(ctx, db, &result); err != nil {
return err
}
}
if err := db.Commit(); err != nil {
return err
}
return nil
}

log.Info(ctx, "artifact manager %q found for workflow run", artifactManagerInteg.ProjectIntegration.Name)

var (
rtName = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigPlatform].Value
rtURL = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigURL].Value
buildInfoPrefix = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigBuildInfoPrefix].Value
tokenName = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigTokenName].Value
lowMaturitySuffix = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigPromotionLowMaturity].Value
artifactoryProjectKey = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigProjectKey].Value
rtName = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigPlatform].Value
rtURL = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigURL].Value
buildInfoPrefix = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigBuildInfoPrefix].Value
tokenName = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigTokenName].Value
lowMaturitySuffixFromConfig = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigPromotionLowMaturity].Value
artifactoryProjectKey = artifactManagerInteg.ProjectIntegration.Config[sdk.ArtifactoryConfigProjectKey].Value
)

// Load the token from secrets
Expand All @@ -539,18 +553,15 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,
log.ErrorWithStackTrace(ctx, err)
for i := range runResults {
result := runResults[i]
dbResult := dbRunResult(result)
dbResult.DataSync = new(sdk.WorkflowRunResultSync)
dbResult.DataSync.Sync = false
dbResult.DataSync.Error = err.Error()
log.Debug(ctx, "updating run result %s", dbResult.ID)
if err := gorpmapping.Update(db, &dbResult); err != nil {
if result.DataSync == nil {
result.DataSync = new(sdk.WorkflowRunResultSync)
}
result.DataSync.Sync = false
result.DataSync.Error = err.Error()
if err := UpdateRunResult(ctx, db, &result); err != nil {
return err
}
}
if err := db.Commit(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -591,6 +602,27 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,
nodeRunURL := parameters["cds.ui.pipeline.run"][0]
runURL := nodeRunURL[0:strings.Index(nodeRunURL, "/node/")]

var lowMaturitySuffix string
for i := range runResults {
log.Debug(ctx, "checking for earlier maturity in %+v", runResults[i].DataSync)
if runResults[i].DataSync != nil {
p := runResults[i].DataSync.LatestPromotionOrRelease()
if p != nil {
if lowMaturitySuffix != "" {
if p.FromMaturity != lowMaturitySuffix {
return sdk.NewErrorFrom(sdk.ErrWrongRequest, "several maturities (%q, %q) detected among your artifacts", p.ToMaturity, lowMaturitySuffix)
}
}
log.Debug(ctx, "low maturity is %+v", p.FromMaturity)
lowMaturitySuffix = p.FromMaturity
// we don't break the loop to let us to check all the maturities
}
}
}
if lowMaturitySuffix == "" {
lowMaturitySuffix = lowMaturitySuffixFromConfig
}

buildInfoRequest, err := art.PrepareBuildInfo(ctx, artifactClient, art.BuildInfoRequest{
BuildInfoPrefix: buildInfoPrefix,
ProjectKey: wr.Workflow.ProjectKey,
Expand Down Expand Up @@ -630,15 +662,15 @@ func SyncRunResultArtifactManagerByRunID(ctx context.Context, dbmap *gorp.DbMap,
}

for _, result := range runResults {
dbResult := dbRunResult(result)
dbResult.DataSync = new(sdk.WorkflowRunResultSync)
dbResult.DataSync.Link = buildInfoRequest.Name + "/" + buildInfoRequest.Number
dbResult.DataSync.Sync = true
log.Debug(ctx, "updating run result %s", dbResult.ID)
if err := gorpmapping.Update(db, &dbResult); err != nil {
if result.DataSync == nil {
result.DataSync = new(sdk.WorkflowRunResultSync)
}
result.DataSync.Link = buildInfoRequest.Name + "/" + buildInfoRequest.Number
result.DataSync.Sync = true
if err := UpdateRunResult(ctx, db, &result); err != nil {
return err
}
}

return sdk.WithStack(db.Commit())
return nil
}
98 changes: 95 additions & 3 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,64 @@ func (api *API) workflowRunResultCheckUploadHandler() service.Handler {
}
}

func (api *API) workflowRunResultPromoteHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if !isWorker(ctx) {
return sdk.WrapError(sdk.ErrForbidden, "only workers can call this route")
}

jobID, err := requestVarInt(r, "permJobID")
if err != nil {
return err
}

var promotionRequest sdk.WorkflowRunResultPromotionRequest
if err := service.UnmarshalBody(r, &promotionRequest); err != nil {
return sdk.WithStack(err)
}

tx, err := api.mustDBWithCtx(ctx).Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback()

wr, err := workflow.LoadRunByJobID(ctx, tx, jobID, workflow.LoadRunOptions{DisableDetailledNodeRun: true})
if err != nil {
return err
}

runResults, err := workflow.LoadRunResultsByRunIDFilterByIDs(ctx, tx, wr.ID, promotionRequest.IDs...)
if err != nil {
return err
}

if len(runResults) == 0 {
return sdk.NewErrorFrom(sdk.ErrWrongRequest, "unable to find any run results among %v", promotionRequest.IDs)
}

promotionRequest.WorkflowRunResultPromotion.Date = time.Now()
for i := range runResults {
r := &runResults[i]
for _, id := range promotionRequest.IDs {
if id == r.ID {
log.Debug(ctx, "adding promotion data: %+v", promotionRequest)
r.DataSync.Promotions = append(r.DataSync.Promotions, promotionRequest.WorkflowRunResultPromotion)
}
}
if err := workflow.UpdateRunResult(ctx, tx, r); err != nil {
return err
}
}

if err := workflow.SyncRunResultArtifactManagerByRunID(ctx, tx, wr.ID); err != nil {
return err
}

return sdk.WithStack(tx.Commit())
}
}

func (api *API) workflowRunResultReleaseHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if !isWorker(ctx) {
Expand All @@ -1132,16 +1190,50 @@ func (api *API) workflowRunResultReleaseHandler() service.Handler {
return err
}

wr, err := workflow.LoadRunByJobID(ctx, api.mustDBWithCtx(ctx), jobID, workflow.LoadRunOptions{DisableDetailledNodeRun: true})
var releaseRequest sdk.WorkflowRunResultPromotionRequest
if err := service.UnmarshalBody(r, &releaseRequest); err != nil {
return sdk.WithStack(err)
}

tx, err := api.mustDBWithCtx(ctx).Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback()

wr, err := workflow.LoadRunByJobID(ctx, tx, jobID, workflow.LoadRunOptions{DisableDetailledNodeRun: true})
if err != nil {
return err
}

if err := workflow.SyncRunResultArtifactManagerByRunID(ctx, api.mustDBWithCtx(ctx), wr.ID); err != nil {
runResults, err := workflow.LoadRunResultsByRunIDFilterByIDs(ctx, tx, wr.ID, releaseRequest.IDs...)
if err != nil {
return err
}

return nil
if len(runResults) == 0 {
return sdk.NewErrorFrom(sdk.ErrWrongRequest, "unable to find any run results among %v", releaseRequest.IDs)
}

releaseRequest.WorkflowRunResultPromotion.Date = time.Now()
for i := range runResults {
r := &runResults[i]
for _, id := range releaseRequest.IDs {
if id == r.ID {
log.Debug(ctx, "adding release data: %+v", releaseRequest)
r.DataSync.Releases = append(r.DataSync.Releases, releaseRequest.WorkflowRunResultPromotion)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

}
}
if err := workflow.UpdateRunResult(ctx, tx, r); err != nil {
return err
}
}

if err := workflow.SyncRunResultArtifactManagerByRunID(ctx, tx, wr.ID); err != nil {
return err
}

return sdk.WithStack(tx.Commit())
}
}

Expand Down
19 changes: 19 additions & 0 deletions engine/worker/internal/action/builtin_promote.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,32 @@ import (
"strings"

"github.com/golang/protobuf/ptypes/empty"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/grpcplugin/integrationplugin"
)

func RunPromote(ctx context.Context, wk workerruntime.Runtime, a sdk.Action, _ []sdk.Variable) (sdk.Result, error) {
jobID, err := workerruntime.JobID(ctx)
if err != nil {
return sdk.Result{Status: sdk.StatusFail}, err
}

promotedRunResultIDs, err := RunReleaseActionPrepare(ctx, wk, a)
if err != nil {
return sdk.Result{Status: sdk.StatusFail}, err
}

log.Info(ctx, "RunPromote> preparing run result %+v for promotion", promotedRunResultIDs)
if err := wk.Client().QueueWorkflowRunResultsPromote(ctx,
jobID, promotedRunResultIDs,
sdk.ParameterValue(a.Parameters, "srcMaturity"), sdk.ParameterValue(a.Parameters, "destMaturity"),
); err != nil {
return sdk.Result{Status: sdk.StatusFail}, err
}

pfName := sdk.ParameterFind(wk.Parameters(), "cds.integration.artifact_manager")
if pfName == nil {
return sdk.Result{}, errors.New("unable to retrieve artifact manager integration... Aborting")
Expand Down
Loading