Skip to content

Commit

Permalink
Integrate with TCSHandler in /ecs-agent module
Browse files Browse the repository at this point in the history
  • Loading branch information
Realmonia committed Jun 28, 2023
1 parent 894ce44 commit 24cc036
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 794 deletions.
29 changes: 13 additions & 16 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
"github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/aws/amazon-ecs-agent/agent/stats"
"github.com/aws/amazon-ecs-agent/agent/stats/reporter"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
tcshandler "github.com/aws/amazon-ecs-agent/agent/tcs/handler"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/utils/loader"
"github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper"
Expand Down Expand Up @@ -891,21 +891,18 @@ func (agent *ecsAgent) startAsyncRoutines(
}
go statsEngine.StartMetricsPublish()

telemetrySessionParams := tcshandler.TelemetrySessionParams{
Ctx: agent.ctx,
CredentialProvider: agent.credentialProvider,
Cfg: agent.cfg,
ContainerInstanceArn: agent.containerInstanceARN,
DeregisterInstanceEventStream: deregisterInstanceEventStream,
ECSClient: client,
TaskEngine: taskEngine,
StatsEngine: statsEngine,
MetricsChannel: telemetryMessages,
HealthChannel: healthMessages,
Doctor: doctor,
}
// Start metrics session in a go routine
go tcshandler.StartMetricsSession(&telemetrySessionParams)
session, err := reporter.NewDockerTelemetrySession(agent.containerInstanceARN, agent.credentialProvider, agent.cfg, deregisterInstanceEventStream,
client, taskEngine, telemetryMessages, healthMessages, doctor)
if err != nil {
seelog.Warnf("Error creating telemetry session: %v", err)
return
}
if session == nil {
seelog.Infof("Metrics disabled on the instance.")
return
}

go session.Start(agent.ctx)
}

func (agent *ecsAgent) startSpotInstanceDrainingPoller(ctx context.Context, client api.ECSClient) {
Expand Down
2 changes: 1 addition & 1 deletion agent/app/agent_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) {
}).Return("poll-endpoint", nil),
client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return("acs-endpoint", nil).AnyTimes(),
client.EXPECT().DiscoverTelemetryEndpoint(gomock.Any()).Do(func(x interface{}) {
// Ensures that the test waits until telemetry session has bee started
// Ensures that the test waits until telemetry session has been started
discoverEndpointsInvoked.Done()
}).Return("telemetry-endpoint", nil),
client.EXPECT().DiscoverTelemetryEndpoint(gomock.Any()).Return(
Expand Down
36 changes: 19 additions & 17 deletions agent/stats/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry"
"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
)

