Skip to content

Commit

Permalink
Merge pull request #1849 from adnxn/dev-granular-timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
adnxn committed Feb 19, 2019
2 parents 2cf4734 + 3265bf9 commit f4da625
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 39 deletions.
6 changes: 4 additions & 2 deletions agent/acs/model/api/api-2.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@
"registryAuthentication":{"shape":"RegistryAuthenticationData"},
"logsAuthStrategy":{"shape":"AuthStrategy"},
"secrets":{"shape":"SecretList"},
"dependsOn":{"shape": "DependsOnList"}
"dependsOn":{"shape": "DependsOnList"},
"startTimeout":{"shape":"Integer"},
"stopTimeout":{"shape":"Integer"}
}
},
"ContainerList":{
Expand Down Expand Up @@ -613,4 +615,4 @@
]
}
}
}
}
4 changes: 4 additions & 0 deletions agent/acs/model/ecsacs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ type Container struct {

Secrets []*Secret `locationName:"secrets" type:"list"`

StartTimeout *int64 `locationName:"startTimeout" type:"integer"`

StopTimeout *int64 `locationName:"stopTimeout" type:"integer"`

VolumesFrom []*VolumeFrom `locationName:"volumesFrom" type:"list"`
}

Expand Down
20 changes: 20 additions & 0 deletions agent/api/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ type Container struct {
// LogsAuthStrategy specifies how the logs driver for the container will be
// authenticated
LogsAuthStrategy string
// StartTimeout specifies the time the agent waits for StartContainer api call
// before timing out
StartTimeout uint
// StopTimeout specifies the time value to be passed as StopContainer api call
StopTimeout uint

// lock is used for fields that are accessed and updated concurrently
lock sync.RWMutex

Expand Down Expand Up @@ -867,3 +873,17 @@ func (c *Container) HasSecretAsEnv() bool {
}
return false
}

func (c *Container) GetStartTimeout() time.Duration {
c.lock.Lock()
defer c.lock.Unlock()

return time.Duration(c.StartTimeout) * time.Second
}

func (c *Container) GetStopTimeout() time.Duration {
c.lock.Lock()
defer c.lock.Unlock()

return time.Duration(c.StopTimeout) * time.Second
}
17 changes: 15 additions & 2 deletions agent/api/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func TestMergeEnvironmentVariables(t *testing.T) {
},

{
Name: "merge single item to nil container env var map",
Name: "merge single item to nil container env var map",
InContainerEnvironment: nil,
InEnvVarMap: map[string]string{
"SECRET1": "secret1"},
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestMergeEnvironmentVariables(t *testing.T) {
},

{
Name: "merge nil to nil container env var map",
Name: "merge nil to nil container env var map",
InContainerEnvironment: nil,
InEnvVarMap: nil,
OutEnvVarMap: map[string]string{},
Expand Down Expand Up @@ -455,3 +455,16 @@ func TestHasSecretAsEnv(t *testing.T) {
}

}

func TestPerContainerTimeouts(t *testing.T) {
timeout := uint(10)
expectedTimeout := time.Duration(timeout) * time.Second

container := Container{
StartTimeout: timeout,
StopTimeout: timeout,
}

assert.Equal(t, container.GetStartTimeout(), expectedTimeout)
assert.Equal(t, container.GetStopTimeout(), expectedTimeout)
}
59 changes: 39 additions & 20 deletions agent/api/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2845,28 +2845,27 @@ func TestTaskGPUDisabled(t *testing.T) {
},
},
}

assert.False(t, testTask.isGPUEnabled())
}

