Merge pull request #1200 from PapaPiya/fix_lag
[PDR-16094][fix(lag)]: lag is displayed incorrectly in dirx/dir mode
redHJ authored Jan 12, 2022
2 parents 6976061 + 698c81b commit 01200fd
Showing 3 changed files with 50 additions and 40 deletions.
12 changes: 12 additions & 0 deletions reader/dirx/dir_reader.go
@@ -465,6 +465,18 @@ func (drs *dirReaders) SyncMeta() ([]byte, error) {
return data, nil
}

func (drs *dirReaders) Lag() (rl *LagInfo, err error) {
rl = &LagInfo{Size: 0, SizeUnit: "bytes"}
for _, dr := range drs.getReaders() {
drLag, err := dr.br.Lag()
if err != nil {
return nil, err
}
rl.Size += drLag.Size
}
return rl, nil
}

func (drs *dirReaders) Close() {
var wg sync.WaitGroup
for _, dr := range drs.getReaders() {
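For orientation, here is a minimal, self-contained sketch of the aggregation pattern the new dirReaders.Lag follows: start from a zero LagInfo, add each sub-reader's byte lag, and fail fast on the first error. LagInfo mirrors the struct used above; the lagger interface and fixedLag stub are hypothetical stand-ins for the per-directory buffered reader (dr.br).

package main

import "fmt"

// LagInfo mirrors the struct used in the diff: a byte count plus its unit.
type LagInfo struct {
	Size     int64
	SizeUnit string
}

// lagger is a hypothetical stand-in for the per-directory buffered reader.
type lagger interface {
	Lag() (*LagInfo, error)
}

// fixedLag is a toy lagger that always reports the same byte lag.
type fixedLag int64

func (f fixedLag) Lag() (*LagInfo, error) {
	return &LagInfo{Size: int64(f), SizeUnit: "bytes"}, nil
}

// sumLag follows the same pattern as the new dirReaders.Lag: start at zero,
// add each sub-reader's lag, and return early on the first error.
func sumLag(readers []lagger) (*LagInfo, error) {
	total := &LagInfo{Size: 0, SizeUnit: "bytes"}
	for _, r := range readers {
		l, err := r.Lag()
		if err != nil {
			return nil, err
		}
		total.Size += l.Size
	}
	return total, nil
}

func main() {
	total, _ := sumLag([]lagger{fixedLag(100), fixedLag(250)})
	fmt.Printf("%d %s\n", total.Size, total.SizeUnit) // prints: 350 bytes
}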
4 changes: 4 additions & 0 deletions reader/dirx/dirx.go
@@ -422,6 +422,10 @@ func (r *Reader) ReadLine() (string, error) {
}
}

func (r *Reader) Lag() (rl *LagInfo, err error) {
return r.dirReaders.Lag()
}

func (r *Reader) Status() StatsInfo {
r.statsLock.RLock()
defer r.statsLock.RUnlock()
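The exported Reader.Lag above only delegates to dirReaders.Lag. As a hedged usage sketch, a caller assembling runner status might consume it as shown below; the lagReporter interface and stubReader are hypothetical and merely stand in for a real dirx Reader.

package main

import "fmt"

// LagInfo mirrors the struct returned by the exported Lag methods.
type LagInfo struct {
	Size     int64
	SizeUnit string
}

// lagReporter is a hypothetical interface satisfied by the dirx Reader once
// it exposes Lag; only this method matters for status reporting.
type lagReporter interface {
	Lag() (*LagInfo, error)
}

// stubReader stands in for a real dirx Reader in this sketch.
type stubReader struct{ lag int64 }

func (s stubReader) Lag() (*LagInfo, error) {
	return &LagInfo{Size: s.lag, SizeUnit: "bytes"}, nil
}

// reportLag fetches and prints the current lag; a real runner would call
// something like this periodically when collecting reader status.
func reportLag(r lagReporter) {
	info, err := r.Lag()
	if err != nil {
		fmt.Println("lag check failed:", err)
		return
	}
	fmt.Printf("reader lag: %d %s\n", info.Size, info.SizeUnit)
}

func main() {
	reportLag(stubReader{lag: 512}) // prints: reader lag: 512 bytes
}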
74 changes: 34 additions & 40 deletions reader/seqfile/seqfile.go
@@ -26,7 +26,7 @@ import (
// SeqFile is a Reader type that reads files in order of their last modification time
type SeqFile struct {
meta *reader.Meta
mux sync.Mutex
mux *sync.RWMutex

name string
dir string // file directory
@@ -109,7 +109,7 @@ func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine boo
ignoreFileSuffix: suffixes,
ignoreHidden: ignoreHidden,
validFilePattern: validFileRegex,
mux: sync.Mutex{},
mux: new(sync.RWMutex),
newFileAsNewLine: newFileNewLine,
meta: meta,
inodeOffset: make(map[string]int64),
@@ -499,13 +499,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
if len(sf.inodeOffset) < 1 {
return true
}
var key string
if sf.inodeSensitive {
key = reader.JoinFileInode(f.Name(), strconv.FormatUint(inode, 10))
} else {
key = filepath.Base(f.Name())
}
offset, ok := sf.inodeOffset[key]
offset, ok := sf.inodeOffset[getInodeKey(f.Name(), inode, sf.inodeSensitive)]
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
}

@@ -624,13 +618,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
if sf.inodeOffset == nil {
sf.inodeOffset = make(map[string]int64)
}
var key string
if sf.inodeSensitive {
key = reader.JoinFileInode(doneFile, strconv.FormatUint(doneFileInode, 10))
} else {
key = filepath.Base(doneFile)
}
sf.inodeOffset[key] = doneFileOffset
sf.inodeOffset[getInodeKey(doneFile, doneFileInode, sf.inodeSensitive)] = doneFileOffset
tryTime := 0
for {
err := sf.meta.SyncDoneFileInode(sf.inodeOffset)
@@ -682,19 +670,8 @@ func (sf *SeqFile) SyncMeta() (err error) {

func (sf *SeqFile) Lag() (rl *LagInfo, err error) {
sf.mux.Lock()
rl = &LagInfo{Size: -sf.offset, SizeUnit: "bytes"}
logReading := filepath.Base(sf.currFile)
sf.mux.Unlock()

inode, err := utilsos.GetIdentifyIDByPath(sf.currFile)
if os.IsNotExist(err) || (inode != 0 && inode != sf.inode) {
rl.Size = 0
err = nil
}
if err != nil {
rl.Size = 0
return rl, err
}
defer sf.mux.Unlock()
rl = &LagInfo{Size: 0, SizeUnit: "bytes"}

logs, err := ReadDirByTime(sf.dir)
if err != nil {
@@ -705,13 +682,28 @@ func (sf *SeqFile) Lag() (rl *LagInfo, err error) {
if l.IsDir() {
continue
}
if condition == nil || !condition(l) {

inode, err := utilsos.GetIdentifyIDByPath(filepath.Join(sf.dir, l.Name()))
if os.IsNotExist(err) || inode == 0 {
continue
}
rl.Size += l.Size()
if l.Name() == logReading {
break
if err != nil {
rl.Size = 0
return rl, err
}

if filepath.Base(sf.currFile) == l.Name() {
if sf.inodeSensitive && sf.inode != inode {
rl.Size += l.Size()
} else {
rl.Size += l.Size() - sf.offset
}
continue
}
if condition == nil || !condition(l) {
continue
}
rl.Size += l.Size() - sf.inodeOffset[getInodeKey(l.Name(), inode, sf.inodeSensitive)]
}

return rl, nil
@@ -750,13 +742,7 @@ func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, fileName, err)
return offset
}
var key string
if sf.inodeSensitive {
key = reader.JoinFileInode(fileName, strconv.FormatUint(inode, 10))
} else {
key = filepath.Base(fileName)
}
offset = sf.inodeOffset[key]
offset = sf.inodeOffset[getInodeKey(fileName, inode, sf.inodeSensitive)]
if fileInfo.Size() < offset {
offset = 0
}
@@ -769,6 +755,14 @@ func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
return offset
}

func getInodeKey(name string, inode uint64, inodeSensitive bool) string {
if inodeSensitive {
return reader.JoinFileInode(name, strconv.FormatUint(inode, 10))
} else {
return filepath.Base(name)
}
}

var (
_ LineSkipper = new(SeqFile)
_ reader.NewSourceRecorder = new(SeqFile)
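The rewritten SeqFile.Lag walks the directory once and accumulates per-file lag instead of deriving it from the current file alone. Below is a minimal sketch of that per-file arithmetic under simplified assumptions: plain file names replace getInodeKey as map keys, the file-matching condition check is omitted, and the SeqFile state (currFile, inode, offset, inodeOffset) is passed in as plain parameters.

package main

import "fmt"

// fileLag sketches the per-file arithmetic of the rewritten SeqFile.Lag.
// All parameters are hypothetical stand-ins for fields taken from os.FileInfo
// and from the SeqFile state.
func fileLag(name string, size, inode int64,
	currName string, currInode, currOffset int64,
	inodeSensitive bool, doneOffset map[string]int64) int64 {
	if name == currName {
		// File currently being read: if inode tracking is on and the inode
		// has changed, the whole file counts as unread; otherwise subtract
		// the current read offset.
		if inodeSensitive && inode != currInode {
			return size
		}
		return size - currOffset
	}
	// Any other file: subtract whatever offset was recorded for it
	// (zero if it has never been read).
	return size - doneOffset[name]
}

func main() {
	done := map[string]int64{"a.log": 40}
	lag := fileLag("a.log", 100, 7, "b.log", 9, 30, true, done) +
		fileLag("b.log", 200, 9, "b.log", 9, 30, true, done)
	fmt.Println(lag, "bytes") // (100-40) + (200-30) = 230 bytes
}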
