Skip to content

Commit

Permalink
use move tcs handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Realmonia committed Jun 8, 2023
1 parent 7e640a5 commit 1986f20
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 26 deletions.
37 changes: 11 additions & 26 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,49 @@ import (
"fmt"
"time"

"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"

dockerdoctor "github.com/aws/amazon-ecs-agent/agent/doctor" // for Docker specific container instance health checks
"github.com/aws/amazon-ecs-agent/agent/eni/watcher"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/aws-sdk-go/aws/awserr"

"github.com/aws/amazon-ecs-agent/agent/credentials/instancecreds"
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
"github.com/aws/amazon-ecs-agent/agent/metrics"

acshandler "github.com/aws/amazon-ecs-agent/agent/acs/handler"
"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/api/ecsclient"
"github.com/aws/amazon-ecs-agent/agent/app/factory"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/containermetadata"
"github.com/aws/amazon-ecs-agent/agent/credentials/instancecreds"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/dockerclient"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/sdkclientfactory"
dockerdoctor "github.com/aws/amazon-ecs-agent/agent/doctor" // for Docker specific container instance health checks
"github.com/aws/amazon-ecs-agent/agent/ec2"
"github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
engineserviceconnect "github.com/aws/amazon-ecs-agent/agent/engine/serviceconnect"
"github.com/aws/amazon-ecs-agent/agent/eni/pause"
"github.com/aws/amazon-ecs-agent/agent/eni/watcher"
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/agent/handlers"
"github.com/aws/amazon-ecs-agent/agent/metrics"
"github.com/aws/amazon-ecs-agent/agent/sighandlers"
"github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/aws/amazon-ecs-agent/agent/stats"
"github.com/aws/amazon-ecs-agent/agent/stats/reporter"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
tcshandler "github.com/aws/amazon-ecs-agent/agent/tcs/handler"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/utils/loader"
"github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper"
"github.com/aws/amazon-ecs-agent/agent/version"
acsclient "github.com/aws/amazon-ecs-agent/ecs-agent/acs/client"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -866,20 +864,6 @@ func (agent *ecsAgent) startAsyncRoutines(
// Start sending events to the backend
go eventhandler.HandleEngineEvents(agent.ctx, taskEngine, client, taskHandler, attachmentEventHandler)

telemetrySessionParams := tcshandler.TelemetrySessionParams{
Ctx: agent.ctx,
CredentialProvider: agent.credentialProvider,
Cfg: agent.cfg,
ContainerInstanceArn: agent.containerInstanceARN,
DeregisterInstanceEventStream: deregisterInstanceEventStream,
ECSClient: client,
TaskEngine: taskEngine,
StatsEngine: statsEngine,
MetricsChannel: telemetryMessages,
HealthChannel: healthMessages,
Doctor: doctor,
}

err := statsEngine.MustInit(agent.ctx, taskEngine, agent.cfg.Cluster, agent.containerInstanceARN)
if err != nil {
seelog.Warnf("Error initializing metrics engine: %v", err)
Expand All @@ -888,7 +872,8 @@ func (agent *ecsAgent) startAsyncRoutines(
go statsEngine.StartMetricsPublish()

// Start metrics session in a go routine
go tcshandler.StartMetricsSession(&telemetrySessionParams)
go reporter.StartSession(agent.ctx, agent.containerInstanceARN, agent.credentialProvider, agent.cfg,
deregisterInstanceEventStream, client, taskEngine, telemetryMessages, healthMessages, doctor)
}

func (agent *ecsAgent) startSpotInstanceDrainingPoller(ctx context.Context, client api.ECSClient) {
Expand Down
115 changes: 115 additions & 0 deletions agent/stats/reporter/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package reporter

import (
"context"
"io"
"time"

"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
tcshandler "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/handler"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
"github.com/pkg/errors"
)

// Heartbeat defaults handed to the telemetry session as HeartbeatTimeout and
// HeartbeatJitterMax.
const (
	// defaultHeartbeatTimeout is the maximum time to wait between heartbeats
	// without disconnecting.
	defaultHeartbeatTimeout = time.Minute
	// defaultHeartbeatJitter is the jitter bound paired with the heartbeat
	// timeout.
	defaultHeartbeatJitter = time.Minute
)

// Disconnection defaults handed to the telemetry session as DisconnectTimeout
// and DisconnectJitterMax.
const (
	// defaultDisconnectionTimeout is the default timeout for a websocket
	// client disconnection initiated by the agent.
	defaultDisconnectionTimeout = 15 * time.Minute
	// defaultDisconnectionJitter is the jitter bound paired with the
	// disconnection timeout.
	defaultDisconnectionJitter = 30 * time.Minute
)

// StartSession runs the telemetry (TCS) session loop for a container instance
// and blocks until ctx is cancelled.
//
// It returns immediately, without attempting a connection, when cfg is nil or
// when cfg has both metrics and Docker health checks disabled. Otherwise it
// builds the session parameters once and calls startTelemetrySessionAttempt in
// a loop:
//   - a nil or io.EOF result is treated as a normal close and resets the
//     backoff;
//   - any other error is logged and followed by a backoff sleep before the
//     next attempt.
//
// The loop exits once ctx is done. The backoff sleep is skipped when ctx is
// already cancelled so that shutdown is not held up by a pending backoff.
func StartSession(
	ctx context.Context,
	containerInstanceArn string,
	credentialProvider *credentials.Credentials,
	cfg *config.Config,
	deregisterInstanceEventStream *eventstream.EventStream,
	ecsClient api.ECSClient,
	taskEngine engine.TaskEngine,
	metricsChannel <-chan ecstcs.TelemetryMessage,
	healthChannel <-chan ecstcs.HealthMessage,
	doctor *doctor.Doctor) {
	disabled, cfgParseErr := isContainerHealthMetricsDisabled(cfg)
	if cfgParseErr != nil {
		seelog.Warnf("Error starting metrics session: %v", cfgParseErr)
		return
	}
	if disabled {
		seelog.Warnf("Metrics were disabled, not starting the telemetry session")
		return
	}

	// Backoff between failed attempts; see retry.NewExponentialBackoff for the
	// exact meaning of each argument.
	backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2)
	params := tcshandler.TelemetrySessionParams{
		Ctx:                           ctx,
		ContainerInstanceArn:          containerInstanceArn,
		Cluster:                       cfg.Cluster,
		DisableContainerHealthMetrics: cfg.DisableMetrics.Enabled(),
		CredentialProvider:            credentialProvider,
		Cfg: &wsclient.WSClientMinAgentConfig{
			AWSRegion:          cfg.AWSRegion,
			AcceptInsecureCert: cfg.AcceptInsecureCert,
			DockerEndpoint:     cfg.DockerEndpoint,
			IsDocker:           true,
		},
		DeregisterInstanceEventStream: deregisterInstanceEventStream,
		HeartbeatTimeout:              defaultHeartbeatTimeout,
		HeartbeatJitterMax:            defaultHeartbeatJitter,
		DisconnectTimeout:             defaultDisconnectionTimeout,
		DisconnectJitterMax:           defaultDisconnectionJitter,
		MetricsChannel:                metricsChannel,
		HealthChannel:                 healthChannel,
		Doctor:                        doctor,
	}
	for {
		tcsError := startTelemetrySessionAttempt(&params, containerInstanceArn, taskEngine, ecsClient)
		if tcsError == nil || tcsError == io.EOF {
			seelog.Info("TCS Websocket connection closed for a valid reason")
			backoff.Reset()
		} else {
			seelog.Errorf("Error: lost websocket connection with ECS Telemetry service (TCS): %v", tcsError)
			// Only back off when another attempt will actually follow; if the
			// context is already cancelled, fall through and exit right away.
			if ctx.Err() == nil {
				params.Time().Sleep(backoff.Duration())
			}
		}
		select {
		case <-ctx.Done():
			seelog.Info("TCS session exited cleanly.")
			return
		default:
		}
	}
}

// startTelemetrySessionAttempt performs a single telemetry session attempt.
//
// It refreshes the per-attempt fields on params (agent version and hash, and
// the container runtime name and version when taskEngine.Version succeeds),
// discovers the telemetry endpoint for containerInstanceArn through ecsClient,
// and then hands params to tcshandler.StartMetricsSession.
//
// It returns the endpoint discovery error when discovery fails, so the caller
// can back off and retry instead of starting a session against an empty
// endpoint. Otherwise it returns whatever tcshandler.StartMetricsSession
// returns. If taskEngine.Version fails, the runtime fields on params are left
// as they were.
func startTelemetrySessionAttempt(params *tcshandler.TelemetrySessionParams,
	containerInstanceArn string, taskEngine engine.TaskEngine, ecsClient api.ECSClient) error {
	params.AgentVersion = version.Version
	params.AgentHash = version.GitHashString()
	if dockerVersion, getVersionErr := taskEngine.Version(); getVersionErr == nil {
		params.ContainerRuntime = tcshandler.ContainerRuntimeDocker
		params.ContainerRuntimeVersion = dockerVersion
	}
	tcsEndpoint, err := ecsClient.DiscoverTelemetryEndpoint(containerInstanceArn)
	if err != nil {
		seelog.Errorf("tcs: unable to discover poll endpoint: %v", err)
		// Without an endpoint there is nothing to connect to; surface the
		// failure so the caller's retry/backoff path handles it.
		return err
	}
	params.Endpoint = tcsEndpoint
	return tcshandler.StartMetricsSession(params)
}

// isContainerHealthMetricsDisabled reports whether cfg has both metrics and
// Docker health checks disabled. It returns an error when cfg is nil.
func isContainerHealthMetricsDisabled(cfg *config.Config) (bool, error) {
	if cfg == nil {
		return false, errors.New("Config is empty in the tcs session parameter")
	}
	bothDisabled := cfg.DisableMetrics.Enabled() && cfg.DisableDockerHealthCheck.Enabled()
	return bothDisabled, nil
}

0 comments on commit 1986f20

Please sign in to comment.