func TestInitializeContainerOrderingWithLinksAndVolumesFrom(t *testing.T) {
containerWithOnlyVolume := &apicontainer.Container{
Name: "myName",
Image: "image:tag",
Name: "myName",
Image: "image:tag",
VolumesFrom: []apicontainer.VolumeFrom{{SourceContainer: "myName1"}},
}

containerWithOnlyLink := &apicontainer.Container{
Name: "myName1",
Image: "image:tag",
Links: []string{"myName"},
Links: []string{"myName"},
}

containerWithBothVolumeAndLink := &apicontainer.Container{
Name: "myName2",
Image: "image:tag",
Name: "myName2",
Image: "image:tag",
VolumesFrom: []apicontainer.VolumeFrom{{SourceContainer: "myName"}},
Links: []string{"myName1"},
Links: []string{"myName1"},
}

containerWithNoVolumeOrLink := &apicontainer.Container{
Expand All @@ -2877,8 +2876,8 @@ func TestInitializeContainerOrderingWithLinksAndVolumesFrom(t *testing.T) {
task := &Task{
Arn: "test",
ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource),
Containers: []*apicontainer.Container{containerWithOnlyVolume, containerWithOnlyLink,
containerWithBothVolumeAndLink, containerWithNoVolumeOrLink},
Containers: []*apicontainer.Container{containerWithOnlyVolume, containerWithOnlyLink,
containerWithBothVolumeAndLink, containerWithNoVolumeOrLink},
}

err := task.initializeContainerOrderingForVolumes()
Expand All @@ -2887,27 +2886,27 @@ func TestInitializeContainerOrderingWithLinksAndVolumesFrom(t *testing.T) {
assert.NoError(t, err)

containerResultWithVolume := task.Containers[0]
assert.Equal(t, "myName1", containerResultWithVolume.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingStartCondition, containerResultWithVolume.DependsOn[0].Condition)
assert.Equal(t, "myName1", containerResultWithVolume.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingStartCondition, containerResultWithVolume.DependsOn[0].Condition)

containerResultWithLink := task.Containers[1]
assert.Equal(t, "myName", containerResultWithLink.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingRunningCondition, containerResultWithLink.DependsOn[0].Condition)
assert.Equal(t, "myName", containerResultWithLink.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingRunningCondition, containerResultWithLink.DependsOn[0].Condition)

containerResultWithBothVolumeAndLink := task.Containers[2]
assert.Equal(t, "myName", containerResultWithBothVolumeAndLink.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingStartCondition, containerResultWithBothVolumeAndLink.DependsOn[0].Condition)
assert.Equal(t, "myName1", containerResultWithBothVolumeAndLink.DependsOn[1].Container)
assert.Equal(t, ContainerOrderingRunningCondition, containerResultWithBothVolumeAndLink.DependsOn[1].Condition)
assert.Equal(t, "myName", containerResultWithBothVolumeAndLink.DependsOn[0].Container)
assert.Equal(t, ContainerOrderingStartCondition, containerResultWithBothVolumeAndLink.DependsOn[0].Condition)
assert.Equal(t, "myName1", containerResultWithBothVolumeAndLink.DependsOn[1].Container)
assert.Equal(t, ContainerOrderingRunningCondition, containerResultWithBothVolumeAndLink.DependsOn[1].Condition)

containerResultWithNoVolumeOrLink := task.Containers[3]
assert.Equal(t, 0, len(containerResultWithNoVolumeOrLink.DependsOn))
assert.Equal(t, 0, len(containerResultWithNoVolumeOrLink.DependsOn))
}

func TestInitializeContainerOrderingWithError(t *testing.T) {
containerWithVolumeError := &apicontainer.Container{
Name: "myName",
Image: "image:tag",
Name: "myName",
Image: "image:tag",
VolumesFrom: []apicontainer.VolumeFrom{{SourceContainer: "dummyContainer"}},
}

Expand Down Expand Up @@ -2945,3 +2944,23 @@ func TestInitializeContainerOrderingWithError(t *testing.T) {
errLink2 := task2.initializeContainerOrderingForLinks()
assert.Error(t, errLink2)
}

func TestTaskFromACSPerContainerTimeouts(t *testing.T) {
modelTimeout := int64(10)
expectedTimeout := uint(modelTimeout)

taskFromACS := ecsacs.Task{
Containers: []*ecsacs.Container{
{
StartTimeout: aws.Int64(modelTimeout),
StopTimeout: aws.Int64(modelTimeout),
},
},
}
seqNum := int64(42)
task, err := TaskFromACS(&taskFromACS, &ecsacs.PayloadMessage{SeqNum: &seqNum})
assert.Nil(t, err, "Should be able to handle acs task")

assert.Equal(t, task.Containers[0].StartTimeout, expectedTimeout)
assert.Equal(t, task.Containers[0].StopTimeout, expectedTimeout)
}
22 changes: 13 additions & 9 deletions agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ const (
pullRetryJitterMultiplier = 0.2
)

var ctxTimeoutStopContainer = dockerclient.StopContainerTimeout

// DockerClient interface to make testing it easier
type DockerClient interface {
// SupportedVersions returns a slice of the supported docker versions (or at least supposedly supported).
Expand Down Expand Up @@ -550,8 +552,8 @@ func (dg *dockerGoClient) createContainer(ctx context.Context,
return dg.containerMetadata(ctx, dockerContainer.ID)
}

func (dg *dockerGoClient) StartContainer(ctx context.Context, id string, timeout time.Duration) DockerContainerMetadata {
ctx, cancel := context.WithTimeout(ctx, timeout)
func (dg *dockerGoClient) StartContainer(ctx context.Context, id string, ctxTimeout time.Duration) DockerContainerMetadata {
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()
defer metrics.MetricsEngineGlobal.RecordDockerMetric("START_CONTAINER")()
// Buffered channel so in the case of timeout it takes one write, never gets
Expand All @@ -566,7 +568,7 @@ func (dg *dockerGoClient) StartContainer(ctx context.Context, id string, timeout
// send back the DockerTimeoutError
err := ctx.Err()
if err == context.DeadlineExceeded {
return DockerContainerMetadata{Error: &DockerTimeoutError{timeout, "started"}}
return DockerContainerMetadata{Error: &DockerTimeoutError{ctxTimeout, "started"}}
}
return DockerContainerMetadata{Error: CannotStartContainerError{err}}
}
Expand Down Expand Up @@ -654,13 +656,16 @@ func (dg *dockerGoClient) inspectContainer(ctx context.Context, dockerID string)
}

func (dg *dockerGoClient) StopContainer(ctx context.Context, dockerID string, timeout time.Duration) DockerContainerMetadata {
ctx, cancel := context.WithTimeout(ctx, timeout)
// ctxTimeout is sum of timeout(applied to the StopContainer api call) and a fixed constant dockerclient.StopContainerTimeout
// the context's timeout should be greater than the sigkill timout for the StopContainer call
ctxTimeout := timeout + ctxTimeoutStopContainer
ctx, cancel := context.WithTimeout(ctx, ctxTimeout)
defer cancel()
defer metrics.MetricsEngineGlobal.RecordDockerMetric("STOP_CONTAINER")()
// Buffered channel so in the case of timeout it takes one write, never gets
// read, and can still be GC'd
response := make(chan DockerContainerMetadata, 1)
go func() { response <- dg.stopContainer(ctx, dockerID) }()
go func() { response <- dg.stopContainer(ctx, dockerID, timeout) }()
select {
case resp := <-response:
return resp
Expand All @@ -669,19 +674,18 @@ func (dg *dockerGoClient) StopContainer(ctx context.Context, dockerID string, ti
// send back the DockerTimeoutError
err := ctx.Err()
if err == context.DeadlineExceeded {
return DockerContainerMetadata{Error: &DockerTimeoutError{timeout, "stopped"}}
return DockerContainerMetadata{Error: &DockerTimeoutError{ctxTimeout, "stopped"}}
}
return DockerContainerMetadata{Error: CannotStopContainerError{err}}
}
}

func (dg *dockerGoClient) stopContainer(ctx context.Context, dockerID string) DockerContainerMetadata {
func (dg *dockerGoClient) stopContainer(ctx context.Context, dockerID string, timeout time.Duration) DockerContainerMetadata {
client, err := dg.sdkDockerClient()
if err != nil {
return DockerContainerMetadata{Error: CannotGetDockerClientError{version: dg.version, err: err}}
}

err = client.ContainerStop(ctx, dockerID, &dg.config.DockerStopTimeout)
err = client.ContainerStop(ctx, dockerID, &timeout)
metadata := dg.containerMetadata(ctx, dockerID)
if err != nil {
seelog.Infof("DockerGoClient: error stopping container %s: %v", dockerID, err)
Expand Down
1 change: 1 addition & 0 deletions agent/dockerclient/dockerapi/docker_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func TestStopContainerTimeout(t *testing.T) {
cfg.DockerStopTimeout = xContainerShortTimeout
mockDockerSDK, client, _, _, _, done := dockerClientSetupWithConfig(t, cfg)
defer done()
ctxTimeoutStopContainer = xContainerShortTimeout

wait := &sync.WaitGroup{}
wait.Add(1)
Expand Down
18 changes: 14 additions & 4 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,13 @@ func (engine *DockerTaskEngine) startContainer(task *apitask.Task, container *ap
}
}
startContainerBegin := time.Now()
dockerContainerMD := client.StartContainer(engine.ctx, dockerContainer.DockerID, engine.cfg.ContainerStartTimeout)

ctxTimeoutStartContainer := container.GetStartTimeout()
if ctxTimeoutStartContainer <= 0 {
ctxTimeoutStartContainer = engine.cfg.ContainerStartTimeout
}

dockerContainerMD := client.StartContainer(engine.ctx, dockerContainer.DockerID, ctxTimeoutStartContainer)

// Get metadata through container inspection and available task information then write this to the metadata file
// Performs this in the background to avoid delaying container start
Expand Down Expand Up @@ -1095,9 +1101,13 @@ func (engine *DockerTaskEngine) stopContainer(task *apitask.Task, container *api
}
seelog.Infof("Task engine [%s]: cleaned pause container network namespace", task.Arn)
}
// timeout is defined by the const 'stopContainerTimeout' and the 'DockerStopTimeout' in the config
timeout := engine.cfg.DockerStopTimeout + dockerclient.StopContainerTimeout
return engine.client.StopContainer(engine.ctx, dockerContainer.DockerID, timeout)

apiTimeoutStopContainer := container.GetStopTimeout()
if apiTimeoutStopContainer <= 0 {
apiTimeoutStopContainer = engine.cfg.DockerStopTimeout
}

return engine.client.StopContainer(engine.ctx, dockerContainer.DockerID, apiTimeoutStopContainer)
}

func (engine *DockerTaskEngine) removeContainer(task *apitask.Task, container *apicontainer.Container) error {
Expand Down
2 changes: 1 addition & 1 deletion agent/engine/docker_task_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ func TestStopPauseContainerCleanupCalled(t *testing.T) {
mockCNIClient.EXPECT().CleanupNS(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil),
dockerClient.EXPECT().StopContainer(gomock.Any(),
containerID,
defaultConfig.DockerStopTimeout+dockerclient.StopContainerTimeout,
defaultConfig.DockerStopTimeout,
).Return(dockerapi.DockerContainerMetadata{}),
)

Expand Down
39 changes: 39 additions & 0 deletions agent/engine/engine_unix_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,3 +1447,42 @@ func TestGPUAssociationTask(t *testing.T) {
go taskEngine.AddTask(&taskUpdate)
verifyTaskIsStopped(stateChangeEvents, testTask)
}

func TestPerContainerStopTimeout(t *testing.T) {
// set the global stop timemout, but this should be ignored since the per container value is set
globalStopContainerTimeout := 1000 * time.Second
os.Setenv("ECS_CONTAINER_STOP_TIMEOUT", globalStopContainerTimeout.String())
defer os.Unsetenv("ECS_CONTAINER_STOP_TIMEOUT")
cfg := defaultTestConfigIntegTest()

taskEngine, _, _ := setup(cfg, nil, t)

dockerTaskEngine := taskEngine.(*DockerTaskEngine)

if dockerTaskEngine.cfg.DockerStopTimeout != globalStopContainerTimeout {
t.Errorf("Expect ECS_CONTAINER_STOP_TIMEOUT to be set to , %v", dockerTaskEngine.cfg.DockerStopTimeout)
}

testTask := createTestTask("TestDockerStopTimeout")
testTask.Containers[0].Command = []string{"sh", "-c", "trap 'echo hello' SIGTERM; while true; do echo `date +%T`; sleep 1s; done;"}
testTask.Containers[0].Image = testBusyboxImage
testTask.Containers[0].Name = "test-docker-timeout"
testTask.Containers[0].StopTimeout = uint(testDockerStopTimeout.Seconds())

go dockerTaskEngine.AddTask(testTask)

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskRunningStateChange(t, taskEngine)

startTime := ttime.Now()
dockerTaskEngine.stopContainer(testTask, testTask.Containers[0])

verifyContainerStoppedStateChange(t, taskEngine)

if ttime.Since(startTime) < testDockerStopTimeout {
t.Errorf("Container stopped before the timeout: %v", ttime.Since(startTime))
}
if ttime.Since(startTime) > testDockerStopTimeout+1*time.Second {
t.Errorf("Container should have stopped eariler, but stopped after %v", ttime.Since(startTime))
}
}
5 changes: 4 additions & 1 deletion agent/statemanager/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ const (
// a) Add 'Associations' field to 'api.task.task'
// b) Add 'GPUIDs' field to 'apicontainer.Container'
// c) Add 'NvidiaRuntime' field to 'api.task.task'
// 20) Add 'DependsOn' field to 'apicontainer.Container'
// 20)
// a) Add 'DependsOn' field to 'apicontainer.Container'
// b) Add 'StartTime' field to 'api.container.Container'
// c) Add 'StopTime' field to 'api.container.Container'
ECSDataVersion = 20

// ecsDataFile specifies the filename in the ECS_DATADIR
Expand Down
Loading

0 comments on commit f4da625

Please sign in to comment.