Skip to content

Commit

Permalink
Fix filebeat file rotation registrar issue #1281
Browse files Browse the repository at this point in the history
When multiple files were rotated at the same time, the first rotation was overwriting the states for the other rotated files. This happened because the newly rotated file immediately received new content, while the state for all the other files was fetched from disk, resetting the changes that were still in memory.

The problem is now fixed by not writing the state to disk every time state changes are discovered by the registrar, as it is up to the prospector to decide if these changes are the most recent ones. Instead, changes are written to disk every time a scan is done. The downside of this solution is that the registrar is also updated even if no changes happened.

* System test added to verify that issue is fixed
* Closes #1281
* Refactor some variables for better sync handling and fix tests
* Make offset private and only set it through method
* Rename system tests to close_older as this is what is actually checked
* Update registrar only when file was rotated or changed
* Add test to check state between shutdown and rotation. Only update registry when changes happen.
  • Loading branch information
ruflin committed Apr 18, 2016
1 parent 08204a2 commit 8bbc264
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Improvements in registrar dealing with file rotation. {pull}1281[1281]

*Winlogbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func makeEvents(name string, n int) []*input.FileEvent {
DocumentType: "log",
Bytes: 100,
Offset: int64(i),
Source: &name,
Source: name,
}
events = append(events, event)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c
go prospector.Run(&c.wg)
}

logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State))
logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getStateCopy()))

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Prospector struct {
}

// Prospectorer is the contract a concrete prospector implementation must
// fulfil: Init prepares the prospector before the crawler starts it, Run
// executes its scan loop. (The diff residue declared Run twice; an
// interface must list each method exactly once.)
type Prospectorer interface {
	Init()
	Run()
}

func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) {
Expand Down
12 changes: 11 additions & 1 deletion filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input

// Start a new harvester on the path
h.Start()
p.Prospector.registrar.Persist <- h.GetState()

}

// Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename
Expand All @@ -239,6 +241,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input
// Start a harvester on the path; a file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
p.resumeHarvesting(h, <-h.Stat.Return)
p.Prospector.registrar.Persist <- h.GetState()

} else {
logp.Debug("prospector", "Not harvesting, file didn't change: %s", h.Path)
}
Expand All @@ -252,14 +256,20 @@ func (p *ProspectorLog) continueExistingFile(h *harvester.Harvester, previousFil

lastinfo := p.prospectorList[previousFile]
h.Stat.Continue(&lastinfo)

// Update state because of file rotation
p.Prospector.registrar.Persist <- h.GetState()
}

// Start / resume harvester with a predefined offset
// resumeHarvesting starts (or resumes) a harvester on h.Path from a
// predefined offset and pushes the resulting state to the registrar.
func (p *ProspectorLog) resumeHarvesting(h *harvester.Harvester, offset int64) {

	logp.Debug("prospector", "Start / resuming harvester of file: %s", h.Path)

	// The offset must be in place before the harvester goroutine starts
	// reading; the old direct assignment to the now-private field is gone.
	h.SetOffset(offset)
	h.Start()

	// Update state because of file rotation
	p.Prospector.registrar.Persist <- h.GetState()
}

// Check if the given file was renamed. If file is known but with different path,
Expand Down
62 changes: 49 additions & 13 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -17,7 +18,8 @@ type Registrar struct {
// Path to the Registry File
registryFile string
// Map with all file paths inside and the corresponding state
State map[string]*FileState
state map[string]*FileState
stateMutex sync.Mutex
// Channel used by the prospector and crawler to send FileStates to be persisted
Persist chan *input.FileState

Expand All @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
func (r *Registrar) Init() error {
// Init state
r.Persist = make(chan *FileState)
r.State = make(map[string]*FileState)
r.state = map[string]*FileState{}
r.Channel = make(chan []*FileEvent, 1)

// Set to default in case it is not set
Expand All @@ -66,11 +68,13 @@ func (r *Registrar) Init() error {
// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is `registry` in the data path.
// LoadState fetches the previously persisted harvesting states from the
// registry file into the in-memory state map. A missing registry file is
// not an error: harvesting then simply starts with an empty state.
func (r *Registrar) LoadState() {
	r.stateMutex.Lock()
	defer r.stateMutex.Unlock()

	existing, err := os.Open(r.registryFile)
	if err != nil {
		// No registry file yet (e.g. first run) -> nothing to load.
		return
	}
	defer existing.Close()

	logp.Info("Loading registrar data from %s", r.registryFile)
	decoder := json.NewDecoder(existing)
	if err := decoder.Decode(&r.state); err != nil {
		// Previously the decode error was silently discarded; log it so a
		// corrupted registry file does not go unnoticed.
		logp.Err("Error decoding registrar data from %s: %v", r.registryFile, err)
	}
}

Expand All @@ -87,8 +91,9 @@ func (r *Registrar) Run() {
return
// Treats new log files to persist with higher priority then new events
case state := <-r.Persist:
r.State[*state.Source] = state
logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source)
source := state.Source
r.setState(source, state)
logp.Debug("prospector", "Registrar will re-save state for %s", source)
case events := <-r.Channel:
r.processEvents(events)
}
Expand All @@ -111,7 +116,7 @@ func (r *Registrar) processEvents(events []*FileEvent) {
continue
}

r.State[*event.Source] = event.GetState()
r.setState(event.Source, event.GetState())
}
}

