Skip to content

Commit

Permalink
Migration to new state structure
Browse files Browse the repository at this point in the history
The previous state list was dependent on a file path. This had the potential to lead to overwrite and conflicts on file rotation. As the file path is also stored inside the state object this information was duplicated. Now a pure array is stored in the registry which makes the format more flexibel and brings it close to the format used by Logstash.

Changes:
* Not storing an index with paths as this leaded to duplicates
* Introduction of last_seen to differentiate between entries with same bath which show up multiple times. This introduces also the base to clean up the registry file. Currently it is only used in the prospector, no registry yet.
* Registry can now contain multiple entries for a path
* Refactor registrar to use the same state array as prospector does
* Introduce migration check to migrate from old to new state. Make it backward compatible by checking for old structure on startup
* Decouple registrar from prospector
* Update tests to follow new model
  • Loading branch information
ruflin committed May 24, 2016
1 parent 627c05e commit c824301
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d
*Topbeat*

*Filebeat*
- The registry format was changed to an array instead of dict. The migration to the new format will happen automatically at the first startup. {pull}1703[1703]

*Winlogbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Load the previous log file locations now, for use in prospector
fb.registrar.LoadState()
err = fb.registrar.LoadState()
if err != nil {
logp.Err("Error loading state: %v", err)
return err
}

// Init and Start spooler: Harvesters dump events into the spooler.
fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan)
Expand Down
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))

// Get existing states
states := *c.Registrar.state

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {

prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
prospector, err := NewProspector(prospectorConfig, states, eventChan)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand All @@ -60,10 +63,9 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu
logp.Debug("crawler", "Starting prospector %v", id)
prospector.Run()
}(i, p)

}

logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getState()))
logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count())

return nil
}
Expand Down
11 changes: 3 additions & 8 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Prospector struct {
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
registrar *Registrar
done chan struct{}
states *input.States
wg sync.WaitGroup
Expand All @@ -28,14 +27,13 @@ type Prospectorer interface {
Run()
}

func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) {
func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
prospector := &Prospector{
config: defaultConfig,
registrar: registrar,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
done: make(chan struct{}),
states: input.NewStates(),
states: &states,
wg: sync.WaitGroup{},
}

Expand All @@ -44,7 +42,6 @@ func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *i
}

