Skip to content

Commit

Permalink
迁移日志到MongoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Apr 1, 2020
1 parent a9f6967 commit 63c95f8
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 77 deletions.
29 changes: 9 additions & 20 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,28 @@ func main() {
panic(err)
}
log.Info("initialized config successfully")

// 初始化日志设置
logLevel := viper.GetString("log.level")
if logLevel != "" {
log.SetLevelFromString(logLevel)
}
log.Info("initialized log config successfully")
if viper.GetString("log.isDeletePeriodically") == "Y" {
err := services.InitDeleteLogPeriodically()
if err != nil {
log.Error("init DeletePeriodically failed")
panic(err)
}
log.Info("initialized periodically cleaning log successfully")
} else {
log.Info("periodically cleaning log is switched off")
}

// 初始化Mongodb数据库
if err := database.InitMongo(); err != nil {
log.Error("init mongodb error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("initialized MongoDB successfully")
log.Info("initialized mongodb successfully")

// 初始化Redis数据库
if err := database.InitRedis(); err != nil {
log.Error("init redis error:" + err.Error())
debug.PrintStack()
panic(err)
}
log.Info("initialized Redis successfully")
log.Info("initialized redis successfully")

// 初始化日志设置
if err := services.InitLogService(); err != nil {
log.Error("init log error:" + err.Error())
panic(err)
}
log.Info("initialized log successfully")

if model.IsMaster() {
// 初始化定时任务
Expand Down
35 changes: 35 additions & 0 deletions backend/model/log.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
package model

import (
"crawlab/database"
"crawlab/utils"
"github.com/apex/log"
"github.com/globalsign/mgo/bson"
"os"
"runtime/debug"
"time"
)

type LogItem struct {
Id bson.ObjectId `json:"_id" bson:"_id"`
Message string `json:"msg" bson:"msg"`
TaskId string `json:"task_id" bson:"task_id"`
IsError bool `json:"is_error" bson:"is_error"`
Ts time.Time `json:"ts" bson:"ts"`
}

// 获取本地日志
func GetLocalLog(logPath string) (fileBytes []byte, err error) {

Expand Down Expand Up @@ -42,3 +53,27 @@ func GetLocalLog(logPath string) (fileBytes []byte, err error) {
logBuf = logBuf[:n]
return logBuf, nil
}

func AddLogItem(l LogItem) error {
s, c := database.GetCol("logs")
defer s.Close()
if err := c.Insert(l); err != nil {
log.Errorf("insert log error: " + err.Error())
debug.PrintStack()
return err
}
return nil
}

func GetLogItemList(filter interface{}, skip int, limit int, sortStr string) ([]LogItem, error) {
s, c := database.GetCol("logs")
defer s.Close()

var logItems []LogItem
if err := c.Find(filter).Skip(skip).Limit(limit).Sort(sortStr).All(&logItems); err != nil {
debug.PrintStack()
return logItems, err
}

return logItems, nil
}
13 changes: 13 additions & 0 deletions backend/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (t *Task) GetResults(pageNum int, pageSize int) (results []interface{}, tot
return
}

func (t *Task) GetLogItems() (logItems []LogItem, err error) {
query := bson.M{
"task_id": t.Id,
}

logItems, err = GetLogItemList(query, 0, constants.Infinite, "+_id")
if err != nil {
return logItems, err
}

return logItems, nil
}

func GetTaskList(filter interface{}, skip int, limit int, sortKey string) ([]Task, error) {
s, c := database.GetCol("tasks")
defer s.Close()
Expand Down
4 changes: 2 additions & 2 deletions backend/routes/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func DeleteTask(c *gin.Context) {

func GetTaskLog(c *gin.Context) {
id := c.Param("id")
logStr, err := services.GetTaskLog(id)
logItems, err := services.GetTaskLog(id)
if err != nil {
HandleError(http.StatusInternalServerError, c, err)
return
}
HandleSuccessData(c, logStr)
HandleSuccessData(c, logItems)
}

func GetTaskResults(c *gin.Context) {
Expand Down
38 changes: 38 additions & 0 deletions backend/services/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crawlab/utils"
"encoding/json"
"github.com/apex/log"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/spf13/viper"
"io/ioutil"
Expand Down Expand Up @@ -162,5 +163,42 @@ func InitDeleteLogPeriodically() error {

c.Start()
return nil
}

func InitLogIndexes() error {
s, c := database.GetCol("logs")
defer s.Close()

_ = c.EnsureIndexKey("task_id")
_ = c.EnsureIndex(mgo.Index{
Key: []string{"$text:msg"},
})

return nil
}

func InitLogService() error {
logLevel := viper.GetString("log.level")
if logLevel != "" {
log.SetLevelFromString(logLevel)
}
log.Info("initialized log config successfully")
if viper.GetString("log.isDeletePeriodically") == "Y" {
if err := InitDeleteLogPeriodically(); err != nil {
log.Error("init DeletePeriodically failed")
return err
}
log.Info("initialized periodically cleaning log successfully")
} else {
log.Info("periodically cleaning log is switched off")
}

if model.IsMaster() {
if err := InitLogIndexes(); err != nil {
log.Errorf(err.Error())
return err
}
}

return nil
}
158 changes: 110 additions & 48 deletions backend/services/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package services

import (
"bufio"
"crawlab/constants"
"crawlab/database"
"crawlab/entity"
Expand Down Expand Up @@ -160,15 +161,70 @@ func SetEnv(cmd *exec.Cmd, envs []model.Env, task model.Task, spider model.Spide
return cmd
}

func SetLogConfig(cmd *exec.Cmd, path string) error {
fLog, err := os.Create(path)
func SetLogConfig(cmd *exec.Cmd, t model.Task) error {
//fLog, err := os.Create(path)
//if err != nil {
// log.Errorf("create task log file error: %s", path)
// debug.PrintStack()
// return err
//}
//cmd.Stdout = fLog
//cmd.Stderr = fLog

// get stdout reader
stdout, err := cmd.StdoutPipe()
readerStdout := bufio.NewReader(stdout)
if err != nil {
log.Errorf("create task log file error: %s", path)
log.Errorf("get stdout error: %s", err.Error())
debug.PrintStack()
return err
}
cmd.Stdout = fLog
cmd.Stderr = fLog

// get stderr reader
stderr, err := cmd.StderrPipe()
readerStderr := bufio.NewReader(stderr)
if err != nil {
log.Errorf("get stdout error: %s", err.Error())
debug.PrintStack()
return err
}

// read stdout
go func() {
for {
line, err := readerStdout.ReadString('\n')
if err != nil {
break
}
line = strings.Replace(line, "\n", "", -1)
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Message: line,
TaskId: t.Id,
IsError: false,
Ts: time.Now(),
})
}
}()

// read stderr
go func() {
for {
line, err := readerStderr.ReadString('\n')
line = strings.Replace(line, "\n", "", -1)
if err != nil {
break
}
_ = model.AddLogItem(model.LogItem{
Id: bson.NewObjectId(),
Message: line,
TaskId: t.Id,
IsError: true,
Ts: time.Now(),
})
}
}()

return nil
}

Expand Down Expand Up @@ -260,7 +316,7 @@ func ExecuteShellCmd(cmdStr string, cwd string, t model.Task, s model.Spider) (e
cmd.Dir = cwd

// 日志配置
if err := SetLogConfig(cmd, t.LogPath); err != nil {
if err := SetLogConfig(cmd, t); err != nil {
return err
}

Expand Down Expand Up @@ -566,54 +622,60 @@ func SpiderFileCheck(t model.Task, spider model.Spider) error {
return nil
}

func GetTaskLog(id string) (logStr string, err error) {
func GetTaskLog(id string) (logItems []model.LogItem, err error) {
task, err := model.GetTask(id)

if err != nil {
return
}

if IsMasterNode(task.NodeId.Hex()) {
if !utils.Exists(task.LogPath) {
fileDir, err := MakeLogDir(task)

if err != nil {
log.Errorf(err.Error())
}

fileP := GetLogFilePaths(fileDir, task)

// 获取日志文件路径
fLog, err := os.Create(fileP)
defer fLog.Close()
if err != nil {
log.Errorf("create task log file error: %s", fileP)
debug.PrintStack()
}
task.LogPath = fileP
if err := task.Save(); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}

}
// 若为主节点,获取本机日志
logBytes, err := model.GetLocalLog(task.LogPath)
if err != nil {
log.Errorf(err.Error())
logStr = err.Error()
} else {
logStr = utils.BytesToString(logBytes)
}
return logStr, err
}
// 若不为主节点,获取远端日志
logStr, err = GetRemoteLog(task)
logItems, err = task.GetLogItems()
if err != nil {
log.Errorf(err.Error())

}
return logStr, err
return logItems, err
}

return logItems, nil

//if IsMasterNode(task.NodeId.Hex()) {
// if !utils.Exists(task.LogPath) {
// fileDir, err := MakeLogDir(task)
//
// if err != nil {
// log.Errorf(err.Error())
// }
//
// fileP := GetLogFilePaths(fileDir, task)
//
// // 获取日志文件路径
// fLog, err := os.Create(fileP)
// defer fLog.Close()
// if err != nil {
// log.Errorf("create task log file error: %s", fileP)
// debug.PrintStack()
// }
// task.LogPath = fileP
// if err := task.Save(); err != nil {
// log.Errorf(err.Error())
// debug.PrintStack()
// }
//
// }
// // 若为主节点,获取本机日志
// logBytes, err := model.GetLocalLog(task.LogPath)
// if err != nil {
// log.Errorf(err.Error())
// logStr = err.Error()
// } else {
// logStr = utils.BytesToString(logBytes)
// }
// return logStr, err
//}
//// 若不为主节点,获取远端日志
//logStr, err = GetRemoteLog(task)
//if err != nil {
// log.Errorf(err.Error())
//
//}
//return logStr, err
}

func CancelTask(id string) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions devops/master/mongo-pv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
spec:
storageClassName: manual
capacity:
storage: 10Gi
storage: 3Gi
accessModes:
- ReadWriteOnce
hostPath:
Expand All @@ -25,4 +25,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storage: 3Gi
Loading

0 comments on commit 63c95f8

Please sign in to comment.