Skip to content

Commit

Permalink
Move state creation to harvester (#1515)
Browse files Browse the repository at this point in the history
The Registrar should only persist the state but should not have to fetch information from the file itself before writing the state. First this is not the responsibility of the registrar to have knowledge about the different harvester implementation, second the information of a file could already have changed when the registrar writes. With this PR the file state is decoupled from the Registrar.

Currently there is duplicated information in FileEvent. This will be cleaned up in a second step to keep the changes as minimal as possible per PR.

Related to #1022
  • Loading branch information
ruflin authored and tsg committed May 2, 2016
1 parent 328c99f commit 19eeeed
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 21 deletions.
17 changes: 9 additions & 8 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Registrar struct {
// Path to the Registry File
registryFile string
// Map with all file paths inside and the corresponding state
state map[string]*FileState
state map[string]FileState
stateMutex sync.Mutex

Channel chan []*FileEvent
Expand All @@ -38,7 +38,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {

func (r *Registrar) Init() error {
// Init state
r.state = map[string]*FileState{}
r.state = map[string]FileState{}
r.Channel = make(chan []*FileEvent, 1)

// Set to default in case it is not set
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *Registrar) processEvents(events []*FileEvent) {
continue
}

r.setState(event.Source, event.GetState())
r.setState(event.Source, event.FileState)
}
}

Expand All @@ -116,8 +116,8 @@ func (r *Registrar) Stop() {
// Note: don't block using waitGroup, cause this method is run by async signal handler
}

func (r *Registrar) GetFileState(path string) (*FileState, bool) {
state, exist := r.getState(path)
func (r *Registrar) GetFileState(path string) (FileState, bool) {
state, exist := r.getStateEntry(path)
return state, exist
}

Expand Down Expand Up @@ -200,14 +200,14 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)
return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state *FileState) {
func (r *Registrar) setState(path string, state FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state[path] = state
}

func (r *Registrar) getState(path string) (*FileState, bool) {
func (r *Registrar) getStateEntry(path string) (FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

Expand All @@ -220,8 +220,9 @@ func (r *Registrar) getStateCopy() map[string]FileState {
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)

for k, v := range r.state {
copy[k] = *v
copy[k] = v
}

return copy
Expand Down
11 changes: 10 additions & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (h *Harvester) Harvest() {
// By default the offset is set to 0, means no bytes read. This can be used to report the status
// of a harvester
func (h *Harvester) createEvent() *input.FileEvent {
return &input.FileEvent{
event := &input.FileEvent{
EventMetadata: h.Config.EventMetadata,
Source: h.Path,
InputType: h.Config.InputType,
Expand All @@ -117,6 +117,15 @@ func (h *Harvester) createEvent() *input.FileEvent {
Fileinfo: &h.Stat.Fileinfo,
JSONConfig: h.Config.JSON,
}

if h.Config.InputType != config.StdinInputType {
event.FileState = input.FileState{
Source: h.Path,
Offset: h.getOffset(),
FileStateOS: *input.GetOSFileState(&h.Stat.Fileinfo),
}
}
return event
}

// sendEvent sends event to the spooler channel
Expand Down
13 changes: 1 addition & 12 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,7 @@ type FileEvent struct {
JSONFields common.MapStr
JSONConfig *config.JSONConfig
Stat *FileStat
}

// GetState builds and returns the FileState object based on the Event info.
func (f *FileEvent) GetState() *FileState {

state := &FileState{
Source: f.Source,
Offset: f.Offset,
FileStateOS: *GetOSFileState(f.Fileinfo),
}

return state
FileState FileState
}

// mergeJSONFields writes the JSON fields in the event map,
Expand Down
6 changes: 6 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ def test_rotating_file_with_shutdown(self):
lambda: self.output_has(lines=1),
max_timeout=10)

# Wait a momemt to make sure registry is completely written
time.sleep(1)

data = self.get_registry()
assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"]

Expand All @@ -326,6 +329,9 @@ def test_rotating_file_with_shutdown(self):
lambda: self.output_has(lines=2),
max_timeout=10)

# Wait a momemt to make sure registry is completely written
time.sleep(1)

data = self.get_registry()

assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"]
Expand Down

0 comments on commit 19eeeed

Please sign in to comment.