err := prospector.Init()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -72,15 +69,13 @@ func (p *Prospector) Init() error {
switch p.config.Harvester.InputType {
case cfg.StdinInputType:
prospectorer, err = NewProspectorStdin(p)
prospectorer.Init()
case cfg.LogInputType:
prospectorer, err = NewProspectorLog(p)
prospectorer.Init()

default:
return fmt.Errorf("Invalid prospector type: %v", p.config.Harvester.InputType)
}

prospectorer.Init()
p.prospectorer = prospectorer

return nil
Expand Down
23 changes: 8 additions & 15 deletions filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,17 @@ func (p *ProspectorLog) Init() {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

logp.Info("Load previous states from registry into memory")
fileStates := p.Prospector.states.GetStates()

// Load the initial state from the registry
for path, fileinfo := range p.getFiles() {

// Check for each path found, if there is a previous state
offset := p.Prospector.registrar.fetchState(path, fileinfo)

// Offset found -> skip to previous state
if offset > 0 {
state := input.NewFileState(fileinfo, path)
state.Offset = offset
// Make sure new harvester is started for all states
state.Finished = true
// Prospector must update all states as it has to detect also file rotation
p.Prospector.states.Update(state)
}
// Make sure all states are set as finished
for key, state := range fileStates {
state.Finished = true
fileStates[key] = state
}

// Overwrite prospector states
p.Prospector.states.SetStates(fileStates)

logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestProspectorDefaultConfigs(t *testing.T) {

prospector, err := NewProspector(common.NewConfig(), nil, nil)
prospector, err := NewProspector(common.NewConfig(), *input.NewStates(), nil)
assert.NoError(t, err)

// Default values expected
Expand Down
177 changes: 87 additions & 90 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"sync"

"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
. "github.com/elastic/beats/filebeat/input"
Expand All @@ -17,9 +19,8 @@ import (
type Registrar struct {
Channel chan []*FileEvent
done chan struct{}
registryFile string // Path to the Registry File
state map[string]FileState // Map with all file paths inside and the corresponding state
stateMutex sync.Mutex
registryFile string // Path to the Registry File
state *input.States // Map with all file paths inside and the corresponding state
wg sync.WaitGroup
}

Expand All @@ -28,7 +29,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
state: map[string]FileState{},
state: input.NewStates(),
Channel: make(chan []*FileEvent, 1),
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -63,14 +64,84 @@ func (r *Registrar) Init() error {

// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is `registry` in the data path.
func (r *Registrar) LoadState() {
if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
func (r *Registrar) LoadState() error {

// Check if files exists
_, err := os.Stat(r.registryFile)
if err != nil && !os.IsNotExist(err) {
return err
}

// Error means no file found
if err != nil {
logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile)
return nil
}

file, err := os.Open(r.registryFile)
if err != nil {
return err
}

defer file.Close()

logp.Info("Loading registrar data from %s", r.registryFile)

// DEPRECATED: This should be removed in 6.0
oldStates := r.loadAndConvertOldState(file)
if oldStates {
return nil
}

decoder := json.NewDecoder(file)
states := []input.FileState{}
decoder.Decode(&states)

r.state.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

// loadAndConvertOldState loads the old state file and converts it to the new state
// This is designed so it can be easily removed in later versions
func (r *Registrar) loadAndConvertOldState(file *os.File) bool {
// Make sure file reader is reset afterwards
defer file.Seek(0, 0)

decoder := json.NewDecoder(file)
oldStates := map[string]FileState{}
err := decoder.Decode(&oldStates)

if err != nil {
logp.Debug("registrar", "Error decoding old state: %+v", err)
return false
}

// No old states found -> probably already new format
if oldStates == nil {
return false
}

logp.Info("Loading registrar data from %s", r.registryFile)
decoder := json.NewDecoder(existing)
decoder.Decode(&r.state)
// Convert old states to new states
states := make([]input.FileState, len(oldStates))
logp.Info("Old registry states found: %v", len(oldStates))
counter := 0
for _, state := range oldStates {
// Makes time last_seen time of migration, as this is the best guess
state.LastSeen = time.Now()
states[counter] = state
counter++
}

r.state.SetStates(states)

// Rewrite registry in new format
r.writeRegistry()

logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates))

return true
}

func (r *Registrar) Start() {
Expand Down Expand Up @@ -112,22 +183,17 @@ func (r *Registrar) processEventStates(events []*FileEvent) {
if event.InputType == cfg.StdinInputType {
continue
}

r.setState(event.Source, event.FileState)
r.state.Update(event.FileState)
}
}

// Stop stops the registry. It waits until Run function finished.
func (r *Registrar) Stop() {
logp.Info("Stopping Registrar")
close(r.done)
r.wg.Wait()
}

func (r *Registrar) GetFileState(path string) (FileState, bool) {
state, exist := r.getStateEntry(path)
return state, exist
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
logp.Debug("registrar", "Write registry file: %s", r.registryFile)
Expand All @@ -139,84 +205,15 @@ func (r *Registrar) writeRegistry() error {
return e
}

encoder := json.NewEncoder(file)
states := r.state.GetStates()

state := r.getState()
encoder.Encode(state)
encoder := json.NewEncoder(file)
encoder.Encode(states)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(state))
logp.Info("Registry file updated. %d states written.", len(states))

return SafeFileRotate(r.registryFile, tempfile)
}

func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 {

if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {

if previous != filePath {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
}

logp.Info("Previous state for file %s found", filePath)

lastState, _ := r.GetFileState(previous)
return lastState.Offset
}

logp.Info("New file. Start reading from the beginning: %s", filePath)

// New file so just start from the beginning
return 0
}

// getPreviousFile checks in the registrar if there is the newFile already exist with a different name
// In case an old file is found, the path to the file is returned, if not, an error is returned
func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) {

newState := input.GetOSFileState(newFileInfo)

for oldFilePath, oldState := range r.getState() {

// Compare states
if newState.IsSame(oldState.FileStateOS) {
logp.Info("Old file with new name found: %s -> %s", oldFilePath, newFilePath)
return oldFilePath, nil
}
}

return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state[path] = state
}

func (r *Registrar) getStateEntry(path string) (FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

state, exist := r.state[path]
return state, exist
}

func (r *Registrar) getState() map[string]FileState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)

for k, v := range r.state {
copy[k] = v
}

return copy
}
Loading

0 comments on commit c824301

Please sign in to comment.