Reprocess results if there was a processing error
When a connection is reset or other
processing errors occur, the client is already
coded to retry submitting the results
automatically.

However, the server was marking the results as 'seen'
and would not reprocess them.

This change allows results to be reprocessed, but
only if there were errors processing them before.

Fixes #728

Signed-off-by: John Schnake <[email protected]>
johnSchnake committed Jun 5, 2019
1 parent 98d7d32 commit f51d88d
Showing 3 changed files with 1,814 additions and 18 deletions.
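
The idea behind the fix, boiled down to a minimal, self-contained sketch (not the Sonobuoy code itself): a result that has already been seen is rejected as a duplicate only if its previous processing attempt succeeded. The type, field names, and result ID below are simplified stand-ins for the ones in the diff.

package main

import (
	"fmt"
	"time"
)

// aggregator is a pared-down stand-in for the real Aggregator type.
type aggregator struct {
	results       map[string]bool      // result IDs the server has already seen
	failedResults map[string]time.Time // result IDs received but not processed cleanly
}

// shouldReject reports whether an incoming result should be turned away as a
// duplicate. A recorded processing failure clears the way for a client retry.
func (a *aggregator) shouldReject(id string) bool {
	_, seen := a.results[id]
	_, failed := a.failedResults[id]
	return seen && !failed
}

func main() {
	a := &aggregator{
		results:       map[string]bool{"node1/systemd_logs": true},
		failedResults: map[string]time.Time{"node1/systemd_logs": time.Now()},
	}
	// The first attempt was recorded but failed, so a retry is accepted.
	fmt.Println(a.shouldReject("node1/systemd_logs")) // prints false
}
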
61 changes: 51 additions & 10 deletions pkg/plugin/aggregation/aggregator.go
@@ -27,6 +27,7 @@ import (
"os"
"path"
"sync"
"time"

"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/heptio/sonobuoy/pkg/tarball"
@@ -35,7 +36,8 @@ import (
)

const (
gzipMimeType = "application/gzip"
gzipMimeType = "application/gzip"
defaultRetryWindow = 15 * time.Second
)

// Aggregator is responsible for taking results from an HTTP server (configured
@@ -45,18 +47,32 @@ const (
type Aggregator struct {
// OutputDir is the directory to write the node results
OutputDir string

// Results stores a map of check-in results the server has seen
Results map[string]*plugin.Result

// ExpectedResults stores a map of results the server should expect
ExpectedResults map[string]*plugin.ExpectedResult

// FailedResults is a map to track which plugin results were received
// but returned errors during processing. This enables us to retry results
// that failed to process if the client tries, as opposed to rejecting
// them as duplicates. Important if connection resets or network issues
// are common.
FailedResults map[string]time.Time

// resultEvents is a channel that is written to when results are seen
// by the server, so we can block until we're done.
resultEvents chan *plugin.Result

// resultsMutex prevents race conditions if two identical results
// come in at the same time.
resultsMutex sync.Mutex

// retryWindow is the duration which the server will continue to block during
// Wait() after a FailedResult has been reported, even if all expected results
// are accounted for. This prevents racing the client retries that may occur.
retryWindow time.Duration
}

// httpError is an internal error type which allows us to unify result processing
@@ -87,7 +103,9 @@ func NewAggregator(outputDir string, expected []plugin.ExpectedResult) *Aggregat
OutputDir: outputDir,
Results: make(map[string]*plugin.Result, len(expected)),
ExpectedResults: make(map[string]*plugin.ExpectedResult, len(expected)),
FailedResults: make(map[string]time.Time, len(expected)),
resultEvents: make(chan *plugin.Result, len(expected)),
retryWindow: defaultRetryWindow,
}

for i, expResult := range expected {
@@ -106,6 +124,22 @@ func (a *Aggregator) Wait(stop chan bool) {
return
}
}

// Give all clients a chance to retry failed requests.
for _, failedTime := range a.FailedResults {
remainingTime := retryWindowRemaining(failedTime, time.Now(), a.retryWindow)

// A sleep for 0 or < 0 returns immediately.
time.Sleep(remainingTime)
}
}

// retryWindowRemaining wraps the awkward-looking calculation to see the time between
// two events and subtract out a given duration. If the returned duration is 0 or negative,
// it means that the time between the first and second events is equal to or greater than
// the window's duration.
func retryWindowRemaining(first, second time.Time, window time.Duration) time.Duration {
return first.Add(window).Sub(second)
}

// isComplete returns true if all expected results have checked in.
@@ -150,21 +184,35 @@ func (a *Aggregator) processResult(result *plugin.Result) error {
}
}

// Don't allow duplicates
if a.isResultDuplicate(result) {
// Don't allow duplicates unless it failed to process fully.
isDup := a.isResultDuplicate(result)
_, hadErrs := a.FailedResults[resultID]
if isDup && !hadErrs {
return &httpError{
err: fmt.Errorf("result %v already received", resultID),
code: http.StatusConflict,
}
}

// Send an event that we got this result even if we get an error, so
// that Wait() doesn't hang forever on problems.
defer func() {
a.Results[result.ExpectedResultID()] = result
a.resultEvents <- result
}()

if err := a.handleResult(result); err != nil {
// Drop a breadcrumb so that we reconsider new results from this result.
a.FailedResults[result.ExpectedResultID()] = time.Now()
return &httpError{
err: fmt.Errorf("error handling result %v: %v", resultID, err),
code: http.StatusInternalServerError,
}
}

// Upon success, we no longer want to keep processing duplicate results.
delete(a.FailedResults, result.ExpectedResultID())

return nil
}

@@ -223,13 +271,6 @@ func (a *Aggregator) IngestResults(resultsCh <-chan *plugin.Result) {
// handleResult takes a given plugin Result and writes it out to the
// filesystem, signaling to the resultEvents channel when complete.
func (a *Aggregator) handleResult(result *plugin.Result) error {
// Send an event that we got this result even if we get an error, so
// that Wait() doesn't hang forever on problems.
defer func() {
a.Results[result.ExpectedResultID()] = result
a.resultEvents <- result
}()

if result.MimeType == gzipMimeType {
return a.handleArchiveResult(result)
}
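
As a quick sanity check on the retryWindowRemaining arithmetic above, the standalone sketch below (assuming the 15-second defaultRetryWindow) shows why Wait() blocks only as long as needed: a failure reported 10 seconds ago leaves 5 seconds of window to sleep through, while one reported 20 seconds ago yields a negative duration, which time.Sleep treats as "return immediately".

package main

import (
	"fmt"
	"time"
)

// Copied from the commit above: remaining window = (first + window) - second.
func retryWindowRemaining(first, second time.Time, window time.Duration) time.Duration {
	return first.Add(window).Sub(second)
}

func main() {
	window := 15 * time.Second
	now := time.Now()

	// Failure reported 10s ago: 5s of the window remain, so Wait() sleeps ~5s more.
	fmt.Println(retryWindowRemaining(now.Add(-10*time.Second), now, window))

	// Failure reported 20s ago: the window has already elapsed; the negative
	// result makes time.Sleep return immediately and Wait() does not block.
	fmt.Println(retryWindowRemaining(now.Add(-20*time.Second), now, window))
}
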
161 changes: 153 additions & 8 deletions pkg/plugin/aggregation/aggregator_test.go
@@ -18,12 +18,17 @@ package aggregation

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"testing"
"testing/iotest"
"time"

"github.com/heptio/sonobuoy/pkg/backplane/ca/authtest"
"github.com/heptio/sonobuoy/pkg/plugin"
@@ -33,7 +38,7 @@ import (

func TestAggregation(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
@@ -61,7 +66,7 @@ func TestAggregation(t *testing.T) {

func TestAggregation_noExtension(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
@@ -88,7 +93,7 @@ func TestAggregation_noExtension(t *testing.T) {

func TestAggregation_tarfile(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
{ResultType: "e2e"},
}

fileBytes := []byte("foo")
@@ -125,7 +130,7 @@ func TestAggregation_tarfile(t *testing.T) {

func TestAggregation_wrongnodes(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
@@ -147,8 +152,8 @@ func TestAggregation_wrongnodes(t *testing.T) {

func TestAggregation_duplicates(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
plugin.ExpectedResult{NodeName: "node12", ResultType: "systemd_logs"},
{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node12", ResultType: "systemd_logs"},
}
withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
URL, err := NodeResultURL(srv.URL, "node1", "systemd_logs")
@@ -174,9 +179,150 @@ func TestAggregation_duplicates(t *testing.T) {
})
}

func TestAggregation_duplicatesWithErrors(t *testing.T) {
// Setup aggregator with expected results and preload the test data/info
// that we want to transmit/compare against.
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatalf("Could not create temp directory: %v", err)
}
defer os.RemoveAll(dir)
outpath := filepath.Join(dir, "systemd_logs", "results", "node1")
testDataPath := "./testdata/fakeLogData.txt"
testinfo, err := os.Stat(testDataPath)
if err != nil {
t.Fatalf("Could not stat test file: %v", err)
}
testDataReader, err := os.Open(testDataPath)
if err != nil {
t.Fatalf("Could not open test data file: %v", err)
}
defer testDataReader.Close()

expected := []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "systemd_logs"},
{NodeName: "node12", ResultType: "systemd_logs"},
}
agg := NewAggregator(dir, expected)

// Send first result and force an error in processing.
errReader := iotest.TimeoutReader(testDataReader)
err = agg.processResult(&plugin.Result{Body: errReader, NodeName: "node1", ResultType: "systemd_logs"})
if err == nil {
t.Fatal("Expected error processing this due to reading error, instead got nil.")
}

// Confirm results are recorded but they are partial results.
realinfo, err := os.Stat(outpath)
if err != nil {
t.Fatalf("Could not stat output file: %v", err)
}
if realinfo.Size() == testinfo.Size() {
t.Fatal("Expected truncated results for first result (simulating error), but got all the data.")
}

// Retry the result without an error this time.
_, err = testDataReader.Seek(0, 0)
if err != nil {
t.Fatalf("Could not rewind test data file: %v", err)
}
err = agg.processResult(&plugin.Result{Body: testDataReader, NodeName: "node1", ResultType: "systemd_logs"})
if err != nil {
t.Errorf("Expected no error processing this result, got %v", err)
}

// Confirm the new results overwrite the old ones.
realinfo, err = os.Stat(outpath)
if err != nil {
t.Fatalf("Could not stat output file: %v", err)
}
if realinfo.Size() != testinfo.Size() {
t.Errorf("Expected all the data to be transmitted. Expected data size %v but got %v.", testinfo.Size(), realinfo.Size())
}
}

// TestAggregation_RetryWindow ensures that the server Wait() method
// gives clients a chance to retry if their results were not processed correctly.
func TestAggregation_RetryWindow(t *testing.T) {
// Setup aggregator with expected results and preload the test data/info
// that we want to transmit/compare against.
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatalf("Could not create temp directory: %v", err)
}
defer os.RemoveAll(dir)
testRetryWindow := 1 * time.Second
testBufferDuration := 200 * time.Millisecond
expected := []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "systemd_logs"},
}

testCases := []struct {
desc string
postProcessSleep time.Duration
simulateErr bool
expectExtraWait time.Duration
}{
{
desc: "Error causes us to wait at least the retry window",
simulateErr: true,
expectExtraWait: testRetryWindow,
}, {
desc: "Retry window is sliding",
simulateErr: true,
postProcessSleep: 500 * time.Millisecond,
expectExtraWait: 500 * time.Millisecond,
}, {
desc: "Retry window can slide to 0",
simulateErr: true,
postProcessSleep: testRetryWindow,
expectExtraWait: 0,
}, {
desc: "No retry window without error",
simulateErr: false,
expectExtraWait: 0,
},
}

for _, tc := range testCases {
agg := NewAggregator(dir, expected)
// Shorten retry window for testing.
agg.retryWindow = testRetryWindow
testDataPath := "./testdata/fakeLogData.txt"
testDataReader, err := os.Open(testDataPath)
if err != nil {
t.Fatalf("Could not open test data file: %v", err)
}
defer testDataReader.Close()

var r io.Reader
if tc.simulateErr {
r = iotest.TimeoutReader(testDataReader)
} else {
r = strings.NewReader("foo")
}

err = agg.processResult(&plugin.Result{Body: r, NodeName: "node1", ResultType: "systemd_logs"})
if err == nil && tc.simulateErr {
t.Fatal("Expected error processing this due to reading error, instead got nil.")
}
// check time before/after wait and ensure it is greater than the retryWindow.
time.Sleep(tc.postProcessSleep)
start := time.Now()
agg.Wait(make(chan bool))
waitTime := time.Now().Sub(start)

// Add buffer to avoid raciness due to processing time.
diffTime := waitTime - tc.expectExtraWait
if diffTime > testBufferDuration || diffTime < -1*testBufferDuration {
t.Errorf("Expected Wait() to wait the duration (%v) due to failed result, instead waited only %v", agg.retryWindow, waitTime)
}
}
}

func TestAggregation_errors(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
{ResultType: "e2e"},
}

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
@@ -202,7 +348,6 @@ func withAggregator(t *testing.T, expected []plugin.ExpectedResult, callback fun
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatal("Could not create temp directory")
t.FailNow()
return
}
defer os.RemoveAll(dir)
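
The commit message relies on the client already retrying failed submissions. A rough sketch of that behavior is below; it is not the actual Sonobuoy worker code, and the URL, HTTP verb, attempt count, and back-off are hypothetical.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// submitWithRetry PUTs payload to url and retries on network errors or 5xx
// responses. With this commit, such a retry is reprocessed by the aggregator
// instead of being rejected as a duplicate.
func submitWithRetry(url string, payload []byte, attempts int) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(payload))
		if err != nil {
			return err
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			lastErr = err // e.g. connection reset; try again
		} else {
			resp.Body.Close()
			if resp.StatusCode < http.StatusInternalServerError {
				return nil // accepted, or a non-retryable client error
			}
			lastErr = fmt.Errorf("server returned %s", resp.Status)
		}
		time.Sleep(time.Second) // illustrative fixed back-off
	}
	return lastErr
}

func main() {
	// Hypothetical endpoint; the real URL is built by NodeResultURL in the tests above.
	err := submitWithRetry("https://aggregator.example/results/node1/systemd_logs", []byte("log data"), 3)
	fmt.Println(err)
}
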
