Skip to content

Commit

Permalink
fix: unable to sync directories to work nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 1, 2024
1 parent 68307d2 commit 023ba27
Show file tree
Hide file tree
Showing 40 changed files with 765 additions and 1,537 deletions.
149 changes: 0 additions & 149 deletions core/apps/server.go

This file was deleted.

29 changes: 0 additions & 29 deletions core/apps/server_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions core/apps/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func NewServerV2() (app NodeApp) {
// node service
var err error
if utils.IsMaster() {
svr.nodeSvc, err = service.NewMasterServiceV2()
svr.nodeSvc, err = service.GetMasterServiceV2()
} else {
svr.nodeSvc, err = service.NewWorkerServiceV2()
svr.nodeSvc, err = service.GetWorkerServiceV2()
}
if err != nil {
panic(err)
Expand Down
3 changes: 1 addition & 2 deletions core/apps/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ var injectors = []interface{}{
modelsclient.NewEnvironmentServiceDelegate,
grpcclient.NewClient,
grpcclient.NewPool,
grpcserver.GetServer,
grpcserver.NewModelDelegateServer,
grpcserver.NewModelBaseServiceServer,
grpcserver.NewNodeServer,
Expand All @@ -75,7 +74,7 @@ var injectors = []interface{}{
schedule.GetScheduleService,
admin.GetSpiderAdminService,
stats.GetStatsService,
nodeconfig.NewNodeConfigService,
nodeconfig.GetNodeConfigService,
taskstats.GetTaskStatsService,
color.NewService,
scheduler.GetTaskSchedulerService,
Expand Down
1 change: 1 addition & 0 deletions core/grpc/server/model_base_service_v2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
*new(models.ScheduleV2),
*new(models.SettingV2),
*new(models.SpiderV2),
*new(models.SpiderStatV2),
*new(models.TaskQueueItemV2),
*new(models.TaskStatV2),
*new(models.TaskV2),
Expand Down
5 changes: 1 addition & 4 deletions core/grpc/server/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ func NewNodeServer() (res *NodeServer, err error) {
if err != nil {
return nil, err
}
svr.cfgSvc, err = nodeconfig.NewNodeConfigService()
if err != nil {
return nil, err
}
svr.cfgSvc = nodeconfig.GetNodeConfigService()

return svr, nil
}
95 changes: 41 additions & 54 deletions core/grpc/server/node_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package server

import (
"context"
"encoding/json"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/constants"
"github.com/crawlab-team/crawlab/core/entity"
Expand All @@ -16,6 +15,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"sync"
"time"
)

Expand All @@ -30,78 +30,61 @@ type NodeServerV2 struct {
}

// Register from handler/worker to master
func (svr NodeServerV2) Register(ctx context.Context, req *grpc.Request) (res *grpc.Response, err error) {
func (svr NodeServerV2) Register(ctx context.Context, req *grpc.NodeServiceRegisterRequest) (res *grpc.Response, err error) {
// unmarshall data
var node models.NodeV2
if req.Data != nil {
if err := json.Unmarshal(req.Data, &node); err != nil {
return HandleError(err)
}

if node.IsMaster {
// error: cannot register master node
return HandleError(errors.ErrorGrpcNotAllowed)
}
if req.IsMaster {
// error: cannot register master node
return HandleError(errors.ErrorGrpcNotAllowed)
}

// node key
var nodeKey string
if req.NodeKey != "" {
nodeKey = req.NodeKey
} else {
nodeKey = node.Key
}
if nodeKey == "" {
if req.Key == "" {
return HandleError(errors.ErrorModelMissingRequiredData)
}

// find in db
nodeDb, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": nodeKey}, nil)
var node *models.NodeV2
node, err = service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": req.Key}, nil)
if err == nil {
if node.IsMaster {
// error: cannot register master node
return HandleError(errors.ErrorGrpcNotAllowed)
} else {
// register existing
nodeDb.Status = constants.NodeStatusRegistered
nodeDb.Active = true
err = service.NewModelServiceV2[models.NodeV2]().ReplaceById(nodeDb.Id, *nodeDb)
if err != nil {
return HandleError(err)
}
log.Infof("[NodeServerV2] updated worker[%s] in db. id: %s", nodeKey, nodeDb.Id.Hex())
}
} else if errors2.Is(err, mongo.ErrNoDocuments) {
// register new
node.Key = nodeKey
// register existing
node.Status = constants.NodeStatusRegistered
node.Active = true
node.ActiveAt = time.Now()
node.Enabled = true
if node.Name == "" {
node.Name = nodeKey
err = service.NewModelServiceV2[models.NodeV2]().ReplaceById(node.Id, *node)
if err != nil {
return HandleError(err)
}
log.Infof("[NodeServerV2] updated worker[%s] in db. id: %s", req.Key, node.Id.Hex())
} else if errors2.Is(err, mongo.ErrNoDocuments) {
// register new
node = &models.NodeV2{
Key: req.Key,
Status: constants.NodeStatusRegistered,
Active: true,
ActiveAt: time.Now(),
Enabled: true,
}
node.SetCreated(primitive.NilObjectID)
node.SetUpdated(primitive.NilObjectID)
node.Id, err = service.NewModelServiceV2[models.NodeV2]().InsertOne(node)
node.Id, err = service.NewModelServiceV2[models.NodeV2]().InsertOne(*node)
if err != nil {
return HandleError(err)
}
log.Infof("[NodeServerV2] added worker[%s] in db. id: %s", nodeKey, node.Id.Hex())
log.Infof("[NodeServerV2] added worker[%s] in db. id: %s", req.Key, node.Id.Hex())
} else {
// error
return HandleError(err)
}

log.Infof("[NodeServerV2] master registered worker[%s]", req.GetNodeKey())
log.Infof("[NodeServerV2] master registered worker[%s]", req.Key)

return HandleSuccessWithData(node)
}

// SendHeartbeat from worker to master
func (svr NodeServerV2) SendHeartbeat(ctx context.Context, req *grpc.Request) (res *grpc.Response, err error) {
func (svr NodeServerV2) SendHeartbeat(ctx context.Context, req *grpc.NodeServiceSendHeartbeatRequest) (res *grpc.Response, err error) {
// find in db
node, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": req.NodeKey}, nil)
node, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": req.Key}, nil)
if err != nil {
if errors2.Is(err, mongo.ErrNoDocuments) {
return HandleError(errors.ErrorNodeNotExists)
Expand All @@ -126,11 +109,6 @@ func (svr NodeServerV2) SendHeartbeat(ctx context.Context, req *grpc.Request) (r
return HandleSuccessWithData(node)
}

// Ping from worker to master
func (svr NodeServerV2) Ping(ctx context.Context, req *grpc.Request) (res *grpc.Response, err error) {
return HandleSuccess()
}

func (svr NodeServerV2) Subscribe(request *grpc.Request, stream grpc.NodeService_SubscribeServer) (err error) {
log.Infof("[NodeServerV2] master received subscribe request from node[%s]", request.NodeKey)

Expand Down Expand Up @@ -177,13 +155,22 @@ func (svr NodeServerV2) Unsubscribe(ctx context.Context, req *grpc.Request) (res
}, nil
}

var nodeSvrV2 *NodeServerV2
var nodeSvrV2Once = new(sync.Once)

func NewNodeServerV2() (res *NodeServerV2, err error) {
// node server
svr := &NodeServerV2{}
svr.cfgSvc, err = nodeconfig.NewNodeConfigService()
if nodeSvrV2 != nil {
return nodeSvrV2, nil
}
nodeSvrV2Once.Do(func() {
nodeSvrV2 = &NodeServerV2{}
nodeSvrV2.cfgSvc = nodeconfig.GetNodeConfigService()
if err != nil {
log.Errorf("[NodeServerV2] error: %s", err.Error())
}
})
if err != nil {
return nil, err
}

return svr, nil
return nodeSvrV2, nil
}
Loading

0 comments on commit 023ba27

Please sign in to comment.