const (
Expand Down Expand Up @@ -48,22 +47,20 @@ func NewDockerTelemetrySession(
taskEngine engine.TaskEngine,
metricsChannel <-chan ecstcs.TelemetryMessage,
healthChannel <-chan ecstcs.HealthMessage,
doctor *doctor.Doctor) *DockerTelemetrySession {
doctor *doctor.Doctor) (*DockerTelemetrySession, error) {
ok, cfgParseErr := isContainerHealthMetricsDisabled(cfg)
if cfgParseErr != nil {
seelog.Warnf("Error starting metrics session: %v", cfgParseErr)
return nil
logger.Warn("Error starting metrics session", logger.Fields{
field.Error: cfgParseErr,
})
return nil, cfgParseErr
}
if ok {
seelog.Warnf("Metrics were disabled, not starting the telemetry session")
return nil
logger.Warn("Metrics were disabled, not starting the telemetry session")
return nil, nil
}

agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(taskEngine)
if cfg == nil {
logger.Error("Config is empty in the tcs session parameter")
return nil
}

session := tcshandler.NewTelemetrySession(
containerInstanceArn,
Expand All @@ -90,7 +87,7 @@ func NewDockerTelemetrySession(
healthChannel,
doctor,
)
return &DockerTelemetrySession{session, ecsClient, containerInstanceArn}
return &DockerTelemetrySession{session, ecsClient, containerInstanceArn}, nil
}

// Start "overloads" tcshandler.TelemetrySession's Start with extra handling of discoverTelemetryEndpoint result.
Expand All @@ -99,18 +96,23 @@ func NewDockerTelemetrySession(
func (session *DockerTelemetrySession) Start(ctx context.Context) error {
backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2)
for {
select {
case <-ctx.Done():
logger.Info("TCS session exited cleanly.")
return nil
default:
}
endpoint, tcsError := discoverPollEndpoint(session.containerInstanceArn, session.ecsClient)
if tcsError == nil {
tcsError = session.s.StartTelemetrySession(ctx, endpoint)
}
switch tcsError {
case context.Canceled, context.DeadlineExceeded:
return tcsError
case io.EOF, nil:
if tcsError == nil || tcsError == io.EOF {
logger.Info("TCS Websocket connection closed for a valid reason")
backoff.Reset()
default:
seelog.Errorf("Error: lost websocket connection with ECS Telemetry service (TCS): %v", tcsError)
} else {
logger.Error("Error: lost websocket connection with ECS Telemetry service (TCS)", logger.Fields{
field.Error: tcsError,
})
time.Sleep(backoff.Duration())
}
}
Expand Down
125 changes: 125 additions & 0 deletions agent/stats/reporter/reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package reporter

import (
"context"
"errors"
"testing"

"github.com/aws/amazon-ecs-agent/agent/config"
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

const (
testContainerInstanceArn = "testContainerInstanceArn"
testCluster = "testCluster"
testRegion = "us-west-2"
testDockerEndpoint = "testDockerEndpoint"
testDockerVersion = "testDockerVersion"
)

func TestNewDockerTelemetrySession(t *testing.T) {
emptyDoctor, _ := doctor.NewDoctor([]doctor.Healthcheck{}, testCluster, testContainerInstanceArn)
testCredentials := credentials.NewStaticCredentials("test-id", "test-secret", "test-token")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockEngine := mock_engine.NewMockTaskEngine(ctrl)
mockEngine.EXPECT().Version().Return(testDockerVersion, nil)
testCases := []struct {
name string
cfg *config.Config
expectedSession bool
expectedError bool
}{
{
name: "happy case",
cfg: &config.Config{
DisableMetrics: config.BooleanDefaultFalse{},
DisableDockerHealthCheck: config.BooleanDefaultFalse{},
Cluster: testCluster,
AWSRegion: testRegion,
AcceptInsecureCert: false,
DockerEndpoint: testDockerEndpoint,
},
expectedSession: true,
expectedError: false,
},
{
name: "cfg parsing error",
cfg: nil,
expectedSession: false,
expectedError: true,
},
{
name: "metrics disabled",
cfg: &config.Config{
DisableMetrics: config.BooleanDefaultFalse{
Value: config.ExplicitlyEnabled,
},
DisableDockerHealthCheck: config.BooleanDefaultFalse{
Value: config.ExplicitlyEnabled,
},
Cluster: testCluster,
AWSRegion: testRegion,
AcceptInsecureCert: false,
DockerEndpoint: testDockerEndpoint,
},
expectedSession: false,
expectedError: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dockerTelemetrySession, err := NewDockerTelemetrySession(
testContainerInstanceArn,
testCredentials,
tc.cfg,
eventstream.NewEventStream("Deregister_Instance", context.Background()),
nil,
mockEngine,
nil,
nil,
emptyDoctor,
)
if tc.expectedSession {
assert.NotNil(t, dockerTelemetrySession)
} else {
assert.Nil(t, dockerTelemetrySession)
}

if tc.expectedError {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestGenerateVersionInfo_GetVersionError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockEngine := mock_engine.NewMockTaskEngine(ctrl)
mockEngine.EXPECT().Version().Times(1).Return("", errors.New("error"))
agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(mockEngine)
assert.Equal(t, version.Version, agentVersion)
assert.Equal(t, version.GitShortHash, agentHash)
assert.Equal(t, "", containerRuntimeVersion)
}

func TestGenerateVersionInfo_NoError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockEngine := mock_engine.NewMockTaskEngine(ctrl)
mockEngine.EXPECT().Version().Times(1).Return(testDockerVersion, nil)
agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(mockEngine)
assert.Equal(t, version.Version, agentVersion)
assert.Equal(t, version.GitShortHash, agentHash)
assert.Equal(t, testDockerVersion, containerRuntimeVersion)
}
Loading

0 comments on commit 24cc036

Please sign in to comment.