Skip to content

Commit

Permalink
fix(api,sql): nodeExits function (#3841)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and sguiheux committed Jan 11, 2019
1 parent 9548abb commit 6a0e575
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 32 deletions.
24 changes: 19 additions & 5 deletions engine/api/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ func TestPostAdminMigrationCancelHandler(t *testing.T) {
//Create admin user
u, pass := assets.InsertAdminUser(api.mustDB())

//Load all migration
uri := router.GetRoute("GET", api.getAdminMigrationsHandler, nil)
req, err := http.NewRequest("GET", uri, nil)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)

// Do the request
w := httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
assert.Equal(t, 200, w.Code)

var migrations []sdk.Migration
test.NoError(t, json.Unmarshal(w.Body.Bytes(), &migrations))
numberOfMigrations := len(migrations)

mig := sdk.Migration{
Name: "TestMigration",
Status: sdk.MigrationStatusInProgress,
Expand All @@ -31,21 +46,20 @@ func TestPostAdminMigrationCancelHandler(t *testing.T) {
_ = migrate.Delete(db, &mig)
}()

uri := router.GetRoute("GET", api.getAdminMigrationsHandler, nil)
req, err := http.NewRequest("GET", uri, nil)
uri = router.GetRoute("GET", api.getAdminMigrationsHandler, nil)
req, err = http.NewRequest("GET", uri, nil)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)

// Do the request
w := httptest.NewRecorder()
w = httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
assert.Equal(t, 200, w.Code)

var migrations []sdk.Migration
test.NoError(t, json.Unmarshal(w.Body.Bytes(), &migrations))

assert.NotNil(t, migrations)
assert.Equal(t, 1, len(migrations))
assert.Equal(t, numberOfMigrations+1, len(migrations))

//Prepare post request
uri = router.GetRoute("POST", api.postAdminMigrationCancelHandler, map[string]string{"id": fmt.Sprintf("%d", migrations[0].ID)})
Expand Down
2 changes: 1 addition & 1 deletion engine/api/platform/dao_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func LoadPlatformsByName(db gorp.SqlExecutor, key string, name string, clearPwd
WHERE project.projectkey = $1 AND project_platform.name = $2
`
if err := db.SelectOne(&pp, query, key, name); err != nil {
return sdk.ProjectPlatform{}, sdk.WrapError(err, "Cannot load platform")
return sdk.ProjectPlatform{}, sdk.WithStack(err)
}
p := sdk.ProjectPlatform(pp)
for k, v := range p.Config {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/project_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (api *API) putProjectPlatformHandler() service.Handler {

ppDB, errP := platform.LoadPlatformsByName(api.mustDB(), projectKey, platformName, true)
if errP != nil {
return sdk.WrapError(errP, "putProjectPlatformHandler> Cannot load project platform")
return sdk.WrapError(errP, "putProjectPlatformHandler> Cannot load platform %s for project %s", platformName, projectKey)
}

//If the platform model is public, it's forbidden to update the project platform
Expand Down
31 changes: 9 additions & 22 deletions engine/api/project_platform_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package api

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

Expand All @@ -28,40 +25,33 @@ func TestAddUpdateAndDeleteProjectPlatform(t *testing.T) {
models, _ = platform.LoadModels(db)
}

platformModel, err := platform.LoadModelByName(db, sdk.KafkaPlatform.Name, false)
test.NoError(t, err)

pp := sdk.ProjectPlatform{
Name: "kafkaTest",
Config: sdk.KafkaPlatform.DefaultConfig,
PlatformModelID: models[0].ID,
PlatformModelID: platformModel.ID,
}

// ADD project platform
jsonBody, _ := json.Marshal(pp)
body := bytes.NewBuffer(jsonBody)

vars := map[string]string{}
vars["permProjectKey"] = proj.Key
uri := router.GetRoute("POST", api.postProjectPlatformHandler, vars)
req, err := http.NewRequest("POST", uri, body)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)

req := assets.NewAuthentifiedRequest(t, u, pass, "POST", uri, pp)
w := httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
assert.Equal(t, 200, w.Code)

// UPDATE project platform
pp.Name = "kafkaTest2"
pp.ProjectID = proj.ID
jsonBody, _ = json.Marshal(pp)
body = bytes.NewBuffer(jsonBody)

vars = map[string]string{}
vars["permProjectKey"] = proj.Key
vars["platformName"] = "kafkaTest"
uri = router.GetRoute("PUT", api.putProjectPlatformHandler, vars)
req, err = http.NewRequest("PUT", uri, body)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)
req = assets.NewAuthentifiedRequest(t, u, pass, "PUT", uri, pp)

w = httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
Expand All @@ -72,9 +62,8 @@ func TestAddUpdateAndDeleteProjectPlatform(t *testing.T) {
vars["permProjectKey"] = proj.Key
vars["platformName"] = pp.Name
uri = router.GetRoute("GET", api.getProjectPlatformHandler, vars)
req, err = http.NewRequest("GET", uri, nil)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)

req = assets.NewAuthentifiedRequest(t, u, pass, "GET", uri, nil)

w = httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
Expand All @@ -85,9 +74,7 @@ func TestAddUpdateAndDeleteProjectPlatform(t *testing.T) {
vars["permProjectKey"] = proj.Key
vars["platformName"] = pp.Name
uri = router.GetRoute("DELETE", api.deleteProjectPlatformHandler, vars)
req, err = http.NewRequest("DELETE", uri, nil)
test.NoError(t, err)
assets.AuthentifyRequest(t, req, u, pass)
req = assets.NewAuthentifiedRequest(t, u, pass, "DELETE", uri, nil)

w = httptest.NewRecorder()
router.Mux.ServeHTTP(w, req)
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ func insertWorkflowNodeRun(db gorp.SqlExecutor, n *sdk.WorkflowNodeRun) error {
return nil
}

func nodeRunExist(db gorp.SqlExecutor, nodeID, num int64, subnumber int) (bool, error) {
nb, err := db.SelectInt("SELECT COUNT(1) FROM workflow_node_run WHERE workflow_node_id = $1 AND num = $2 AND sub_num = $3", nodeID, num, subnumber)
func nodeRunExist(db gorp.SqlExecutor, workflowRunID, nodeID, num int64, subnumber int) (bool, error) {
nb, err := db.SelectInt("SELECT COUNT(1) FROM workflow_node_run WHERE workflow_run_id = $4 AND workflow_node_id = $1 AND num = $2 AND sub_num = $3", nodeID, num, subnumber, workflowRunID)
return nb > 0, err
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/process_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func processNodeTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.S

func processNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, n *sdk.Node, subNumber int, parentNodeRuns []*sdk.WorkflowNodeRun, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
report := new(ProcessorReport)
exist, errN := nodeRunExist(db, n.ID, wr.Number, subNumber)
exist, errN := nodeRunExist(db, wr.ID, n.ID, wr.Number, subNumber)
if errN != nil {
return nil, false, sdk.WrapError(errN, "processNodeRun> unable to check if node run exist")
}
Expand Down
6 changes: 6 additions & 0 deletions engine/sql/151_workflow_node_run_idx.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- +migrate Up
DROP INDEX idx_workflow_node_run_subnum;
SELECT create_unique_index('workflow_node_run', 'idx_workflow_node_run_subnum', 'workflow_run_id,workflow_node_id, num, sub_num');

-- +migrate Down
SELECT 1;

0 comments on commit 6a0e575

Please sign in to comment.