Skip to content

Commit

Permalink
Create TCS Handler in ecs-agent model (#3731)
Browse files Browse the repository at this point in the history
  • Loading branch information
Realmonia authored Jun 20, 2023
1 parent 0de7229 commit 33570e0
Show file tree
Hide file tree
Showing 10 changed files with 1,363 additions and 113 deletions.
37 changes: 17 additions & 20 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,29 @@ import (
"fmt"
"time"

"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"

dockerdoctor "github.com/aws/amazon-ecs-agent/agent/doctor" // for Docker specific container instance health checks
"github.com/aws/amazon-ecs-agent/agent/eni/watcher"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/aws-sdk-go/aws/awserr"

"github.com/aws/amazon-ecs-agent/agent/credentials/instancecreds"
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
"github.com/aws/amazon-ecs-agent/agent/metrics"

acshandler "github.com/aws/amazon-ecs-agent/agent/acs/handler"
"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/api/ecsclient"
"github.com/aws/amazon-ecs-agent/agent/app/factory"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/containermetadata"
"github.com/aws/amazon-ecs-agent/agent/credentials/instancecreds"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/dockerclient"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/sdkclientfactory"
dockerdoctor "github.com/aws/amazon-ecs-agent/agent/doctor" // for Docker specific container instance health checks
"github.com/aws/amazon-ecs-agent/agent/ec2"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect"
"github.com/aws/amazon-ecs-agent/agent/eni/pause"
"github.com/aws/amazon-ecs-agent/agent/eni/watcher"
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/agent/handlers"
"github.com/aws/amazon-ecs-agent/agent/metrics"
"github.com/aws/amazon-ecs-agent/agent/sighandlers"
"github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
Expand All @@ -63,12 +56,17 @@ import (
acsclient "github.com/aws/amazon-ecs-agent/ecs-agent/acs/client"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
aws_credentials "github.com/aws/aws-sdk-go/aws/credentials"

"github.com/cihub/seelog"
"github.com/pborman/uuid"
)
Expand Down Expand Up @@ -866,6 +864,13 @@ func (agent *ecsAgent) startAsyncRoutines(
// Start sending events to the backend
go eventhandler.HandleEngineEvents(agent.ctx, taskEngine, client, taskHandler, attachmentEventHandler)

err := statsEngine.MustInit(agent.ctx, taskEngine, agent.cfg.Cluster, agent.containerInstanceARN)
if err != nil {
seelog.Warnf("Error initializing metrics engine: %v", err)
return
}
go statsEngine.StartMetricsPublish()

telemetrySessionParams := tcshandler.TelemetrySessionParams{
Ctx: agent.ctx,
CredentialProvider: agent.credentialProvider,
Expand All @@ -879,14 +884,6 @@ func (agent *ecsAgent) startAsyncRoutines(
HealthChannel: healthMessages,
Doctor: doctor,
}

err := statsEngine.MustInit(agent.ctx, taskEngine, agent.cfg.Cluster, agent.containerInstanceARN)
if err != nil {
seelog.Warnf("Error initializing metrics engine: %v", err)
return
}
go statsEngine.StartMetricsPublish()

// Start metrics session in a go routine
go tcshandler.StartMetricsSession(&telemetrySessionParams)
}
Expand Down
147 changes: 147 additions & 0 deletions agent/stats/reporter/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package reporter

import (
"context"
"errors"
"io"
"time"

"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
tcshandler "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/handler"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
)

const (
// The maximum time to wait between heartbeats without disconnecting
defaultHeartbeatTimeout = 1 * time.Minute
defaultHeartbeatJitter = 1 * time.Minute
// Default websocket client disconnection timeout initiated by agent
defaultDisconnectionTimeout = 15 * time.Minute
defaultDisconnectionJitter = 30 * time.Minute
)

type DockerTelemetrySession struct {
s tcshandler.TelemetrySession
ecsClient api.ECSClient
containerInstanceArn string
}

// NewDockerTelemetrySession returns creates a DockerTelemetrySession, which has a tcshandler.TelemetrySession embedded.
// tcshandler.TelemetrySession contains the logic to manage the TCSClient and corresponding websocket connection
func NewDockerTelemetrySession(
containerInstanceArn string,
credentialProvider *credentials.Credentials,
cfg *config.Config,
deregisterInstanceEventStream *eventstream.EventStream,
ecsClient api.ECSClient,
taskEngine engine.TaskEngine,
metricsChannel <-chan ecstcs.TelemetryMessage,
healthChannel <-chan ecstcs.HealthMessage,
doctor *doctor.Doctor) *DockerTelemetrySession {
ok, cfgParseErr := isContainerHealthMetricsDisabled(cfg)
if cfgParseErr != nil {
seelog.Warnf("Error starting metrics session: %v", cfgParseErr)
return nil
}
if ok {
seelog.Warnf("Metrics were disabled, not starting the telemetry session")
return nil
}

agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(taskEngine)
if cfg == nil {
logger.Error("Config is empty in the tcs session parameter")
return nil
}

session := tcshandler.NewTelemetrySession(
containerInstanceArn,
cfg.Cluster,
agentVersion,
agentHash,
containerRuntimeVersion,
"", // this will be overridden by DockerTelemetrySession.Start()
cfg.DisableMetrics.Enabled(),
credentialProvider,
&wsclient.WSClientMinAgentConfig{
AWSRegion: cfg.AWSRegion,
AcceptInsecureCert: cfg.AcceptInsecureCert,
DockerEndpoint: cfg.DockerEndpoint,
IsDocker: true,
},
deregisterInstanceEventStream,
defaultHeartbeatTimeout,
defaultHeartbeatJitter,
defaultDisconnectionTimeout,
defaultDisconnectionJitter,
nil,
metricsChannel,
healthChannel,
doctor,
)
return &DockerTelemetrySession{session, ecsClient, containerInstanceArn}
}

// Start "overloads" tcshandler.TelemetrySession's Start with extra handling of discoverTelemetryEndpoint result.
// discoverTelemetryEndpoint and tcshandler.TelemetrySession's StartTelemetrySession errors are handled
// (retryWithBackoff or return) in a combined manner
func (session *DockerTelemetrySession) Start(ctx context.Context) error {
backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2)
for {
endpoint, tcsError := discoverPollEndpoint(session.containerInstanceArn, session.ecsClient)
if tcsError == nil {
tcsError = session.s.StartTelemetrySession(ctx, endpoint)
}
switch tcsError {
case context.Canceled, context.DeadlineExceeded:
return tcsError
case io.EOF, nil:
logger.Info("TCS Websocket connection closed for a valid reason")
backoff.Reset()
default:
seelog.Errorf("Error: lost websocket connection with ECS Telemetry service (TCS): %v", tcsError)
time.Sleep(backoff.Duration())
}
}
}

// generateVersionInfo generates the agentVersion, agentHash and containerRuntimeVersion from dockerTaskEngine state
func generateVersionInfo(taskEngine engine.TaskEngine) (string, string, string) {
agentVersion := version.Version
agentHash := version.GitHashString()
var containerRuntimeVersion string
if dockerVersion, getVersionErr := taskEngine.Version(); getVersionErr == nil {
containerRuntimeVersion = dockerVersion
}

return agentVersion, agentHash, containerRuntimeVersion
}

// discoverPollEndpoint calls DiscoverTelemetryEndpoint to get the TCS endpoint url for TCS client to connect
func discoverPollEndpoint(containerInstanceArn string, ecsClient api.ECSClient) (string, error) {
tcsEndpoint, err := ecsClient.DiscoverTelemetryEndpoint(containerInstanceArn)
if err != nil {
logger.Error("tcs: unable to discover poll endpoint", logger.Fields{
field.Error: err,
})
}
return tcsEndpoint, err
}

func isContainerHealthMetricsDisabled(cfg *config.Config) (bool, error) {
if cfg != nil {
return cfg.DisableMetrics.Enabled() && cfg.DisableDockerHealthCheck.Enabled(), nil
}
return false, errors.New("config is empty in the tcs session parameter")
}
1 change: 0 additions & 1 deletion agent/tcs/handler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type TelemetrySessionParams struct {
CredentialProvider *credentials.Credentials
Cfg *config.Config
DeregisterInstanceEventStream *eventstream.EventStream
AcceptInvalidCert bool
ECSClient api.ECSClient
TaskEngine engine.TaskEngine
StatsEngine *stats.DockerStatsEngine
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 33570e0

Please sign in to comment.