From 08d9f4f676118b1c3e01d9d07500d8cd6d84451a Mon Sep 17 00:00:00 2001 From: Marvin Zhang Date: Fri, 14 Jun 2024 19:24:09 +0800 Subject: [PATCH] feat: updated grpc proto --- core/go.mod | 2 +- core/grpc/server/dependencies_server_v2.go | 81 ++++++++++++++++--- .../server/model_base_service_v2_server.go | 3 + core/models/models/dependency_log_v2.go | 10 +++ core/models/models/dependency_task_v2.go | 15 ++++ core/models/models/dependency_v2.go | 14 ++++ 6 files changed, 111 insertions(+), 14 deletions(-) create mode 100644 core/models/models/dependency_log_v2.go create mode 100644 core/models/models/dependency_task_v2.go create mode 100644 core/models/models/dependency_v2.go diff --git a/core/go.mod b/core/go.mod index f4f15f3e5..98fc835d8 100644 --- a/core/go.mod +++ b/core/go.mod @@ -17,7 +17,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.0 github.com/crawlab-team/crawlab/db v0.0.0-20240614095218-7b4ee8399ab0 github.com/crawlab-team/crawlab/fs v0.0.0-20240614095218-7b4ee8399ab0 - github.com/crawlab-team/crawlab/grpc v0.0.0-20240614095218-7b4ee8399ab0 + github.com/crawlab-team/crawlab/grpc v0.0.0-20240614111723-e5b20af9a40b github.com/crawlab-team/crawlab/template-parser v0.0.0-20240614095218-7b4ee8399ab0 github.com/crawlab-team/crawlab/trace v0.0.0-20240614095218-7b4ee8399ab0 github.com/crawlab-team/crawlab/vcs v0.0.0-20240614095218-7b4ee8399ab0 diff --git a/core/grpc/server/dependencies_server_v2.go b/core/grpc/server/dependencies_server_v2.go index aca7e0086..b5b15c355 100644 --- a/core/grpc/server/dependencies_server_v2.go +++ b/core/grpc/server/dependencies_server_v2.go @@ -2,29 +2,84 @@ package server import ( "context" - grpc "github.com/crawlab-team/crawlab/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "github.com/crawlab-team/crawlab/core/models/models" + "github.com/crawlab-team/crawlab/core/models/service" + "github.com/crawlab-team/crawlab/grpc" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "io" + "sync" ) type DependenciesServerV2 struct { - grpc.UnimplementedDependencyServiceV2Server + grpc.UnimplementedDependenciesServiceV2Server + mu *sync.Mutex + streams map[string]grpc.DependenciesServiceV2_ConnectServer } -func (svr DependenciesServerV2) Connect(stream grpc.DependencyServiceV2_ConnectServer) (err error) { - return status.Errorf(codes.Unimplemented, "method Connect not implemented") +func (svr DependenciesServerV2) Connect(stream grpc.DependenciesServiceV2_ConnectServer) (err error) { + svr.mu.Lock() + defer svr.mu.Unlock() + req, err := stream.Recv() + if err != nil { + return err + } + svr.streams[req.NodeKey] = stream + return nil } func (svr DependenciesServerV2) Sync(ctx context.Context, request *grpc.DependenciesServiceV2SyncRequest) (response *grpc.Response, err error) { - return nil, status.Errorf(codes.Unimplemented, "method Sync not implemented") + n, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": request.NodeKey}, nil) + if err != nil { + return nil, err + } + var deps []models.DependencyV2 + for _, dep := range request.Dependencies { + deps = append(deps, models.DependencyV2{ + Name: dep.Name, + NodeId: n.Id, + Type: request.Lang, + Version: dep.Version, + }) + } + _, err = service.NewModelServiceV2[models.DependencyV2]().InsertMany(deps) + if err != nil { + return nil, err + } + return nil, nil } -func (svr DependenciesServerV2) Install(stream grpc.DependencyServiceV2_InstallServer) (err error) { - return status.Errorf(codes.Unimplemented, "method Install not implemented") -} - -func (svr DependenciesServerV2) UninstallDependencies(stream grpc.DependencyServiceV2_UninstallDependenciesServer) (err error) { - return status.Errorf(codes.Unimplemented, "method UninstallDependencies not implemented") +func (svr DependenciesServerV2) UpdateTaskLog(stream grpc.DependenciesServiceV2_UpdateTaskLogServer) (err error) { + var t *models.DependencyTaskV2 + for { + req, err := stream.Recv() + if err == io.EOF { + // all messages have been received + return stream.SendAndClose(&grpc.Response{Message: "update task log finished"}) + } + if err != nil { + return err + } + taskId, err := primitive.ObjectIDFromHex(req.TaskId) + if err != nil { + return err + } + if t == nil { + t, err = service.NewModelServiceV2[models.DependencyTaskV2]().GetById(taskId) + if err != nil { + return err + } + } + l := models.DependencyLogV2{ + TaskId: taskId, + Content: req.Content, + } + l.SetCreated(t.CreatedBy) + _, err = service.NewModelServiceV2[models.DependencyLogV2]().InsertOne(l) + if err != nil { + return err + } + } } func NewDependenciesServerV2() *DependenciesServerV2 { diff --git a/core/grpc/server/model_base_service_v2_server.go b/core/grpc/server/model_base_service_v2_server.go index 793856027..17771925d 100644 --- a/core/grpc/server/model_base_service_v2_server.go +++ b/core/grpc/server/model_base_service_v2_server.go @@ -19,7 +19,10 @@ var ( *new(models.TestModel), *new(models.DataCollectionV2), *new(models.DataSourceV2), + *new(models.DependencyV2), + *new(models.DependencyLogV2), *new(models.DependencySettingV2), + *new(models.DependencyTaskV2), *new(models.EnvironmentV2), *new(models.GitV2), *new(models.NodeV2), diff --git a/core/models/models/dependency_log_v2.go b/core/models/models/dependency_log_v2.go new file mode 100644 index 000000000..f0178c2e4 --- /dev/null +++ b/core/models/models/dependency_log_v2.go @@ -0,0 +1,10 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type DependencyLogV2 struct { + any `collection:"dependency_logs"` + BaseModelV2[DependencyLogV2] `bson:",inline"` + TaskId primitive.ObjectID `json:"task_id" bson:"task_id"` + Content string `json:"content" bson:"content"` +} diff --git a/core/models/models/dependency_task_v2.go b/core/models/models/dependency_task_v2.go new file mode 100644 index 000000000..c9fd40d66 --- /dev/null +++ b/core/models/models/dependency_task_v2.go @@ -0,0 +1,15 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type DependencyTaskV2 struct { + any `collection:"dependency_tasks"` + BaseModelV2[DependencyTaskV2] `bson:",inline"` + Status string `json:"status" bson:"status"` + Error string `json:"error" bson:"error"` + SettingId primitive.ObjectID `json:"setting_id" bson:"setting_id"` + Type string `json:"type" bson:"type"` + NodeId primitive.ObjectID `json:"node_id" bson:"node_id"` + Action string `json:"action" bson:"action"` + DepNames []string `json:"dep_names" bson:"dep_names"` +} diff --git a/core/models/models/dependency_v2.go b/core/models/models/dependency_v2.go new file mode 100644 index 000000000..4d3811d6f --- /dev/null +++ b/core/models/models/dependency_v2.go @@ -0,0 +1,14 @@ +package models + +import "go.mongodb.org/mongo-driver/bson/primitive" + +type DependencyV2 struct { + any `collection:"dependencies"` + BaseModelV2[DependencyV2] `bson:",inline"` + Name string `json:"name" bson:"name"` + Description string `json:"description" bson:"description"` + NodeId primitive.ObjectID `json:"node_id" bson:"node_id"` + Type string `json:"type" bson:"type"` + LatestVersion string `json:"latest_version" bson:"latest_version"` + Version string `json:"version" bson:"version"` +}