batch container state change events #867
agent/api/ecsclient/client.go
Outdated
	trimmed := change.Reason[0:ecsMaxReasonLength]
	statechange.Reason = &trimmed
} else {
	statechange.Reason = &change.Reason
Please use aws.String instead.
fixed.
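For illustration, a minimal sketch of the pattern being requested: build the pointer through a helper instead of taking the address of a local. The stringPtr helper is a local stand-in that mirrors what aws.String does (it returns a pointer to a copy of its argument), and maxReasonLength is an assumed value standing in for ecsMaxReasonLength, whose real value is not shown in this review.

```go
package main

import "fmt"

// maxReasonLength is a hypothetical limit standing in for ecsMaxReasonLength.
const maxReasonLength = 255

// stringPtr is a local stand-in for aws.String: it returns a pointer to a copy of s.
func stringPtr(s string) *string {
	return &s
}

// trimReason returns a pointer to reason, truncated to maxReasonLength bytes.
func trimReason(reason string) *string {
	if len(reason) > maxReasonLength {
		return stringPtr(reason[:maxReasonLength])
	}
	return stringPtr(reason)
}

func main() {
	fmt.Println(*trimReason("container exited"))
}
```

The helper removes the need for the temporary trimmed variable and keeps both branches symmetric.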
agent/api/ecsclient/client.go
Outdated
}
}
stat := change.Status.String()
if stat == "DEAD" {
Do we really need this?
i'm not sure tbh. the check was in the original SubmitContainerStateChange code, but api.ContainerStatus doesn't have a "DEAD" state, so we may be able to remove it. @aaithal any ideas?
agent/api/ecsclient/client.go
Outdated
	stat = "STOPPED"
}

if stat != "STOPPED" && stat != "RUNNING" {
you can compare directly with api.ContainerStopped and api.ContainerRunning without converting the status to string.
fixed.
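A rough sketch of the typed comparison suggested above. ContainerStatus and its constants here are simplified stand-ins for api.ContainerStatus, api.ContainerRunning and api.ContainerStopped; the real type has more states and its exact definition is not shown in this review.

```go
package main

import "fmt"

// ContainerStatus is a simplified, hypothetical stand-in for api.ContainerStatus.
type ContainerStatus int

const (
	ContainerStatusNone ContainerStatus = iota
	ContainerCreated
	ContainerRunning
	ContainerStopped
)

// String returns the upper-case name of the status.
func (s ContainerStatus) String() string {
	switch s {
	case ContainerCreated:
		return "CREATED"
	case ContainerRunning:
		return "RUNNING"
	case ContainerStopped:
		return "STOPPED"
	default:
		return "NONE"
	}
}

// shouldSubmit compares the typed constants directly, so a typo in a
// status name becomes a compile error instead of a silent mismatch.
func shouldSubmit(status ContainerStatus) bool {
	return status == ContainerRunning || status == ContainerStopped
}

func main() {
	for _, s := range []ContainerStatus{ContainerCreated, ContainerRunning, ContainerStopped} {
		fmt.Println(s, shouldSubmit(s))
	}
}
```

The string form is then only needed at the point where the payload is built.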
agent/api/ecsclient/client.go
Outdated
	log.Info("Not submitting not supported upstream container state", "state", stat)
	return nil
}
statechange.Status = &stat
same as above, please use aws.String instead.
fixed.
agent/api/ecsclient/client.go
Outdated
statechange.Status = &stat

if change.ExitCode != nil {
	exitCode := int64(*change.ExitCode)
same as above, you can use aws.IntValue instead.
fixed.
agent/api/ecsclient/client.go
Outdated
bindIP := binding.BindIP
protocol := binding.Protocol.String()
networkBindings[i] = &ecs.NetworkBinding{
	BindIP: &bindIP,
same as above, use aws.String or aws.Int64 instead.
fixed.
agent/eventhandler/task_handler.go
Outdated
@@ -47,13 +47,16 @@ type TaskHandler struct {
	tasksToEvents map[string]*eventList
	// tasksToEventsLock for locking the map
	tasksToEventsLock sync.RWMutex

	batchMap map[string][]api.ContainerStateChange
can you add a comment here?
fixed.
agent/eventhandler/task_handler.go
Outdated
		return nil

	default:
		return errors.New("eventhandler: unable to determine event type from state change event")
	}
}

// batchContainerEvent collects container state change events for a given task arn
func (handler *TaskHandler) batchContainerEvent(event api.ContainerStateChange) {
	handler.batchMap[event.TaskArn] = append(handler.batchMap[event.TaskArn], event)
This may need a lock?
i'm not sure if this needs a lock. the batchMap data is not shared across goroutines.
I don't believe we guarantee that handler.AddStateChangeEvent is not called across goroutines.
hrm. that's true. i'll make this change.
agent/eventhandler/task_handler.go
Outdated
// flushBatch attaches the task arn's container events to TaskStateChange event that
// is being submitted to the backend
func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
	event.Containers = append(event.Containers, handler.batchMap[event.TaskArn]...)
same as above, this needs a lock.
same as above.
7dade26 to 9750ac7
agent/api/ecsclient/client.go
Outdated
@@ -279,6 +279,51 @@ func (client *APIECSClient) getCustomAttributes() []*ecs.Attribute {
	return attributes
}

func (client *APIECSClient) buildContainerStateChangePayload(change api.ContainerStateChange) *ecs.ContainerStateChange {
nit: Can you move this function below SubmitTaskStateChange? As a general rule, I try to organize the code such that the caller is above the callee.
sounds good.
mostly looks good. I have a bunch of minor comments.
agent/api/ecsclient/client.go
Outdated
status := change.Status

if status != api.ContainerStopped && status != api.ContainerRunning {
	log.Info("Not submitting not supported upstream container state", "state", status)
Can you use seelog instead here?
agent/api/ecsclient/client.go
Outdated
statechange.Status = aws.String(status.String())

if change.ExitCode != nil {
	exitCode := int64(*change.ExitCode)
please use aws.Int64Val instead
@@ -70,6 +70,9 @@ func New(p client.ConfigProvider, cfgs ...*aws.Config) *ECS {

// newClient creates, initializes and returns a new service client instance.
func newClient(cfg aws.Config, handlers request.Handlers, endpoint, signingRegion, signingName string) *ECS {
	if len(signingName) == 0 {
Let's stick to signingName == "" since this is a string type.
what about the nil case?
agent/eventhandler/task_handler.go
Outdated
@@ -47,13 +47,18 @@ type TaskHandler struct {
	tasksToEvents map[string]*eventList
	// tasksToEventsLock for locking the map
	tasksToEventsLock sync.RWMutex
	// batchMap is used to collect container events
	// between task transitions
	batchMap map[string][]api.ContainerStateChange
Can you rename this as tasksToContainerStates? If you do that, you can rename the lock as well.
agent/eventhandler/task_handler.go
Outdated
		return nil

	default:
		return errors.New("eventhandler: unable to determine event type from state change event")
	}
}

// batchContainerEvent collects container state change events for a given task arn
func (handler *TaskHandler) batchContainerEvent(event api.ContainerStateChange) {
	handler.batchMapLock.Lock()
please use defer for unlocking:
	handler.batchMapLock.Lock()
	defer handler.batchMapLock.Unlock()
	handler.batchMap[event.TaskArn] = append(handler.batchMap[event.TaskArn], event)
agent/eventhandler/task_handler.go
Outdated
// flushBatch attaches the task arn's container events to TaskStateChange event that
// is being submitted to the backend
func (handler *TaskHandler) flushBatch(event *api.TaskStateChange) {
	handler.batchMapLock.Lock()
Same as above, use defer for unlocking
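As a sketch of where the locking comments in this review point, the following keeps the batch map behind one mutex and unlocks with defer in both methods. The types are pared-down stand-ins for api.ContainerStateChange, api.TaskStateChange and TaskHandler, and deleting the entry after a flush is an assumption, not something shown in the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified, hypothetical stand-ins for the api package types.
type ContainerStateChange struct {
	TaskArn       string
	ContainerName string
}

type TaskStateChange struct {
	TaskArn    string
	Containers []ContainerStateChange
}

// TaskHandler guards its batch map with a single mutex.
type TaskHandler struct {
	lock                   sync.Mutex
	tasksToContainerStates map[string][]ContainerStateChange
}

func newTaskHandler() *TaskHandler {
	return &TaskHandler{tasksToContainerStates: make(map[string][]ContainerStateChange)}
}

// batchContainerEvent collects container events, bucketed by task arn.
func (h *TaskHandler) batchContainerEvent(event ContainerStateChange) {
	h.lock.Lock()
	defer h.lock.Unlock()
	h.tasksToContainerStates[event.TaskArn] = append(h.tasksToContainerStates[event.TaskArn], event)
}

// flushBatch attaches the batched container events to the task event.
func (h *TaskHandler) flushBatch(event *TaskStateChange) {
	h.lock.Lock()
	defer h.lock.Unlock()
	event.Containers = append(event.Containers, h.tasksToContainerStates[event.TaskArn]...)
	// Assumption: the batch is cleared once it has been attached.
	delete(h.tasksToContainerStates, event.TaskArn)
}

func main() {
	h := newTaskHandler()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h.batchContainerEvent(ContainerStateChange{TaskArn: "task-1", ContainerName: fmt.Sprint("c", i)})
		}(i)
	}
	wg.Wait()
	task := &TaskStateChange{TaskArn: "task-1"}
	h.flushBatch(task)
	fmt.Println(len(task.Containers))
}
```

Without the mutex, the concurrent appends in main would be a data race on the map, which is the concern raised about AddStateChangeEvent being called from multiple goroutines.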
agent/api/ecsclient/client.go
Outdated
if err != nil {
	log.Warn("Could not submit a task state change", "err", err)
	return err
}

// WIP LOG payload
When are you getting rid of this?
	ContainerName: aws.String(change.ContainerName),
}

if change.Reason != "" {
What about the nil case?
so change is an api.ContainerStateChange struct value and can't be nil. at worst it may have its fields zeroed if no value is specified.
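A small sketch of that point, using a cut-down stand-in for api.ContainerStateChange (only the Reason string and ExitCode pointer fields that appear in the diff are modelled; the describe helper is hypothetical). A struct passed by value always exists, so only its pointer fields need a nil check.

```go
package main

import "fmt"

// ContainerStateChange is a hypothetical, reduced version of api.ContainerStateChange.
type ContainerStateChange struct {
	ContainerName string
	Reason        string
	ExitCode      *int
}

// describe shows which checks are meaningful for a struct value:
// string fields are compared with "", pointer fields with nil.
func describe(change ContainerStateChange) string {
	reason := "<none>"
	if change.Reason != "" {
		reason = change.Reason
	}
	exit := "<unset>"
	if change.ExitCode != nil {
		exit = fmt.Sprint(*change.ExitCode)
	}
	return fmt.Sprintf("reason=%s exitCode=%s", reason, exit)
}

func main() {
	var zero ContainerStateChange // zero value: empty strings, nil pointer
	fmt.Println(describe(zero))

	code := 137
	fmt.Println(describe(ContainerStateChange{Reason: "OOM", ExitCode: &code}))
}
```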
This change updates the model for the ecs client and includes the corresponding code changes to SubmitTaskStateChange. Secondly, we've added the batching logic to eventhandler, which collects container state change events as they are generated and then attaches the collection of events to the SubmitTaskStateChange payload, as dictated by the model changes.
This change updates the eventhandler tests to reflect the batching changes.
This change updates existing test coverage. We also add a lock around the map that aggregates container state change events. There is also a minor fix to a test failure that was causing a nil pointer panic in the agent/stats package.
a474c3b to 900e02c
This commit includes several minor changes to address reviewer comments
900e02c to 19b2238
agent/api/ecsclient/client.go
Outdated
status := change.Status

if status != api.ContainerStopped && status != api.ContainerRunning {
	seelog.Info("Not submitting not supported upstream container state", "state", status)
Should this be logged as a warning?
agreed.
agent/eventhandler/task_handler.go
Outdated
	// tasksToContainerStates is used to collect container events
	// between task transitions
	tasksToContainerStates map[string][]api.ContainerStateChange
	tasksToContainerStatesLock sync.RWMutex
Can you switch to using a single lock for this struct? Want to avoid the repeat of "deadly embrace" because of multiple locks
will do. and yup 😅
	containerEvents[i] = client.buildContainerStateChangePayload(containerEvent)
}

req.Containers = containerEvents
Since we combine several events into one, it may be good to log the actual events here, so that we know which state changes were actually submitted.
agreed. i've added info level logging in agent/eventhandler/task_handler.go to capture this info. we log other information about which events are submitted there as well.
d8d0646 to 6a096a4
6a096a4 to 26478c1
agent/api/ecsclient/client.go
Outdated
status := change.Status

if status != api.ContainerStopped && status != api.ContainerRunning {
	seelog.Warn("Not submitting unsupported upstream container state", "state", status)
please use seelog.Warnf here.
out of curiosity - why one over the other? the existing code seems to use seelog.Warn
I prefer the formatted string kind because it gives you control over the output format and not having to deal with whatever golang thinks the stringified version of an object should look like.
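To make the difference concrete, here is a rough comparison using only the standard library. It assumes an unformatted logging call joins its arguments the way fmt.Sprint does, which is an assumption about the logger rather than something confirmed in this thread; ContainerStatus is again a simplified stand-in, and unformatted and formatted are hypothetical helper names.

```go
package main

import "fmt"

// ContainerStatus is a simplified, hypothetical stand-in for api.ContainerStatus.
type ContainerStatus int

const (
	ContainerCreated ContainerStatus = iota
	ContainerRunning
)

func (s ContainerStatus) String() string {
	if s == ContainerRunning {
		return "RUNNING"
	}
	return "CREATED"
}

// unformatted mimics a Warn(args...) style call by joining the arguments
// with fmt.Sprint; the layout is decided by fmt, not by the caller.
func unformatted(status ContainerStatus) string {
	return fmt.Sprint("Not submitting unsupported upstream container state", "state", status)
}

// formatted mimics a Warnf(format, args...) style call; the caller controls
// separators and how the status is rendered.
func formatted(status ContainerStatus) string {
	return fmt.Sprintf("Not submitting unsupported upstream container state: %s", status.String())
}

func main() {
	fmt.Println(unformatted(ContainerCreated))
	fmt.Println(formatted(ContainerCreated))
}
```

Running it shows the first message with its pieces run together, while the second reads as intended.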
agent/eventhandler/handler_test.go
Outdated
client.EXPECT().SubmitContainerStateChange(sortaRedundantCont.(api.ContainerStateChange)).Do(func(interface{}) { wait.Done() })
client.EXPECT().SubmitTaskStateChange(notReplacedTask.(api.TaskStateChange)).Do(func(interface{}) { wait.Done() })
client.EXPECT().SubmitTaskStateChange(sortaRedundantTask.(api.TaskStateChange)).Do(func(interface{}) { wait.Done() })
time.Sleep(1 * time.Millisecond)
Why the sleep here?
i was using it to ensure ordering. i'll change it to use waitgroups.
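One way the sleep could be replaced, sketched with a hypothetical recorder type in place of the mocked client: each submission signals a sync.WaitGroup when it has been observed, and the caller waits on that signal before sending the next event, so ordering no longer depends on timing.

```go
package main

import (
	"fmt"
	"sync"
)

// recorder is a hypothetical stand-in for the mocked client in the test.
type recorder struct {
	mu   sync.Mutex
	seen []string
}

// submit records name asynchronously and calls done once it has been recorded.
func (r *recorder) submit(name string, done func()) {
	go func() {
		defer done() // runs last, after the lock has been released
		r.mu.Lock()
		defer r.mu.Unlock()
		r.seen = append(r.seen, name)
	}()
}

// submitInOrder waits for each submission to be observed before sending the next.
func submitInOrder(r *recorder, names ...string) {
	for _, name := range names {
		var wg sync.WaitGroup
		wg.Add(1)
		r.submit(name, wg.Done)
		wg.Wait() // deterministic, unlike time.Sleep
	}
}

func main() {
	r := &recorder{}
	submitInOrder(r, "container", "task")
	fmt.Println(r.seen)
}
```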
3fe6f58 to 38d5a99
agent/api/ecsclient/client.go
Outdated
status := change.Status

if status != api.ContainerStopped && status != api.ContainerRunning {
	seelog.Warnf("Not submitting unsupported upstream container state: %v", status)
Can you also log details of the task here? It's not much use when debugging if that's not logged. Also, please use ".. : %s", status.String()
fixed formatting. also - we are capturing details about both task and container events a layer above in agent/eventhandler/task_handler.go as they are passed to the client. did you mean we should capture debug logs here as well?
yeah, when you're printing a warning/error, it's useful to have as much info as possible about what led to that in the message itself (because earlier lines might get lost because of other async routines getting in the way)
@@ -70,6 +70,9 @@ func New(p client.ConfigProvider, cfgs ...*aws.Config) *ECS {

// newClient creates, initializes and returns a new service client instance.
func newClient(cfg aws.Config, handlers request.Handlers, endpoint, signingRegion, signingName string) *ECS {
	if signingName == "" {
		signingName = "ecs"
This should be a const (or, you can reuse ServiceName)
agent/eventhandler/task_handler.go
Outdated
handler.taskHandlerLock.Lock()
defer handler.taskHandlerLock.Unlock()

seelog.Info("TaskHandler, batching container event :", event)
Please use Infof here.
38d5a99 to a41ff4f
This commit also contains changes to log formatting to address code review comments.
a41ff4f to 211d4ba
LGTM. After this, you should work on fixing the leak in #798.
This implements the changes required to batch container state change events and include them with the task state change payload
Summary
This change updates the model for the ecs client and includes the corresponding code changes to SubmitTaskStateChange in the api package. Secondly we've added the batching logic to eventhandler.
Implementation details
Two main points of interest:
In agent/eventhandler/task_handler.go, as ContainerStateChange events are propagated up, they are collected and bucketed by task arn. Then, on a task state transition, we attach the collection of ContainerStateChange events to the TaskStateChange payload that is sent to the backend.
In agent/api/ecsclient/client.go, we made the changes required to reflect the model changes and also added logic to attach the container state change message payloads to task state change message.
Testing
(make release)
(go build -out amazon-ecs-agent.exe ./agent)
(make test) pass
(go test -timeout=25s ./agent/...) pass
(make run-integ-tests) pass
(.\scripts\run-integ-tests.ps1) pass
(make run-functional-tests) pass
(.\scripts\run-functional-tests.ps1) pass
New tests cover the changes:
Description for the changelog
Licensing
This contribution is under the terms of the Apache 2.0 License:
yes