Skip to content

Commit

Permalink
feat: new workflow model (#3494)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Nov 12, 2018
1 parent 991d739 commit 6c94766
Show file tree
Hide file tree
Showing 236 changed files with 9,728 additions and 7,855 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.swp
*.iml
.idea
*.iml
.vscode
Expand Down
18 changes: 17 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
version: '3'

services:
cds-feature:
image: maif/izanami
ports:
- "8080:8080"
environment:
INITIAL_APIKEY_CLIENT_ID: changeitchangeitchangeitchangeit
INITIAL_APIKEY_CLIENT_SECRET: changeitchangeitchangeitchangeit
cds-db:
image: postgres:9.6.2
environment:
Expand Down Expand Up @@ -34,7 +41,7 @@ services:
CDS_API_URL_API: ${HOSTNAME}:8081
CDS_API_URL_UI: ${HOSTNAME}:8080
CDS_API_AUTH_DEFAULTGROUP: cdsdemo
CDS_LOG_LEVEL: info
CDS_LOG_LEVEL: debug
CDS_API_CACHE_MODE: redis
CDS_API_CACHE_REDIS_HOST: cds-cache:6379
CDS_API_CACHE_REDIS_PASSWORD: cds
Expand All @@ -43,12 +50,21 @@ services:
CDS_API_ARTIFACT_LOCAL_BASEDIRECTORY: /app/artifacts
CDS_API_AUTH_SHAREDINFRATOKEN: changeitchangeitchangeitchangeitchangeitchangeitchangeitchangeit
CDS_API_SECRETS_KEY: changeitchangeitchangeitchangeit
CDS_LOG_GRAYLOG_EXTRAKEY:
CDS_LOG_GRAYLOG_EXTRAVALUE:
CDS_LOG_GRAYLOG_HOST:
CDS_LOG_GRAYLOG_PORT:
CDS_API_FEATURES_IZANAMI_APIURL: "http://cds-feature:8080/api"
CDS_API_FEATURES_IZANAMI_CLIENTID: "changeitchangeitchangeitchangeit"
CDS_API_FEATURES_IZANAMI_CLIENTSECRET: "changeitchangeitchangeitchangeit"

ports:
- "8081:8081"
- "8082:8082"
links:
- cds-db
- cds-cache
- cds-feature

cds-ui:
image: ovhcom/cds-ui:latest
Expand Down
4 changes: 4 additions & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ func (a *API) Serve(ctx context.Context) error {
event.Subscribe(a.warnChan)

log.Info("Initializing internal routines...")

sdk.GoRoutine(ctx, "worker.Initialize", func(ctx context.Context) {
if err := worker.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Cache); err != nil {
log.Error("error while initializing workers routine: %s", err)
Expand Down Expand Up @@ -689,6 +690,9 @@ func (a *API) Serve(ctx context.Context) error {
sdk.GoRoutine(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
a.serviceAPIHeartbeat(ctx)
})
sdk.GoRoutine(ctx, "migrate.WorkflowData", func(ctx context.Context) {
migrate.MigrateToWorkflowData(a.DBConnectionFactory.GetDBMap, a.Cache)
})

//Temporary migration code
go migrate.WorkflowNodeRunArtifacts(a.Cache, a.DBConnectionFactory.GetDBMap)
Expand Down
3 changes: 3 additions & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func (api *API) InitRouter() {
r.Handle("/project/{key}/workflow/{permWorkflowName}/node/{nodeID}/hook/model", r.GET(api.getWorkflowHookModelsHandler))
r.Handle("/project/{key}/workflow/{permWorkflowName}/node/{nodeID}/outgoinghook/model", r.GET(api.getWorkflowOutgoingHookModelsHandler))

// Outgoing hook model
r.Handle("/workflow/outgoinghook/model", r.GET(api.getWorkflowOutgoingHookModelsHandler))

// Preview workflows
r.Handle("/project/{permProjectKey}/preview/workflows", r.POST(api.postWorkflowPreviewHandler))
// Import workflows
Expand Down
15 changes: 10 additions & 5 deletions engine/api/application/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,20 @@ func TestLoadByWorkflowID(t *testing.T) {
Name: "test_1",
ProjectID: proj.ID,
ProjectKey: proj.Key,
Root: &sdk.WorkflowNode{
PipelineID: pip.ID,
PipelineName: pip.Name,
Context: &sdk.WorkflowNodeContext{
Application: &app,
WorkflowData: &sdk.WorkflowData{
Node: sdk.Node{
Type: sdk.NodeTypePipeline,
Context: &sdk.NodeContext{
PipelineID: pip.ID,
ApplicationID: app.ID,
},
},
},
}

test.NoError(t, workflow.RenameNode(db, &w))
(&w).RetroMigrate()

proj, _ = project.LoadByID(db, cache, proj.ID, u, project.LoadOptions.WithApplications, project.LoadOptions.WithPipelines, project.LoadOptions.WithEnvironments, project.LoadOptions.WithGroups)

test.NoError(t, workflow.Insert(db, cache, &w, proj, u))
Expand Down
78 changes: 46 additions & 32 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package event

import (
"fmt"
"github.com/ovh/cds/sdk/log"
"time"

"github.com/fatih/structs"
Expand Down Expand Up @@ -66,55 +67,68 @@ func PublishWorkflowNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, w sdk.W
NodeID: nr.WorkflowNodeID,
RunID: nr.WorkflowRunID,
StagesSummary: make([]sdk.StageSummary, len(nr.Stages)),
HookUUID: nr.UUID,
}

if nr.Callback != nil {
e.HookLog = nr.Callback.Log
}

for i := range nr.Stages {
e.StagesSummary[i] = nr.Stages[i].ToSummary()
}

var pipName string
node := w.GetNode(nr.WorkflowNodeID)
if node != nil {
pipName = w.Pipelines[node.PipelineID].Name
e.NodeName = node.Name
var nodeName string
var app sdk.Application
var env sdk.Environment
n := w.GetNode(nr.WorkflowNodeID)
if n == nil {
// check on workflow data
wnode := w.WorkflowData.NodeByID(nr.WorkflowNodeID)
if wnode == nil {
log.Warning("PublishWorkflowNodeRun> Unable to publish event on node %d", nr.WorkflowNodeID)
return
}
nodeName = wnode.Name
if wnode.Context != nil && wnode.Context.PipelineID != 0 {
pipName = w.Pipelines[wnode.Context.PipelineID].Name
}

if wnode.Context != nil && wnode.Context.ApplicationID != 0 {
app = w.Applications[wnode.Context.ApplicationID]
}
if wnode.Context != nil && wnode.Context.EnvironmentID != 0 {
env = w.Environments[wnode.Context.EnvironmentID]
}
} else {
nodeName = n.Name
pipName = w.Pipelines[n.PipelineID].Name
if n.Context != nil && n.Context.Application != nil {
app = *n.Context.Application
}
if n.Context != nil && n.Context.Environment != nil {
env = *n.Context.Environment
}
}

e.NodeName = nodeName
var envName string
var appName string
if node.Context != nil {
if node.Context.Application != nil {
appName = node.Context.Application.Name
e.RepositoryManagerName = node.Context.Application.VCSServer
e.RepositoryFullName = node.Context.Application.RepositoryFullname
}
if node.Context.Environment != nil {
envName = node.Context.Environment.Name
}
if app.ID != 0 {
appName = app.Name
e.RepositoryManagerName = app.VCSServer
e.RepositoryFullName = app.RepositoryFullname
}
if env.ID != 0 {
envName = env.Name
}
if sdk.StatusIsTerminated(nr.Status) {
e.Done = nr.Done.Unix()
}
publishRunWorkflow(e, w.ProjectKey, w.Name, appName, pipName, envName, nr.Number, nr.SubNumber, nr.Status)
}

// PublishWorkflowNodeOutgoingHookRun publish a EventRunWorkflowOutgoingHook
func PublishWorkflowNodeOutgoingHookRun(db gorp.SqlExecutor, hr sdk.WorkflowNodeOutgoingHookRun, w sdk.Workflow) {
evt := sdk.EventRunWorkflowOutgoingHook{
ID: hr.HookRunID,
HookID: hr.Hook.ID,
Status: hr.Status,
WorkflowRunID: hr.WorkflowRunID,
}

if hr.Callback != nil {
evt.Start = hr.Callback.Start.Unix()
evt.Done = hr.Callback.Done.Unix()
evt.Log = hr.Callback.Log
evt.WorkflowRunNumber = hr.Callback.WorkflowRunNumber
}

publishRunWorkflow(evt, w.ProjectKey, w.Name, "", "", "", hr.Number, hr.SubNumber, hr.Status)
}

// PublishWorkflowNodeJobRun publish a WorkflowNodeJobRun
func PublishWorkflowNodeJobRun(db gorp.SqlExecutor, pkey, wname string, jr sdk.WorkflowNodeJobRun) {
e := sdk.EventRunWorkflowJob{
Expand Down
5 changes: 4 additions & 1 deletion engine/api/feature/flipping.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
// FeatEnableTracing is the opencensus tracing feature id
FeatEnableTracing = "cds:tracing"

// FeatDisabledWNode wnode workflow representation
FeatWNode = "cds:wnode"

cacheFeatureKey = "feature:"
)

Expand All @@ -34,7 +37,7 @@ type ProjectFeatures struct {

// List all features
func List() []string {
return []string{FeatWorkflowAsCode}
return []string{FeatWorkflowAsCode, FeatWNode}
}

// Init initialize izanami client
Expand Down
2 changes: 1 addition & 1 deletion engine/api/grpc_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (h *grpcHandlers) SendResult(c context.Context, res *sdk.Result) (*empty.Em
return new(empty.Empty), sdk.WrapError(err, "Cannot post job result")
}

workflow.ResyncNodeRunsWithCommits(c, db, h.store, p, report)
workflow.ResyncNodeRunsWithCommits(db, h.store, p, report)
go workflow.SendEvent(db, p.Key, report)

return new(empty.Empty), nil
Expand Down
12 changes: 12 additions & 0 deletions engine/api/migrate/migrate_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func ToWorkflow(db gorp.SqlExecutor, store cache.Store, cdTree []sdk.CDPipeline,
}
newW.Root.Hooks = []sdk.WorkflowNodeHook{*h}

(&newW).Migrate(false)
if err := workflow.RenameNode(db, &newW); err != nil {
return nil, sdk.WrapError(err, "Unable to rename node")
}
(&newW).RetroMigrate()

if errW := workflow.Insert(db, store, &newW, proj, u); errW != nil {
return nil, sdk.WrapError(errW, "MigrateToWorkflow workflow.Insert>")
}
Expand All @@ -101,6 +107,12 @@ func ToWorkflow(db gorp.SqlExecutor, store cache.Store, cdTree []sdk.CDPipeline,
return nil, sdk.WrapError(errHr, "migratePipeline> Cannot register hook 2")
}
} else {
data := (&newW).Migrate(false)
newW.WorkflowData = &data
if err := workflow.RenameNode(db, &newW); err != nil {
return nil, sdk.WrapError(err, "Unable to rename node")
}
(&newW).RetroMigrate()
if errW := workflow.Insert(db, store, &newW, proj, u); errW != nil {
return nil, sdk.WrapError(errW, "MigrateToWorkflow workflow.Insert>")
}
Expand Down
106 changes: 106 additions & 0 deletions engine/api/migrate/migrate_workflow_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package migrate

import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/project"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

// MigrateToWorkflowData migrates all workflow from WorkflowNode to Node
func MigrateToWorkflowData(DBFunc func() *gorp.DbMap, store cache.Store) {
log.Info("Start migrate MigrateToWorkflowData")
defer func() {
log.Info("End migrate MigrateToWorkflowData")
}()

for {
db := DBFunc()
var IDs []int64
query := "SELECT id FROM workflow WHERE workflow_data IS NULL AND to_delete = false AND root_node_id is not null LIMIT 100"
if _, err := db.Select(&IDs, query); err != nil {
log.Error("MigrateToWorkflowData> Unable to select workflows id: %v", err)
return
}
if len(IDs) == 0 {
return
}

jobs := make(chan int64, 100)
results := make(chan int64, 100)

// 5 workers
for w := 1; w <= 5; w++ {
go migrationWorker(db, store, jobs, results)
}

for _, ID := range IDs {
jobs <- ID
}
close(jobs)
for a := 0; a < len(IDs); a++ {
<-results
}
}
}

func migrationWorker(db *gorp.DbMap, store cache.Store, jobs <-chan int64, results chan<- int64) {
for ID := range jobs {
if err := migrateWorkflowData(db, store, ID); err != nil {
log.Error("MigrateToWorkflowData> Unable to migrate workflow data %d: %v", ID, err)
}
results <- ID
}
}

func migrateWorkflowData(db *gorp.DbMap, store cache.Store, ID int64) error {
tx, err := db.Begin()
if err != nil {
return sdk.WrapError(err, "MigrateToWorkflowData> unable to start transaction")
}
defer tx.Rollback() // nolint

query := "SELECT id FROM workflow WHERE id=$1 FOR UPDATE NOWAIT"
if _, err := tx.Exec(query, ID); err != nil {
return nil
}

p, err := project.LoadProjectByWorkflowID(tx, store, nil, ID,
project.LoadOptions.WithPlatforms,
project.LoadOptions.WithPipelines,
project.LoadOptions.WithEnvironments,
project.LoadOptions.WithApplicationWithDeploymentStrategies)
if err != nil {
return sdk.WrapError(err, "migrateWorkflowData> Unable to load project from workflow %d", ID)
}

w, err := workflow.LoadByID(tx, store, p, ID, nil, workflow.LoadOptions{})
if err != nil {
return sdk.WrapError(err, "migrateWorkflowData> Unable to load workflow %d", ID)
}

if w.WorkflowData != nil {
return nil
}

data := w.Migrate(false)
w.WorkflowData = &data

if err := workflow.RenameNode(tx, w); err != nil {
return sdk.WrapError(err, "Unable to rename node")
}

if err := workflow.InsertWorkflowData(tx, w); err != nil {
return sdk.WrapError(err, "migrateWorkflowData> Unable to insert Workflow Data")
}

dbWorkflow := workflow.Workflow(*w)
if err := dbWorkflow.PostUpdate(tx); err != nil {
return sdk.WrapError(err, "migrateWorkflowData> Unable to update workflow %d", ID)
}

return tx.Commit()
}
Loading

0 comments on commit 6c94766

Please sign in to comment.