Skip to content

Commit

Permalink
add timeout to poll Docker stats
Browse files Browse the repository at this point in the history
  • Loading branch information
sharanyad committed Oct 6, 2020
1 parent 01e5a85 commit 3dea0a4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
48 changes: 33 additions & 15 deletions agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const (
maximumPullRetryDelay = 5 * time.Second
pullRetryDelayMultiplier = 2
pullRetryJitterMultiplier = 0.2

// pollStatsTimeout is the timeout for polling the Docker Stats API;
// it is kept the same as the streaming stats inactivity timeout.
pollStatsTimeout = 18 * time.Second
)

var ctxTimeoutStopContainer = dockerclient.StopContainerTimeout
Expand Down Expand Up @@ -1366,7 +1370,7 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
defer close(statsC)
// we need to start by getting container stats so that the task stats
// endpoint will be populated immediately.
stats, err := getContainerStatsNotStreamed(client, subCtx, id)
stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
if err != nil {
errC <- err
return
Expand All @@ -1381,7 +1385,7 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
statPollTicker := time.NewTicker(dg.config.PollingMetricsWaitDuration)
defer statPollTicker.Stop()
for range statPollTicker.C {
stats, err := getContainerStatsNotStreamed(client, subCtx, id)
stats, err := getContainerStatsNotStreamed(client, subCtx, id, pollStatsTimeout)
if err != nil {
errC <- err
return
Expand All @@ -1394,21 +1398,35 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
return statsC, errC
}

func getContainerStatsNotStreamed(client sdkclient.Client, ctx context.Context, id string) (*types.StatsJSON, error) {
resp, err := client.ContainerStats(ctx, id, false)
if err != nil {
return nil, fmt.Errorf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
func getContainerStatsNotStreamed(client sdkclient.Client, ctx context.Context, id string, timeout time.Duration) (*types.StatsJSON, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
type statsResponse struct {
stats types.ContainerStats
err error
}
defer resp.Body.Close()

decoder := json.NewDecoder(resp.Body)
stats := &types.StatsJSON{}
err = decoder.Decode(stats)
if err != nil {
return nil, fmt.Errorf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
response := make(chan statsResponse, 1)
go func() {
stats, err := client.ContainerStats(ctxWithTimeout, id, false)
response <- statsResponse{stats, err}
}()
select {
case resp := <-response:
decoder := json.NewDecoder(resp.stats.Body)
stats := &types.StatsJSON{}
err := decoder.Decode(stats)
if err != nil {
return nil, fmt.Errorf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
}
defer resp.stats.Body.Close()
return stats, nil
case <-ctxWithTimeout.Done():
err := ctxWithTimeout.Err()
if err == context.DeadlineExceeded {
return nil, fmt.Errorf("DockerGoClient: timed out retrieving stats for container %s", id)
}
return nil, fmt.Errorf("DockerGoClient: Unable to retrieve stats for container %s: %v", id, err)
}

return stats, nil
}

func (dg *dockerGoClient) RemoveImage(ctx context.Context, imageName string, timeout time.Duration) error {
Expand Down
16 changes: 16 additions & 0 deletions agent/dockerclient/dockerapi/docker_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,22 @@ func TestStatsInactivityTimeout(t *testing.T) {
assert.Error(t, <-errC)
}

// TestPollStatsTimeout checks that getContainerStatsNotStreamed returns an
// error when the Docker stats call does not complete within the poll timeout.
func TestPollStatsTimeout(t *testing.T) {
	const pollTimeout = time.Millisecond

	mockDockerSDK, _, _, _, _, done := dockerClientSetup(t)
	defer done()

	// blocker keeps the mocked ContainerStats call from returning until the
	// assertion below has run, so the timeout is what ends the poll.
	var blocker sync.WaitGroup
	blocker.Add(1)

	statsCall := mockDockerSDK.EXPECT().ContainerStats(gomock.Any(), gomock.Any(), false)
	statsCall.Do(func(_, _, _ interface{}) {
		blocker.Wait()
	}).MaxTimes(1).Return(types.ContainerStats{Body: mockStream{}}, nil)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	_, pollErr := getContainerStatsNotStreamed(mockDockerSDK, ctx, "", pollTimeout)
	assert.Error(t, pollErr)

	// Let the blocked mock call finish.
	blocker.Done()
}

func TestStatsInactivityTimeoutNoHit(t *testing.T) {
longInactivityTimeout := 500 * time.Millisecond
mockDockerSDK, client, _, _, _, done := dockerClientSetup(t)
Expand Down

0 comments on commit 3dea0a4

Please sign in to comment.