Skip to content

Commit

Permalink
[chore][pkg/stanza] Add file disambiguation test
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Feb 9, 2024
1 parent 2047ee9 commit fd884ec
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 36 deletions.
92 changes: 92 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,3 +1318,95 @@ func TestWindowsFilesClosedImmediately(t *testing.T) {
// On Windows, poll should close the file after reading it. We can test this by trying to move it.
require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed"))
}

func TestDelayedDisambiguation(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Two identical files, smaller than fingerprint size
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")

sameContent := "aaaaaaaaaaa"
filetest.WriteString(t, file1, sameContent+"\n")
filetest.WriteString(t, file2, sameContent+"\n")
operator.poll(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte(sameContent), token)
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
operator.wg.Wait()

// Append different data
newContent1 := "more content in file 1 only"
newContent2 := "different content in file 2"
filetest.WriteString(t, file1, newContent1+"\n")
filetest.WriteString(t, file2, newContent2+"\n")
operator.poll(context.Background())

var sameTokenOtherFile emittest.Call
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
} else {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
}
newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2)
}

func TestNoLostPartial(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Two same fingerprint file , and smaller than config size
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")

sameContent := "aaaaaaaaaaa"
filetest.WriteString(t, file1, sameContent+"\n")
filetest.WriteString(t, file2, sameContent+"\n")
operator.poll(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte(sameContent), token)
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
operator.wg.Wait()

newContent1 := "additional content in file 1 only"
filetest.WriteString(t, file1, newContent1+"\n")

var otherFileName string
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
otherFileName = filepath.Base(file2.Name())
} else {
otherFileName = filepath.Base(file1.Name())
}

var foundSameFromOtherFile, foundNewFromFileOne bool
require.Eventually(t, func() bool {
operator.poll(context.Background())
defer operator.wg.Wait()

token, attributes = sink.NextCall(t)
switch {
case string(token) == sameContent && attributes[attrs.LogFileName].(string) == otherFileName:
foundSameFromOtherFile = true
case string(token) == newContent1 && attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()):
foundNewFromFileOne = true
default:
t.Errorf("unexpected token from file %q: %s", filepath.Base(attributes[attrs.LogFileName].(string)), token)
}
return foundSameFromOtherFile && foundNewFromFileOne
}, time.Second, 100*time.Millisecond)
}
40 changes: 27 additions & 13 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type sinkCfg struct {

type SinkOpt func(*sinkCfg)

type call struct {
token []byte
attrs map[string]any
type Call struct {
Token []byte
Attrs map[string]any
}

type Sink struct {
emitChan chan *call
emitChan chan *Call
timeout time.Duration
emit.Callback
}
Expand All @@ -53,14 +53,14 @@ func NewSink(opts ...SinkOpt) *Sink {
for _, opt := range opts {
opt(cfg)
}
emitChan := make(chan *call, cfg.emitChanLen)
emitChan := make(chan *Call, cfg.emitChanLen)
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(_ context.Context, token []byte, attrs map[string]any) error {
copied := make([]byte, len(token))
copy(copied, token)
emitChan <- &call{copied, attrs}
emitChan <- &Call{copied, attrs}
return nil
},
}
Expand All @@ -76,7 +76,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
for i := 0; i < n; i++ {
select {
case call := <-s.emitChan:
emitChan = append(emitChan, call.token)
emitChan = append(emitChan, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil
Expand All @@ -88,7 +88,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
select {
case c := <-s.emitChan:
return c.token, c.attrs
return c.Token, c.Attrs
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil, nil
Expand All @@ -98,7 +98,7 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
func (s *Sink) ExpectToken(t *testing.T, expected []byte) {
select {
case call := <-s.emitChan:
assert.Equal(t, expected, call.token)
assert.Equal(t, expected, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
Expand All @@ -109,7 +109,7 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
actual = append(actual, call.token)
actual = append(actual, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return
Expand All @@ -121,21 +121,35 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) {
select {
case c := <-s.emitChan:
assert.Equal(t, expected, c.token)
assert.Equal(t, attrs, c.attrs)
assert.Equal(t, expected, c.Token)
assert.Equal(t, attrs, c.Attrs)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
}

func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) {
actual := make([]*Call, 0, len(expected))
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
actual = append(actual, call)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return
}
}
require.ElementsMatch(t, expected, actual)
}

func (s *Sink) ExpectNoCalls(t *testing.T) {
s.ExpectNoCallsUntil(t, 200*time.Millisecond)
}

func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) {
select {
case c := <-s.emitChan:
assert.Fail(t, "Received unexpected message", "Message: %s", c.token)
assert.Fail(t, "Received unexpected message", "Message: %s", c.Token)
case <-time.After(d):
}
}
75 changes: 52 additions & 23 deletions pkg/stanza/fileconsumer/internal/emittest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestNextToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.token, token)
assert.Equal(t, c.Token, token)
}
}

func TestNextTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.token, token)
assert.Equal(t, c.Token, token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -38,17 +38,17 @@ func TestNextTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].token, tokens[0])
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
}
}

func TestNextTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].token, tokens[0])
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -61,17 +61,17 @@ func TestNextCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.token, token)
require.Equal(t, c.attrs, attributes)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
}
}

func TestNextCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.token, token)
require.Equal(t, c.attrs, attributes)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -83,14 +83,14 @@ func TestNextCallTimeout(t *testing.T) {
func TestExpectToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectToken(t, c.token)
s.ExpectToken(t, c.Token)
}
}

func TestExpectTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectToken(t, c.token)
s.ExpectToken(t, c.Token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -102,14 +102,14 @@ func TestExpectTokenTimeout(t *testing.T) {
func TestExpectTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
}
}

func TestExpectTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -121,14 +121,14 @@ func TestExpectTokensTimeout(t *testing.T) {
func TestExpectCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectCall(t, c.token, c.attrs)
s.ExpectCall(t, c.Token, c.Attrs)
}
}

func TestExpectCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectCall(t, c.token, c.attrs)
s.ExpectCall(t, c.Token, c.Attrs)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -137,6 +137,35 @@ func TestExpectCallTimeout(t *testing.T) {
assert.True(t, tt.Failed())
}

func TestExpectCalls(t *testing.T) {
s, testCalls := sinkTest(t)
testCallsOutOfOrder := make([]*Call, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
for i := 1; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
s.ExpectCalls(t, testCallsOutOfOrder...)
}

func TestExpectCallsTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
testCallsOutOfOrder := make([]*Call, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
for i := 1; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
s.ExpectCalls(t, testCallsOutOfOrder...)

// Create a new T so we can expect it to fail without failing the overall test.
tt := new(testing.T)
s.ExpectCalls(tt, new(Call))
assert.True(t, tt.Failed())
}

func TestExpectNoCalls(t *testing.T) {
s, _ := sinkTest(t)
s.NextTokens(t, 10) // drain the channel
Expand All @@ -156,24 +185,24 @@ func TestExpectNoCallsFailure(t *testing.T) {
func TestWithCallBuffer(t *testing.T) {
s, testCalls := sinkTest(t, WithCallBuffer(5))
for i := 0; i < 10; i++ {
s.ExpectCall(t, testCalls[i].token, testCalls[i].attrs)
s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs)
}
}

func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*call) {
func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
s := NewSink(opts...)
testCalls := make([]*call, 0, 10)
testCalls := make([]*Call, 0, 10)
for i := 0; i < 10; i++ {
testCalls = append(testCalls, &call{
token: []byte(fmt.Sprintf("token-%d", i)),
attrs: map[string]any{
testCalls = append(testCalls, &Call{
Token: []byte(fmt.Sprintf("token-%d", i)),
Attrs: map[string]any{
"key": fmt.Sprintf("value-%d", i),
},
})
}
go func() {
for _, c := range testCalls {
require.NoError(t, s.Callback(context.Background(), c.token, c.attrs))
require.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs))
}
}()
return s, testCalls
Expand Down

0 comments on commit fd884ec

Please sign in to comment.