Skip to content

Commit

Permalink
pref(runner): Batch writing of logs to reduce frequent open and close…
Browse files Browse the repository at this point in the history
… of files
  • Loading branch information
ma-pony committed Apr 5, 2023
1 parent c6356d3 commit 84bf0f0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
33 changes: 27 additions & 6 deletions task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Runner struct {
// log internals
scannerStdout *bufio.Scanner
scannerStderr *bufio.Scanner
logBatchSize int
}

func (r *Runner) Init() (err error) {
Expand Down Expand Up @@ -267,21 +268,40 @@ func (r *Runner) startLogging() {

func (r *Runner) startLoggingReaderStdout() {
utils.LogDebug("begin startLoggingReaderStdout")
var lines []string
for r.scannerStdout.Scan() {
line := r.scannerStdout.Text()
utils.LogDebug(fmt.Sprintf("scannerStdout line: %s", line))
r.writeLogLine(line)
lines = append(lines, line)
if len(lines) >= r.logBatchSize {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines))
lines = []string{}
}
}
if len(lines) > 0 {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines))

}
// reach end
utils.LogDebug("scannerStdout reached end")
}

func (r *Runner) startLoggingReaderStderr() {
utils.LogDebug("begin startLoggingReaderStderr")
var lines []string
for r.scannerStderr.Scan() {
line := r.scannerStderr.Text()
utils.LogDebug(fmt.Sprintf("scannerStderr line: %s", line))
r.writeLogLine(line)
lines = append(lines, line)
if len(lines) >= r.logBatchSize {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines))
lines = []string{}
}
}
if len(lines) > 0 {
r.writeLogLines(lines)
utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines))
}
// reach end
utils.LogDebug("scannerStderr reached end")
Expand Down Expand Up @@ -468,10 +488,10 @@ func (r *Runner) initSub() (err error) {
return nil
}

func (r *Runner) writeLogLine(line string) {
func (r *Runner) writeLogLines(lines []string) {
data, err := json.Marshal(&entity.StreamMessageTaskData{
TaskId: r.tid,
Logs: []string{line},
Logs: lines,
})
if err != nil {
trace.PrintError(err)
Expand Down Expand Up @@ -586,6 +606,7 @@ func NewTaskRunner(id primitive.ObjectID, svc interfaces.TaskHandlerService, opt
svc: svc,
tid: id,
ch: make(chan constants.TaskSignal),
logBatchSize: 20,
}

// apply options
Expand Down
18 changes: 12 additions & 6 deletions task/log/file_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -43,12 +44,18 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) {

d.mu.Lock()
defer d.mu.Unlock()
filePath := d.getLogFilePath(id, d.logFileName)

f, err := os.OpenFile(d.getLogFilePath(id, d.logFileName), os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.FileMode(0760))
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.FileMode(0760))
if err != nil {
return trace.TraceError(err)
}
defer f.Close()
defer func(f *os.File) {
err := f.Close()
if err != nil {
log.Errorf("close file error: %s", err.Error())
}
}(f)

_, err = f.WriteString(line + "\n")
if err != nil {
Expand All @@ -59,10 +66,9 @@ func (d *FileLogDriver) WriteLine(id string, line string) (err error) {
}

func (d *FileLogDriver) WriteLines(id string, lines []string) (err error) {
for _, l := range lines {
if err := d.WriteLine(id, l); err != nil {
return err
}
linesString := strings.Join(lines, "\n")
if err := d.WriteLine(id, linesString); err != nil {
return err
}
return nil
}
Expand Down

0 comments on commit 84bf0f0

Please sign in to comment.