Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iwf-401: update activity type name to find reset history eventID #515

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions integ/reset_by_state_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/reset"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

func TestResetByStateIdWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

//TODO: uncomment below when IWF-403 implementation is done.
//TODO cont.: Reset with state id & state execution id is broken for local activities.
//doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
//smallWaitForFastTest()
}
}

func TestResetByStateIdWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()

//TODO: uncomment below when IWF-403 implementation is done.
//TODO cont.: Reset with state id & state execution id is broken for local activities.
//doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
//smallWaitForFastTest()
}
}

func doTestResetByStatIdWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := reset.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
})
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := reset.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
wfInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("1"),
}
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: reset.WorkflowType,
WorkflowTimeoutSeconds: 100,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(reset.State1),
StateInput: wfInput,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE),
},
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for state1
SkipWaitUntil: iwfidl.PtrBool(true),
},
}
startResp, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

assertions := assert.New(t)

req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

history, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 5,
}, history, "reset test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
assertions.Equal("S2", resp2.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp2.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp2.GetResults()[0].CompletedStateOutput.GetData())

//reset workflow by state id
resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()),
WorkflowId: wfId,
ResetType: iwfidl.STATE_ID,
StateId: iwfidl.PtrString(reset.State2),
}).Execute()
panicAtHttpError(err, httpResp)

req3 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp3, httpResp, err := req3.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

resetHistory, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 10,
}, resetHistory, "reset test fail, %v", resetHistory)

assertions.Equal(iwfidl.COMPLETED, resp3.GetWorkflowStatus())
assertions.Equal(1, len(resp3.GetResults()))
assertions.Equal("S2", resp3.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp3.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp3.GetResults()[0].CompletedStateOutput.GetData())

//reset workflow by state execution id
reset2Req := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = reset2Req.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()),
WorkflowId: wfId,
ResetType: iwfidl.STATE_EXECUTION_ID,
StateExecutionId: iwfidl.PtrString(reset.State2 + "-4"),
}).Execute()
panicAtHttpError(err, httpResp)

req4 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp4, httpResp, err := req4.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

reset2History, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 12,
}, reset2History, "reset test fail, %v", reset2History)

assertions.Equal(iwfidl.COMPLETED, resp4.GetWorkflowStatus())
assertions.Equal(1, len(resp4.GetResults()))
assertions.Equal("S2", resp4.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp4.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp4.GetResults()[0].CompletedStateOutput.GetData())
}
121 changes: 121 additions & 0 deletions integ/workflow/reset/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package reset

import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
"strconv"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
* State1:
* - No WaitUntil
* - Execute moves to State2
* State2:
* - No WaitUntil
* - Execute loops through state2 5 times, then gracefully completes the workflow.
* This test is used for testing reset by state id and state execution id without WaitUntil
*/
const (
WorkflowType = "reset"
State1 = "S1"
State2 = "S2"
)

type handler struct {
invokeHistory map[string]int64
}

func NewHandler() *handler {
return &handler{
invokeHistory: make(map[string]int64),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
panic("No start call is expected.")
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
log.Println("start of decide")
var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)
context := req.GetContext()
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 {
panic("attempt and firstAttemptTimestamp should be greater than zero")
}

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
// go to S2
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateInput: req.StateInput,
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for 1st execution of state2
SkipWaitUntil: iwfidl.PtrBool(true),
},
},
},
},
})
return
} else if req.GetWorkflowStateId() == State2 {
input := req.GetStateInput()
i, _ := strconv.Atoi(input.GetData())
if i < 5 {
updatedInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(fmt.Sprintf("%v", i+1)),
}
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateInput: updatedInput,
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for all executions of state2 after the 1st execution.
SkipWaitUntil: iwfidl.PtrBool(true),
},
},
},
},
})
return
} else {
// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
StateInput: req.StateInput,
},
},
},
})
return
}
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, nil
}
3 changes: 2 additions & 1 deletion service/client/cadence/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ func getDecisionEventIDByStateOrStateExecutionId(
if e.GetEventType() == shared.EventTypeDecisionTaskCompleted {
decisionFinishID = e.GetEventId()
}
//TODO: Add check for local activity. (IWF-403)
if e.GetEventType() == shared.EventTypeActivityTaskScheduled {
typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName()
if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") {
if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") {
var backendType service.BackendType
var input service.StateStartActivityInput
err = converter.FromData(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input)
Expand Down
3 changes: 2 additions & 1 deletion service/client/temporal/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ func getDecisionEventIDByStateOrStateExecutionId(
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
decisionFinishID = e.GetEventId()
}
//TODO: Add check for local activity. (IWF-403)
if e.GetEventType() == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName()
if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") {
if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") {
var backendType service.BackendType
var input service.StateStartActivityInput
err = converter.FromPayloads(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input)
Expand Down
Loading