Skip to content

Commit

Permalink
fix: task hung for long log content
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed May 20, 2023
1 parent e5b1fb5 commit 68a112d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 43 deletions.
56 changes: 19 additions & 37 deletions task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/apex/log"
"github.com/cenkalti/backoff/v4"
"github.com/crawlab-team/crawlab-core/constants"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/crawlab-team/crawlab-core/models/models"
"github.com/crawlab-team/crawlab-core/sys_exec"
"github.com/crawlab-team/crawlab-core/task/fs"
"github.com/crawlab-team/crawlab-core/utils"
"github.com/crawlab-team/crawlab-db/mongo"
grpc "github.com/crawlab-team/crawlab-grpc"
"github.com/crawlab-team/go-trace"
Expand All @@ -28,6 +26,7 @@ import (
"go.uber.org/dig"
"os"
"os/exec"
"strings"
"time"
)

Expand All @@ -38,6 +37,7 @@ type Runner struct {

// settings
subscribeTimeout time.Duration
bufferSize int

// internals
cmd *exec.Cmd // process command instance
Expand All @@ -53,8 +53,8 @@ type Runner struct {
sub grpc.TaskService_SubscribeClient // grpc task service stream client

// log internals
scannerStdout *bufio.Scanner
scannerStderr *bufio.Scanner
scannerStdout *bufio.Reader
scannerStderr *bufio.Reader
logBatchSize int
}

Expand Down Expand Up @@ -251,11 +251,11 @@ func (r *Runner) configureCmd() {
func (r *Runner) configureLogging() {
// set stdout reader
stdout, _ := r.cmd.StdoutPipe()
r.scannerStdout = bufio.NewScanner(stdout)
r.scannerStdout = bufio.NewReaderSize(stdout, r.bufferSize)

// set stderr reader
stderr, _ := r.cmd.StderrPipe()
r.scannerStderr = bufio.NewScanner(stderr)
r.scannerStderr = bufio.NewReaderSize(stderr, r.bufferSize)
}

func (r *Runner) startLogging() {
Expand All @@ -267,44 +267,25 @@ func (r *Runner) startLogging() {
}

func (r *Runner) startLoggingReaderStdout() {
utils.LogDebug("begin startLoggingReaderStdout")
var lines []string
for r.scannerStdout.Scan() {
line := r.scannerStdout.Text()
lines = append(lines, line)
if len(lines) >= r.logBatchSize {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines))
lines = []string{}
for {
line, err := r.scannerStdout.ReadString(byte('\n'))
if err != nil {
break
}
line = strings.TrimSuffix(line, "\n")
r.writeLogLines([]string{line})
}
if len(lines) > 0 {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines))

}
// reach end
utils.LogDebug("scannerStdout reached end")
}

func (r *Runner) startLoggingReaderStderr() {
utils.LogDebug("begin startLoggingReaderStderr")
var lines []string
for r.scannerStderr.Scan() {
line := r.scannerStderr.Text()
lines = append(lines, line)
if len(lines) >= r.logBatchSize {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines))
lines = []string{}
for {
line, err := r.scannerStderr.ReadString(byte('\n'))
if err != nil {
break
}
line = strings.TrimSuffix(line, "\n")
r.writeLogLines([]string{line})
}
if len(lines) > 0 {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines))
}
// reach end
utils.LogDebug("scannerStderr reached end")
}

func (r *Runner) startHealthCheck() {
Expand Down Expand Up @@ -603,6 +584,7 @@ func NewTaskRunner(id primitive.ObjectID, svc interfaces.TaskHandlerService, opt
// runner
r := &Runner{
subscribeTimeout: 30 * time.Second,
bufferSize: 1024 * 1024,
svc: svc,
tid: id,
ch: make(chan constants.TaskSignal),
Expand Down
14 changes: 8 additions & 6 deletions task/log/file_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li
}
defer f.Close()

sc := bufio.NewScanner(f)
sc := bufio.NewReaderSize(f, 1024*1024*10)

i := -1
for sc.Scan() {
for {
line, err := sc.ReadString(byte('\n'))
if err != nil {
break
}
line = strings.TrimSuffix(line, "\n")

i++

if i < skip {
Expand All @@ -102,12 +108,8 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li
break
}

line := sc.Text()
lines = append(lines, line)
}
if err := sc.Err(); err != nil {
return nil, trace.TraceError(err)
}

return lines, nil
}
Expand Down

0 comments on commit 68a112d

Please sign in to comment.