Skip to content

Commit

Permalink
fix: not return task ids
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Apr 8, 2023
1 parent d687e0e commit ed5e9f0
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 35 deletions.
5 changes: 3 additions & 2 deletions controllers/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func (ctx *spiderContext) run(c *gin.Context) {
}

// schedule
if err := ctx.adminSvc.Schedule(id, &opts); err != nil {
taskIds, err := ctx.adminSvc.Schedule(id, &opts)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

HandleSuccess(c)
HandleSuccessWithData(c, taskIds)
}

func (ctx *spiderContext) getGit(c *gin.Context) {
Expand Down
10 changes: 6 additions & 4 deletions controllers/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ func (ctx *taskContext) run(c *gin.Context) {
}

// run
if err := ctx.adminSvc.Schedule(s.GetId(), opts); err != nil {
taskIds, err := ctx.adminSvc.Schedule(s.GetId(), opts)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

HandleSuccess(c)
HandleSuccessWithData(c, taskIds)
}

func (ctx *taskContext) restart(c *gin.Context) {
Expand Down Expand Up @@ -157,12 +158,13 @@ func (ctx *taskContext) restart(c *gin.Context) {
}

// run
if err := ctx.adminSvc.Schedule(t.SpiderId, opts); err != nil {
taskIds, err := ctx.adminSvc.Schedule(t.SpiderId, opts)
if err != nil {
HandleErrorInternalServerError(c, err)
return
}

HandleSuccess(c)
HandleSuccessWithData(c, taskIds)
}

func (ctx *taskContext) cancel(c *gin.Context) {
Expand Down
2 changes: 1 addition & 1 deletion interfaces/spider_admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type SpiderAdminService interface {
WithConfigPath
Start() (err error)
// Schedule a new task of the spider
Schedule(id primitive.ObjectID, opts *SpiderRunOptions) (err error)
Schedule(id primitive.ObjectID, opts *SpiderRunOptions) (taskIds []primitive.ObjectID, err error)
// Clone the spider
Clone(id primitive.ObjectID, opts *SpiderCloneOptions) (err error)
// Delete the spider
Expand Down
2 changes: 1 addition & 1 deletion interfaces/task_scheduler_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type TaskSchedulerService interface {
TaskBaseService
// Enqueue task into the task queue
Enqueue(t Task) (err error)
Enqueue(t Task) (t2 Task, err error)
// Cancel task to corresponding node
Cancel(id primitive.ObjectID, args ...interface{}) (err error)
// SetInterval set the interval or duration between two adjacent fetches
Expand Down
2 changes: 1 addition & 1 deletion schedule/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (svc *Service) schedule(id primitive.ObjectID) (fn func()) {
}

// schedule or assign a task in the task queue
if err := svc.adminSvc.Schedule(s.GetSpiderId(), opts); err != nil {
if _, err := svc.adminSvc.Schedule(s.GetSpiderId(), opts); err != nil {
trace.PrintError(err)
}
}
Expand Down
30 changes: 15 additions & 15 deletions spider/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,15 @@ func (svc *Service) Start() (err error) {
return svc.SyncGit()
}

func (svc *Service) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (err error) {
func (svc *Service) Schedule(id primitive.ObjectID, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
// spider
s, err := svc.modelSvc.GetSpiderById(id)
if err != nil {
return err
return nil, err
}

// assign tasks
if err := svc.scheduleTasks(s, opts); err != nil {
return err
}

return nil
return svc.scheduleTasks(s, opts)
}

func (svc *Service) Clone(id primitive.ObjectID, opts *interfaces.SpiderCloneOptions) (err error) {
Expand All @@ -79,7 +75,7 @@ func (svc *Service) SyncGit() (err error) {
return nil
}

func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOptions) (err error) {
func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOptions) (taskIds []primitive.ObjectID, err error) {
// main task
mainTask := &models.Task{
SpiderId: s.Id,
Expand Down Expand Up @@ -118,7 +114,7 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp
//}
nodeIds, err := svc.getNodeIds(opts)
if err != nil {
return err
return nil, err
}
for _, nodeId := range nodeIds {
t := &models.Task{
Expand All @@ -133,25 +129,29 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp
Priority: opts.Priority,
UserId: opts.UserId,
}
if err := svc.schedulerSvc.Enqueue(t); err != nil {
return err
t2, err := svc.schedulerSvc.Enqueue(t)
if err != nil {
return nil, err
}
taskIds = append(taskIds, t2.GetId())
}
} else {
// single task
nodeIds, err := svc.getNodeIds(opts)
if err != nil {
return err
return nil, err
}
if len(nodeIds) > 0 {
mainTask.NodeId = nodeIds[0]
}
if err := svc.schedulerSvc.Enqueue(mainTask); err != nil {
return err
t2, err := svc.schedulerSvc.Enqueue(mainTask)
if err != nil {
return nil, err
}
taskIds = append(taskIds, t2.GetId())
}

return nil
return taskIds, nil
}

func (svc *Service) getNodeIds(opts *interfaces.SpiderRunOptions) (nodeIds []primitive.ObjectID, err error) {
Expand Down
4 changes: 2 additions & 2 deletions spider/test/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestAdminService_Run(t *testing.T) {

// TODO: implement
// run
err = T.adminSvc.Schedule(T.TestSpider.Id, &interfaces.SpiderRunOptions{
_, err = T.adminSvc.Schedule(T.TestSpider.Id, &interfaces.SpiderRunOptions{
Mode: constants.RunTypeRandom,
})
require.Nil(t, err)
Expand All @@ -25,5 +25,5 @@ func TestAdminService_Run(t *testing.T) {
task, err := T.modelSvc.GetTask(bson.M{"spider_id": T.TestSpider.Id}, nil)
require.Nil(t, err)
require.False(t, task.Id.IsZero())
require.Equal(t, constants.TaskStatusFinished, task.Status)
require.NotEqual(t, constants.TaskStatusPending, task.Status)
}
2 changes: 1 addition & 1 deletion task/fs/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestTaskFsService(t *testing.T) {
require.Nil(t, err)

t.Run("sync-to-workspace", func(t *testing.T) {
fsSvc, err := NewTaskFsService(task.Id)
fsSvc, err := NewTaskFsService(task.Id, spider.Id)
require.Nil(t, err)

err = fsSvc.GetFsService().SyncToWorkspace()
Expand Down
10 changes: 5 additions & 5 deletions task/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (svc *Service) Start() {
svc.Stop()
}

func (svc *Service) Enqueue(t interfaces.Task) (err error) {
func (svc *Service) Enqueue(t interfaces.Task) (t2 interfaces.Task, err error) {
// set task status
t.SetStatus(constants.TaskStatusPending)

Expand All @@ -60,7 +60,7 @@ func (svc *Service) Enqueue(t interfaces.Task) (err error) {

// add task
if err = delegate.NewModelDelegate(t, u).Add(); err != nil {
return err
return nil, err
}

// task queue item
Expand All @@ -79,17 +79,17 @@ func (svc *Service) Enqueue(t interfaces.Task) (err error) {
// enqueue task
_, err = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Insert(tq)
if err != nil {
return trace.TraceError(err)
return nil, trace.TraceError(err)
}

// add task stat
_, err = mongo.GetMongoCol(interfaces.ModelColNameTaskStat).Insert(ts)
if err != nil {
return trace.TraceError(err)
return nil, trace.TraceError(err)
}

// success
return nil
return t, nil
}

func (svc *Service) Cancel(id primitive.ObjectID, args ...interface{}) (err error) {
Expand Down
6 changes: 3 additions & 3 deletions task/test/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestHandlerService_Run(t *testing.T) {
T.Setup(t)

task := T.NewTask()
err = T.schedulerSvc.Enqueue(task)
_, err = T.schedulerSvc.Enqueue(task)
require.Nil(t, err)

err = T.handlerSvc.Run(task.GetId())
Expand All @@ -30,7 +30,7 @@ func TestHandlerService_Cancel(t *testing.T) {
T.Setup(t)

task := T.NewTaskLong()
err = T.schedulerSvc.Enqueue(task)
_, err = T.schedulerSvc.Enqueue(task)
require.Nil(t, err)
time.Sleep(1 * time.Second)

Expand All @@ -57,7 +57,7 @@ func TestHandlerService_ReportStatus(t *testing.T) {
T.Setup(t)

task := T.NewTaskLong()
err = T.schedulerSvc.Enqueue(task)
_, err = T.schedulerSvc.Enqueue(task)
require.Nil(t, err)
time.Sleep(1 * time.Second)

Expand Down

0 comments on commit ed5e9f0

Please sign in to comment.