Refactor state to extract from prospector #1650

Merged: 1 commit, May 20, 2016
83 changes: 16 additions & 67 deletions filebeat/crawler/prospector.go
@@ -13,15 +13,14 @@ import (
)

type Prospector struct {
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
registrar *Registrar
done chan struct{}
harvesterStates []input.FileState
stateMutex sync.Mutex
wg sync.WaitGroup
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
registrar *Registrar
done chan struct{}
states *input.States
wg sync.WaitGroup
}

type Prospectorer interface {
@@ -31,13 +30,13 @@ type Prospectorer interface {

func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) {
prospector := &Prospector{
config: defaultConfig,
registrar: registrar,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
done: make(chan struct{}),
harvesterStates: []input.FileState{},
wg: sync.WaitGroup{},
config: defaultConfig,
registrar: registrar,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
done: make(chan struct{}),
states: input.NewStates(),
wg: sync.WaitGroup{},
}

if err := cfg.Unpack(&prospector.config); err != nil {
@@ -109,7 +108,7 @@ func (p *Prospector) Run() {
logp.Info("Prospector channel stopped")
return
case p.spoolerChan <- event:
p.updateState(event.FileState)
p.states.Update(event.FileState)
}
}
}
@@ -130,56 +129,6 @@ func (p *Prospector) Run() {
}
}

func (p *Prospector) updateState(newState input.FileState) {

p.stateMutex.Lock()
defer p.stateMutex.Unlock()

index, oldState := p.findPreviousState(newState)

if index >= 0 {
p.harvesterStates[index] = newState
logp.Debug("prospector", "Old state overwritten for %s", oldState.Source)
} else {
// No existing state found, add new one
p.harvesterStates = append(p.harvesterStates, newState)
logp.Debug("prospector", "New state added for %s", newState.Source)
}
}

// findPreviousState returns the previous state of the file
// In case no previous state exists, index -1 is returned
func (p *Prospector) findPreviousState(newState input.FileState) (int, input.FileState) {

// TODO: This could potentially be made more performant by using an index (harvester id) and only using iteration as a fallback
for index, oldState := range p.harvesterStates {
// This is using the FileStateOS for comparison as FileInfo identifiers can only be fetched for existing files
if oldState.FileStateOS.IsSame(newState.FileStateOS) {
return index, oldState
}
}

return -1, input.FileState{}
}

// cleanupState cleans up the internal prospector state after each scan
// Files which reached ignore_older are removed from the state as these states are not needed anymore
func (p *Prospector) cleanupStates() {
p.stateMutex.Lock()
defer p.stateMutex.Unlock()

// Cleanup can only happen after file reaches ignore_older
if p.config.IgnoreOlder != 0 {
for i, state := range p.harvesterStates {
// File is older than ignore_older -> remove state
if p.isIgnoreOlder(state) {
logp.Debug("prospector", "State removed for %s because of ignore_older: %s", state.Source)
p.harvesterStates = append(p.harvesterStates[:i], p.harvesterStates[i+1:]...)
}
}
}
}

func (p *Prospector) Stop(wg *sync.WaitGroup) {
logp.Info("Stopping Prospector")
close(p.done)
14 changes: 7 additions & 7 deletions filebeat/crawler/prospector_log.go
@@ -44,18 +44,21 @@ func (p *ProspectorLog) Init() {
// Make sure new harvester is started for all states
state.Finished = true
// Prospector must update all states as it also has to detect file rotation
p.Prospector.updateState(state)
p.Prospector.states.Update(state)
}
}

logp.Info("Previous states loaded: %v", len(p.Prospector.harvesterStates))
logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
}

func (p *ProspectorLog) Run() {
logp.Debug("prospector", "Start next scan")

p.scan()
p.Prospector.cleanupStates()
// Only clean up states if enabled
if p.config.IgnoreOlder != 0 {
p.Prospector.states.Cleanup(p.config.IgnoreOlder)
Review comment:

so, by default we never clean up the state?

Contributor Author:

Yes, this PR does not change the behaviour. For changing it see #1600
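For illustration only (not part of this diff): a minimal standalone sketch of why the default leaves cleanup disabled. The prospectorSettings struct below is a hypothetical stand-in for prospectorConfig, whose IgnoreOlder field defaults to the zero Duration, so the new guard in Run() is never entered.

package main

import (
	"fmt"
	"time"
)

// prospectorSettings is a hypothetical stand-in for the real prospectorConfig.
type prospectorSettings struct {
	IgnoreOlder time.Duration
}

func main() {
	cfg := prospectorSettings{} // default: IgnoreOlder stays at the zero Duration

	// Mirrors the guard added in this diff: with the default config the
	// branch is skipped and states are never cleaned up.
	if cfg.IgnoreOlder != 0 {
		fmt.Println("cleanup enabled for files older than", cfg.IgnoreOlder)
	} else {
		fmt.Println("cleanup disabled: ignore_older not configured")
	}
}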

}
}

// getFiles returns all files which have to be harvested
@@ -115,11 +118,8 @@ func (p *ProspectorLog) scan() {
// Create new state for comparison
newState := input.NewFileState(fileinfo, file)

// TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance
p.Prospector.stateMutex.Lock()
// Load last state
index, lastState := p.Prospector.findPreviousState(newState)
p.Prospector.stateMutex.Unlock()
index, lastState := p.Prospector.states.FindPrevious(newState)

// Decides if previous state exists
if index == -1 {
4 changes: 4 additions & 0 deletions filebeat/crawler/prospector_test.go
@@ -8,6 +8,7 @@ import (

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)
@@ -40,6 +41,7 @@ func TestProspectorInitInputTypeLog(t *testing.T) {

prospector := Prospector{
config: prospectorConfig,
states: input.NewStates(),
}

err := prospector.Init()
@@ -91,6 +93,7 @@ func TestProspectorInitInputTypeWrong(t *testing.T) {

prospector := Prospector{
config: prospectorConfig,
states: input.NewStates(),
}

err := prospector.Init()
@@ -109,6 +112,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig,
states: input.NewStates(),
}

prospector.Init()
83 changes: 82 additions & 1 deletion filebeat/input/state.go
@@ -1,6 +1,12 @@
package input

import "os"
import (
"os"
"sync"
"time"

"github.com/elastic/beats/libbeat/logp"
)

// FileState is used to communicate the reading state of a file
type FileState struct {
@@ -20,3 +26,78 @@ func NewFileState(fileInfo os.FileInfo, path string) FileState {
FileStateOS: GetOSFileState(fileInfo),
}
}

// States handles a list of FileState entries
type States struct {
states []FileState
mutex sync.Mutex
}

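// NewStates creates a new, empty States instance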
func NewStates() *States {
return &States{
states: []FileState{},
}
}

// Update updates a state. If no previous state exists, a new one is created
func (s *States) Update(newState FileState) {
s.mutex.Lock()
defer s.mutex.Unlock()

index, oldState := s.findPrevious(newState)

if index >= 0 {
s.states[index] = newState
logp.Debug("prospector", "Old state overwritten for %s", oldState.Source)
} else {
// No existing state found, add new one
s.states = append(s.states, newState)
logp.Debug("prospector", "New state added for %s", newState.Source)
}
}

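// FindPrevious returns the previous state for the given new state.
// In case no previous state exists, index -1 is returned.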
func (s *States) FindPrevious(newState FileState) (int, FileState) {
// TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance
s.mutex.Lock()
defer s.mutex.Unlock()
return s.findPrevious(newState)
}

// findPrevious returns the previous state of the file
// In case no previous state exists, index -1 is returned
func (s *States) findPrevious(newState FileState) (int, FileState) {

// TODO: This could potentially be made more performant by using an index (harvester id) and only using iteration as a fallback
for index, oldState := range s.states {
// This is using the FileStateOS for comparison as FileInfo identifiers can only be fetched for existing files
if oldState.FileStateOS.IsSame(newState.FileStateOS) {
return index, oldState
}
}

return -1, FileState{}
}

// Cleanup cleans up the state array. All states which are older than `older` are removed
func (s *States) Cleanup(older time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()

// Iterate backwards so that removing an entry does not skip the element
// that shifts into its place.
for i := len(s.states) - 1; i >= 0; i-- {
state := s.states[i]
// File is older than `older` -> remove state
modTime := state.Fileinfo.ModTime()

if time.Since(modTime) > older {
logp.Debug("prospector", "State removed for %s because of ignore_older: %v", state.Source, older)
s.states = append(s.states[:i], s.states[i+1:]...)
}
}

}

// Count returns number of states
func (s *States) Count() int {
s.mutex.Lock()
defer s.mutex.Unlock()
return len(s.states)
}
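
For context, a minimal usage sketch of the States API added in this diff. Only NewStates, NewFileState, Update, FindPrevious, Cleanup and Count come from the change itself; the file path and the 24-hour duration are arbitrary examples, and the sketch assumes NewFileState records the path in Source, as the debug logging in Update suggests.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/elastic/beats/filebeat/input"
)

func main() {
	states := input.NewStates()

	// Any readable file works here; the path is only an example.
	fi, err := os.Stat("/var/log/syslog")
	if err != nil {
		fmt.Println("stat failed:", err)
		return
	}

	// Updating twice with the same underlying file keeps a single entry,
	// because findPrevious matches on FileStateOS.
	state := input.NewFileState(fi, "/var/log/syslog")
	states.Update(state)
	states.Update(state)
	fmt.Println("tracked states:", states.Count()) // prints 1

	// Look up the stored state for the same file.
	if index, previous := states.FindPrevious(state); index >= 0 {
		fmt.Println("previous state found for", previous.Source)
	}

	// Drop states whose file modification time is older than 24 hours,
	// mirroring the ignore_older guarded call in prospector_log.go.
	states.Cleanup(24 * time.Hour)
}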