Skip to content

Commit

Permalink
feat: support notification for node
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 24, 2024
1 parent 923921c commit 7b1fa48
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 31 deletions.
11 changes: 11 additions & 0 deletions core/grpc/server/node_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
nodeconfig "github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/notification"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
errors2 "github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -93,6 +95,7 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe
}
return HandleError(err)
}
oldStatus := node.Status

// validate status
if node.Status == constants.NodeStatusUnregistered {
Expand All @@ -107,6 +110,14 @@ func (svr NodeServerV2) SendHeartbeat(_ context.Context, req *grpc.NodeServiceSe
if err != nil {
return HandleError(err)
}
newStatus := node.Status

// send notification if status changed
if utils.IsPro() {
if oldStatus != newStatus {
go notification.GetNotificationServiceV2().SendNodeNotification(node)
}
}

return HandleSuccessWithData(node)
}
Expand Down
4 changes: 4 additions & 0 deletions core/grpc/server/task_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (svr TaskServerV2) Fetch(ctx context.Context, request *grpc.Request) (respo
}

func (svr TaskServerV2) SendNotification(_ context.Context, request *grpc.TaskServiceSendNotificationRequest) (response *grpc.Response, err error) {
if !utils.IsPro() {
return nil, nil
}

// task id
taskId, err := primitive.ObjectIDFromHex(request.TaskId)
if err != nil {
Expand Down
28 changes: 22 additions & 6 deletions core/node/service/master_service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
models2 "github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/core/node/config"
"github.com/crawlab-team/crawlab/core/notification"
"github.com/crawlab-team/crawlab/core/schedule"
"github.com/crawlab-team/crawlab/core/system"
"github.com/crawlab-team/crawlab/core/task/handler"
Expand Down Expand Up @@ -207,31 +208,27 @@ func (svc *MasterServiceV2) monitor() (err error) {
wg.Add(len(workerNodes))
for _, n := range workerNodes {
go func(n *models2.NodeV2) {
defer wg.Done()

// subscribe
ok := svc.subscribeNode(n)
if !ok {
go svc.setWorkerNodeOffline(n)
wg.Done()
return
}

// ping client
ok = svc.pingNodeClient(n)
if !ok {
go svc.setWorkerNodeOffline(n)
wg.Done()
return
}

// update node available runners
if err := svc.updateNodeAvailableRunners(n); err != nil {
trace.PrintError(err)
wg.Done()
return
}

// done
wg.Done()
}(&n)
}

Expand Down Expand Up @@ -261,13 +258,24 @@ func (svc *MasterServiceV2) updateMasterNodeStatus() (err error) {
if err != nil {
return err
}
oldStatus := node.Status

node.Status = constants.NodeStatusOnline
node.Active = true
node.ActiveAt = time.Now()
newStatus := node.Status

err = service.NewModelServiceV2[models2.NodeV2]().ReplaceById(node.Id, *node)
if err != nil {
return err
}

if utils.IsPro() {
if oldStatus != newStatus {
go svc.sendNotification(node)
}
}

return nil
}

Expand All @@ -280,6 +288,7 @@ func (svc *MasterServiceV2) setWorkerNodeOffline(node *models2.NodeV2) {
if err != nil {
trace.PrintError(err)
}
svc.sendNotification(node)
}

func (svc *MasterServiceV2) subscribeNode(n *models2.NodeV2) (ok bool) {
Expand Down Expand Up @@ -316,6 +325,13 @@ func (svc *MasterServiceV2) updateNodeAvailableRunners(node *models2.NodeV2) (er
return nil
}

func (svc *MasterServiceV2) sendNotification(node *models2.NodeV2) {
if !utils.IsPro() {
return
}
go notification.GetNotificationServiceV2().SendNodeNotification(node)
}

func newMasterServiceV2() (res *MasterServiceV2, err error) {
// master service
svc := &MasterServiceV2{
Expand Down
2 changes: 1 addition & 1 deletion core/notification/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package notification

import "github.com/crawlab-team/crawlab/core/models/models/v2"

type VariableDataTask struct {
type VariableData struct {
Task *models.TaskV2 `json:"task"`
TaskStat *models.TaskStatV2 `json:"task_stat"`
Spider *models.SpiderV2 `json:"spider"`
Expand Down
78 changes: 54 additions & 24 deletions core/notification/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/crawlab-team/crawlab/core/entity"
"github.com/crawlab-team/crawlab/core/models/models/v2"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/trace"
"github.com/gomarkdown/markdown"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"regexp"
"strings"
Expand Down Expand Up @@ -64,35 +66,28 @@ func (svc *ServiceV2) SendIM(s *models.NotificationSettingV2, ch *models.Notific
}

func (svc *ServiceV2) getContent(s *models.NotificationSettingV2, ch *models.NotificationChannelV2, args ...any) (content string) {
switch s.TriggerTarget {
case constants.NotificationTriggerTargetTask:
vd := svc.getTaskVariableData(args...)
switch s.TemplateMode {
case constants.NotificationTemplateModeMarkdown:
variables := svc.parseTemplateVariables(s.TemplateMarkdown)
content = svc.getTaskContent(s.TemplateMarkdown, variables, vd)
if ch.Type == TypeMail {
content = svc.convertMarkdownToHtml(content)
}
return content
case constants.NotificationTemplateModeRichText:
template := s.TemplateRichText
if ch.Type == TypeIM {
template = s.TemplateMarkdown
}
variables := svc.parseTemplateVariables(template)
return svc.getTaskContent(template, variables, vd)
vd := svc.getVariableData(args...)
switch s.TemplateMode {
case constants.NotificationTemplateModeMarkdown:
variables := svc.parseTemplateVariables(s.TemplateMarkdown)
content = svc.geContentWithVariables(s.TemplateMarkdown, variables, vd)
if ch.Type == TypeMail {
content = svc.convertMarkdownToHtml(content)
}

case constants.NotificationTriggerTargetNode:
// TODO: implement

return content
case constants.NotificationTemplateModeRichText:
template := s.TemplateRichText
if ch.Type == TypeIM {
template = s.TemplateMarkdown
}
variables := svc.parseTemplateVariables(template)
return svc.geContentWithVariables(template, variables, vd)
}

return content
}

func (svc *ServiceV2) getTaskContent(template string, variables []entity.NotificationVariable, vd VariableDataTask) (content string) {
func (svc *ServiceV2) geContentWithVariables(template string, variables []entity.NotificationVariable, vd VariableData) (content string) {
content = template
for _, v := range variables {
switch v.Category {
Expand Down Expand Up @@ -192,6 +187,8 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Key)
case "name":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Name)
case "is_master":
content = strings.ReplaceAll(content, v.GetKey(), fmt.Sprintf("%t", vd.Node.IsMaster))
case "ip":
content = strings.ReplaceAll(content, v.GetKey(), vd.Node.Ip)
case "mac":
Expand Down Expand Up @@ -260,7 +257,7 @@ func (svc *ServiceV2) getTaskContent(template string, variables []entity.Notific
return content
}

func (svc *ServiceV2) getTaskVariableData(args ...any) (vd VariableDataTask) {
func (svc *ServiceV2) getVariableData(args ...any) (vd VariableData) {
for _, arg := range args {
switch arg.(type) {
case *models.TaskV2:
Expand Down Expand Up @@ -331,6 +328,39 @@ func (svc *ServiceV2) convertMarkdownToHtml(content string) (html string) {
return string(markdown.ToHTML([]byte(content), nil, nil))
}

func (svc *ServiceV2) SendNodeNotification(node *models.NodeV2) {
// arguments
var args []any
args = append(args, node)

// settings
settings, err := service.NewModelServiceV2[models.NotificationSettingV2]().GetMany(bson.M{
"enabled": true,
"trigger_target": constants.NotificationTriggerTargetNode,
}, nil)
if err != nil {
log.Errorf("get notification settings error: %v", err)
trace.PrintError(err)
return
}

for _, s := range settings {
// send notification
switch s.Trigger {
case constants.NotificationTriggerNodeStatusChange:
go svc.Send(&s, args...)
case constants.NotificationTriggerNodeOnline:
if node.Status == constants.NodeStatusOnline {
go svc.Send(&s, args...)
}
case constants.NotificationTriggerNodeOffline:
if node.Status == constants.NodeStatusOffline {
go svc.Send(&s, args...)
}
}
}
}

func newNotificationServiceV2() *ServiceV2 {
return &ServiceV2{}
}
Expand Down

0 comments on commit 7b1fa48

Please sign in to comment.