Expand All @@ -122,7 +127,7 @@ func (r *Registrar) Stop() {
}

// GetFileState returns the stored state for the given file path and
// whether an entry for it exists. It delegates to the mutex-guarded
// getState accessor (the residue duplicate read of r.State is removed).
func (r *Registrar) GetFileState(path string) (*FileState, bool) {
	return r.getState(path)
}

Expand All @@ -138,12 +143,14 @@ func (r *Registrar) writeRegistry() error {
}

encoder := json.NewEncoder(file)
encoder.Encode(r.State)

state := r.getStateCopy()
encoder.Encode(state)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(r.State))
logp.Info("Registry file updated. %d states written.", len(state))

return SafeFileRotate(r.registryFile, tempfile)
}
Expand All @@ -157,7 +164,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Debug("registrar", "Same file as before found. Fetch the state and persist it.")
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
r.Persist <- lastState
return lastState.Offset, true
}

Expand All @@ -168,8 +174,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)

lastState, _ := r.GetFileState(previous)
lastState.Source = &filePath
r.Persist <- lastState
r.updateStateSource(lastState, filePath)
return lastState.Offset, true
}

Expand All @@ -189,7 +194,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

newState := input.GetOSFileState(&newFileInfo)

for oldFilePath, oldState := range r.State {
for oldFilePath, oldState := range r.getStateCopy() {

// Skipping when path the same
if oldFilePath == newFilePath {
Expand All @@ -205,3 +210,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

return "", fmt.Errorf("No previous file found")
}

// setState stores the state for the given path, guarded by stateMutex.
func (r *Registrar) setState(path string, state *FileState) {
	r.stateMutex.Lock()
	r.state[path] = state
	r.stateMutex.Unlock()
}

// getState looks up the state for the given path, guarded by stateMutex.
// The second return value reports whether an entry exists.
func (r *Registrar) getState(path string) (*FileState, bool) {
	r.stateMutex.Lock()
	fileState, found := r.state[path]
	r.stateMutex.Unlock()
	return fileState, found
}

// updateStateSource rewrites the source path of a state entry (used when a
// previously harvested file was renamed), guarded by stateMutex.
func (r *Registrar) updateStateSource(state *FileState, path string) {
	r.stateMutex.Lock()
	state.Source = path
	r.stateMutex.Unlock()
}

// getStateCopy returns a value copy of the complete state map so callers
// can iterate or serialize it without holding stateMutex and without
// racing against concurrent updates.
func (r *Registrar) getStateCopy() map[string]FileState {
	r.stateMutex.Lock()
	defer r.stateMutex.Unlock()

	// Pre-size the map to avoid rehashing; the old variable name shadowed
	// the builtin `copy`.
	states := make(map[string]FileState, len(r.state))
	for path, state := range r.state {
		states[path] = *state
	}

	return states
}
2 changes: 1 addition & 1 deletion filebeat/harvester/filestat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package harvester

import "os"

// Contains statistic about file when it was last seend by the prospector
// Contains statistic about file when it was last seen by the prospector
type FileStat struct {
Fileinfo os.FileInfo /* the file info */
Return chan int64 /* the harvester will send an event with its offset when it closes */
Expand Down
5 changes: 3 additions & 2 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package harvester
import (
"fmt"
"regexp"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -25,7 +26,8 @@ import (
type Harvester struct {
Path string /* the file path to harvest */
Config *config.HarvesterConfig
Offset int64
offset int64
offsetLock sync.Mutex
Stat *FileStat
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
Expand Down Expand Up @@ -67,6 +69,5 @@ func NewHarvester(

// Start launches the harvesting loop in its own goroutine.
func (h *Harvester) Start() {
// Starts harvester and picks the right type. In case type is not set, set it to default (log)

go h.Harvest()
}
2 changes: 1 addition & 1 deletion filebeat/harvester/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestExampleTest(t *testing.T) {

h := Harvester{
Path: "/var/log/",
Offset: 0,
offset: 0,
}

assert.Equal(t, "/var/log/", h.Path)
Expand Down
49 changes: 38 additions & 11 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (h *Harvester) Harvest() {
defer func() {
// On completion, push offset so we can continue where we left off if we relaunch on the same file
if h.Stat != nil {
h.Stat.Return <- h.Offset
h.Stat.Return <- h.GetOffset()
}

logp.Debug("harvester", "Stopping harvester for file: %s", h.Path)
Expand Down Expand Up @@ -123,8 +123,8 @@ func (h *Harvester) Harvest() {

logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)

h.Offset = 0
seeker.Seek(h.Offset, os.SEEK_SET)
h.SetOffset(0)
seeker.Seek(h.GetOffset(), os.SEEK_SET)
continue
}

Expand All @@ -136,10 +136,10 @@ func (h *Harvester) Harvest() {
event := &input.FileEvent{
EventMetadata: h.Config.EventMetadata,
ReadTime: ts,
Source: &h.Path,
Source: h.Path,
InputType: h.Config.InputType,
DocumentType: h.Config.DocumentType,
Offset: h.Offset,
Offset: h.GetOffset(),
Bytes: bytesRead,
Text: &text,
Fileinfo: &info,
Expand All @@ -151,7 +151,7 @@ func (h *Harvester) Harvest() {
}

// Set Offset
h.Offset += int64(bytesRead) // Update offset if complete line has been processed
h.SetOffset(h.GetOffset() + int64(bytesRead)) // Update offset if complete line has been processed
}
}

Expand Down Expand Up @@ -237,25 +237,52 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
func (h *Harvester) initFileOffset(file *os.File) error {
offset, err := file.Seek(0, os.SEEK_CUR)

if h.Offset > 0 {
if h.GetOffset() > 0 {
// continue from last known offset

logp.Debug("harvester",
"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset)
_, err = file.Seek(h.Offset, os.SEEK_SET)
"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.GetOffset(), offset)
_, err = file.Seek(h.GetOffset(), os.SEEK_SET)
} else if h.Config.TailFiles {
// tail file if file is new and tail_files config is set

logp.Debug("harvester",
"harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
h.Offset, err = file.Seek(0, os.SEEK_END)
offset, err = file.Seek(0, os.SEEK_END)
h.SetOffset(offset)

} else {
// get offset from file in case of encoding factory was
// required to read some data.
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
h.Offset = offset
h.SetOffset(offset)
}

return err
}

// GetState returns a snapshot of the harvester's current state: the file
// path, the current read offset and the OS-specific file identity.
func (h *Harvester) GetState() *input.FileState {
	return &input.FileState{
		Source:      h.Path,
		Offset:      h.GetOffset(),
		FileStateOS: input.GetOSFileState(&h.Stat.Fileinfo),
	}
}

// SetOffset updates the harvester's read offset under the offset lock.
func (h *Harvester) SetOffset(offset int64) {
	h.offsetLock.Lock()
	h.offset = offset
	h.offsetLock.Unlock()
}

// GetOffset returns the harvester's current read offset under the offset lock.
func (h *Harvester) GetOffset() int64 {
	h.offsetLock.Lock()
	offset := h.offset
	h.offsetLock.Unlock()
	return offset
}
6 changes: 3 additions & 3 deletions filebeat/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type File struct {
type FileEvent struct {
common.EventMetadata
ReadTime time.Time
Source *string
Source string
InputType string
DocumentType string
Offset int64
Expand All @@ -32,8 +32,8 @@ type FileEvent struct {
}

type FileState struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Source string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
FileStateOS *FileStateOS
}

Expand Down
Loading

0 comments on commit 8bbc264

Please sign in to comment.