From 958fe51f029530f5d9710b36ed34231266e233b6 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Thu, 25 Apr 2019 21:40:12 +0000 Subject: [PATCH 01/17] tcs: Update tcs model for network and storage stats --- agent/tcs/model/api/api-2.json | 56 +++++++- agent/tcs/model/ecstcs/api.go | 234 +++++++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+), 5 deletions(-) diff --git a/agent/tcs/model/api/api-2.json b/agent/tcs/model/api/api-2.json index e050811d5e2..1c79cbf9e27 100644 --- a/agent/tcs/model/api/api-2.json +++ b/agent/tcs/model/api/api-2.json @@ -123,7 +123,7 @@ "members":{ "min":{"shape":"Double"}, "max":{"shape":"Double"}, - "sampleCount":{"shape":"Integer"}, + "sampleCount":{"shape":"UInteger"}, "sum":{"shape":"Double"} } }, @@ -142,8 +142,11 @@ "ContainerMetric":{ "type":"structure", "members":{ + "containerArn":{"shape":"String"}, "cpuStatsSet":{"shape":"CWStatsSet"}, - "memoryStatsSet":{"shape":"CWStatsSet"} + "memoryStatsSet":{"shape":"CWStatsSet"}, + "networkStatsSet":{"shape":"NetworkStatsSet"}, + "storageStatsSet":{"shape":"StorageStatsSet"} } }, "ContainerMetrics":{ @@ -174,7 +177,6 @@ "healthy":{"shape":"Boolean"} } }, - "Integer":{"type":"integer"}, "InvalidParameterException":{ "type":"structure", "members":{ @@ -192,6 +194,19 @@ "fin":{"shape":"Boolean"} } }, + "NetworkStatsSet":{ + "type":"structure", + "members":{ + "rxBytes":{"shape":"ULongStatsSet"}, + "rxDropped":{"shape":"ULongStatsSet"}, + "rxErrors":{"shape":"ULongStatsSet"}, + "rxPackets":{"shape":"ULongStatsSet"}, + "txBytes":{"shape":"ULongStatsSet"}, + "txDropped":{"shape":"ULongStatsSet"}, + "txErrors":{"shape":"ULongStatsSet"}, + "txPackets":{"shape":"ULongStatsSet"} + } + }, "PublishHealthRequest":{ "type":"structure", "members":{ @@ -235,6 +250,13 @@ "message":{"shape":"String"} } }, + "StorageStatsSet":{ + "type":"structure", + "members":{ + "readSizeBytes":{"shape":"ULongStatsSet"}, + "writeSizeBytes":{"shape":"ULongStatsSet"} + } + }, "String":{"type":"string"}, "TaskHealth":{ "type":"structure", @@ -260,8 +282,32 @@ }, "TaskMetrics":{ "type":"list", - "member":{"shape":"TaskMetric"} + "member":{"shape":"TaskMetric"}, + "max":10 + }, + "Timestamp":{"type":"timestamp"}, + "UInteger":{ + "type":"integer", + "min":0 }, - "Timestamp":{"type":"timestamp"} + "ULong":{ + "type":"long", + "min":0 + }, + "ULongStatsSet":{ + "type":"structure", + "required":[ + "min", + "max", + "sampleCount", + "sum" + ], + "members":{ + "min":{"shape":"ULong"}, + "max":{"shape":"ULong"}, + "sampleCount":{"shape":"ULong"}, + "sum":{"shape":"ULong"} + } + } } } diff --git a/agent/tcs/model/ecstcs/api.go b/agent/tcs/model/ecstcs/api.go index 45f925528c2..06f34f7fab7 100644 --- a/agent/tcs/model/ecstcs/api.go +++ b/agent/tcs/model/ecstcs/api.go @@ -14,9 +14,11 @@ package ecstcs import ( + "fmt" "time" "github.com/aws/aws-sdk-go/aws/awsutil" + "github.com/aws/aws-sdk-go/aws/request" ) type AckPublishHealth struct { @@ -112,9 +114,15 @@ func (s ContainerHealth) GoString() string { type ContainerMetric struct { _ struct{} `type:"structure"` + ContainerArn *string `locationName:"containerArn" type:"string"` + CpuStatsSet *CWStatsSet `locationName:"cpuStatsSet" type:"structure"` MemoryStatsSet *CWStatsSet `locationName:"memoryStatsSet" type:"structure"` + + NetworkStatsSet *NetworkStatsSet `locationName:"networkStatsSet" type:"structure"` + + StorageStatsSet *StorageStatsSet `locationName:"storageStatsSet" type:"structure"` } // String returns the string representation @@ -127,6 +135,26 @@ func (s ContainerMetric) GoString() string { return s.String() } +// Validate inspects the fields of the type to determine if they are valid. +func (s *ContainerMetric) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "ContainerMetric"} + if s.NetworkStatsSet != nil { + if err := s.NetworkStatsSet.Validate(); err != nil { + invalidParams.AddNested("NetworkStatsSet", err.(request.ErrInvalidParams)) + } + } + if s.StorageStatsSet != nil { + if err := s.StorageStatsSet.Validate(); err != nil { + invalidParams.AddNested("StorageStatsSet", err.(request.ErrInvalidParams)) + } + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} + type HealthMetadata struct { _ struct{} `type:"structure"` @@ -235,6 +263,86 @@ func (s MetricsMetadata) GoString() string { return s.String() } +type NetworkStatsSet struct { + _ struct{} `type:"structure"` + + RxBytes *ULongStatsSet `locationName:"rxBytes" type:"structure"` + + RxDropped *ULongStatsSet `locationName:"rxDropped" type:"structure"` + + RxErrors *ULongStatsSet `locationName:"rxErrors" type:"structure"` + + RxPackets *ULongStatsSet `locationName:"rxPackets" type:"structure"` + + TxBytes *ULongStatsSet `locationName:"txBytes" type:"structure"` + + TxDropped *ULongStatsSet `locationName:"txDropped" type:"structure"` + + TxErrors *ULongStatsSet `locationName:"txErrors" type:"structure"` + + TxPackets *ULongStatsSet `locationName:"txPackets" type:"structure"` +} + +// String returns the string representation +func (s NetworkStatsSet) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s NetworkStatsSet) GoString() string { + return s.String() +} + +// Validate inspects the fields of the type to determine if they are valid. +func (s *NetworkStatsSet) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "NetworkStatsSet"} + if s.RxBytes != nil { + if err := s.RxBytes.Validate(); err != nil { + invalidParams.AddNested("RxBytes", err.(request.ErrInvalidParams)) + } + } + if s.RxDropped != nil { + if err := s.RxDropped.Validate(); err != nil { + invalidParams.AddNested("RxDropped", err.(request.ErrInvalidParams)) + } + } + if s.RxErrors != nil { + if err := s.RxErrors.Validate(); err != nil { + invalidParams.AddNested("RxErrors", err.(request.ErrInvalidParams)) + } + } + if s.RxPackets != nil { + if err := s.RxPackets.Validate(); err != nil { + invalidParams.AddNested("RxPackets", err.(request.ErrInvalidParams)) + } + } + if s.TxBytes != nil { + if err := s.TxBytes.Validate(); err != nil { + invalidParams.AddNested("TxBytes", err.(request.ErrInvalidParams)) + } + } + if s.TxDropped != nil { + if err := s.TxDropped.Validate(); err != nil { + invalidParams.AddNested("TxDropped", err.(request.ErrInvalidParams)) + } + } + if s.TxErrors != nil { + if err := s.TxErrors.Validate(); err != nil { + invalidParams.AddNested("TxErrors", err.(request.ErrInvalidParams)) + } + } + if s.TxPackets != nil { + if err := s.TxPackets.Validate(); err != nil { + invalidParams.AddNested("TxPackets", err.(request.ErrInvalidParams)) + } + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} + type PublishHealthInput struct { _ struct{} `type:"structure"` @@ -311,6 +419,26 @@ func (s PublishMetricsInput) GoString() string { return s.String() } +// Validate inspects the fields of the type to determine if they are valid. +func (s *PublishMetricsInput) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "PublishMetricsInput"} + if s.TaskMetrics != nil { + for i, v := range s.TaskMetrics { + if v == nil { + continue + } + if err := v.Validate(); err != nil { + invalidParams.AddNested(fmt.Sprintf("%s[%v]", "TaskMetrics", i), err.(request.ErrInvalidParams)) + } + } + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} + type PublishMetricsOutput struct { _ struct{} `type:"structure"` @@ -447,6 +575,44 @@ func (s StopTelemetrySessionMessage) GoString() string { return s.String() } +type StorageStatsSet struct { + _ struct{} `type:"structure"` + + ReadSizeBytes *ULongStatsSet `locationName:"readSizeBytes" type:"structure"` + + WriteSizeBytes *ULongStatsSet `locationName:"writeSizeBytes" type:"structure"` +} + +// String returns the string representation +func (s StorageStatsSet) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s StorageStatsSet) GoString() string { + return s.String() +} + +// Validate inspects the fields of the type to determine if they are valid. +func (s *StorageStatsSet) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "StorageStatsSet"} + if s.ReadSizeBytes != nil { + if err := s.ReadSizeBytes.Validate(); err != nil { + invalidParams.AddNested("ReadSizeBytes", err.(request.ErrInvalidParams)) + } + } + if s.WriteSizeBytes != nil { + if err := s.WriteSizeBytes.Validate(); err != nil { + invalidParams.AddNested("WriteSizeBytes", err.(request.ErrInvalidParams)) + } + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} + type TaskHealth struct { _ struct{} `type:"structure"` @@ -490,3 +656,71 @@ func (s TaskMetric) String() string { func (s TaskMetric) GoString() string { return s.String() } + +// Validate inspects the fields of the type to determine if they are valid. +func (s *TaskMetric) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "TaskMetric"} + if s.ContainerMetrics != nil { + for i, v := range s.ContainerMetrics { + if v == nil { + continue + } + if err := v.Validate(); err != nil { + invalidParams.AddNested(fmt.Sprintf("%s[%v]", "ContainerMetrics", i), err.(request.ErrInvalidParams)) + } + } + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} + +type ULongStatsSet struct { + _ struct{} `type:"structure"` + + // Max is a required field + Max *int64 `locationName:"max" type:"long" required:"true"` + + // Min is a required field + Min *int64 `locationName:"min" type:"long" required:"true"` + + // SampleCount is a required field + SampleCount *int64 `locationName:"sampleCount" type:"long" required:"true"` + + // Sum is a required field + Sum *int64 `locationName:"sum" type:"long" required:"true"` +} + +// String returns the string representation +func (s ULongStatsSet) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s ULongStatsSet) GoString() string { + return s.String() +} + +// Validate inspects the fields of the type to determine if they are valid. +func (s *ULongStatsSet) Validate() error { + invalidParams := request.ErrInvalidParams{Context: "ULongStatsSet"} + if s.Max == nil { + invalidParams.Add(request.NewErrParamRequired("Max")) + } + if s.Min == nil { + invalidParams.Add(request.NewErrParamRequired("Min")) + } + if s.SampleCount == nil { + invalidParams.Add(request.NewErrParamRequired("SampleCount")) + } + if s.Sum == nil { + invalidParams.Add(request.NewErrParamRequired("Sum")) + } + + if invalidParams.Len() > 0 { + return invalidParams + } + return nil +} From 64ade5fae3678469eeabda69864f79b5650a7971 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Tue, 30 Apr 2019 17:43:10 +0000 Subject: [PATCH 02/17] add overflow field for int64 --- agent/tcs/model/api/api-2.json | 5 ++++- agent/tcs/model/ecstcs/api.go | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/agent/tcs/model/api/api-2.json b/agent/tcs/model/api/api-2.json index 1c79cbf9e27..b46e3770c38 100644 --- a/agent/tcs/model/api/api-2.json +++ b/agent/tcs/model/api/api-2.json @@ -306,7 +306,10 @@ "min":{"shape":"ULong"}, "max":{"shape":"ULong"}, "sampleCount":{"shape":"ULong"}, - "sum":{"shape":"ULong"} + "sum":{"shape":"ULong"}, + "overflowMin":{"shape":"ULong"}, + "overflowMax":{"shape":"ULong"}, + "overflowSum":{"shape":"ULong"} } } } diff --git a/agent/tcs/model/ecstcs/api.go b/agent/tcs/model/ecstcs/api.go index 06f34f7fab7..7566dee170d 100644 --- a/agent/tcs/model/ecstcs/api.go +++ b/agent/tcs/model/ecstcs/api.go @@ -686,6 +686,12 @@ type ULongStatsSet struct { // Min is a required field Min *int64 `locationName:"min" type:"long" required:"true"` + OverflowMax *int64 `locationName:"overflowMax" type:"long"` + + OverflowMin *int64 `locationName:"overflowMin" type:"long"` + + OverflowSum *int64 `locationName:"overflowSum" type:"long"` + // SampleCount is a required field SampleCount *int64 `locationName:"sampleCount" type:"long" required:"true"` From 02529957004a28e343d4b72e7600882b4de9cd1f Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Tue, 30 Apr 2019 22:02:30 +0000 Subject: [PATCH 03/17] adds StorageReadBytes and StorageWriteBytes to stats collection --- agent/stats/common_test.go | 4 +- agent/stats/engine_test.go | 4 +- agent/stats/queue.go | 97 ++++++++++++++++++++++-- agent/stats/queue_test.go | 126 +++++++++++++++++++++++++++---- agent/stats/types.go | 10 ++- agent/stats/unix_test_stats.json | 61 +++++++++++++++ agent/stats/utils_unix.go | 22 +++++- agent/stats/utils_unix_test.go | 50 ++++++------ 8 files changed, 317 insertions(+), 57 deletions(-) create mode 100644 agent/stats/unix_test_stats.json diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 652eeaf0a4e..8c561805760 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -248,8 +248,8 @@ func validateEmptyTaskHealthMetrics(t *testing.T, engine *DockerStatsEngine) { func createFakeContainerStats() []*ContainerStats { return []*ContainerStats{ - {22400432, 1839104, parseNanoTime("2015-02-12T21:22:05.131117533Z")}, - {116499979, 3649536, parseNanoTime("2015-02-12T21:22:05.232291187Z")}, + {22400432, 1839104, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.131117533Z")}, + {116499979, 3649536, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.232291187Z")}, } } diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index 7c77628e1c3..67c1b1eb2a3 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -191,8 +191,8 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) { ts1 := parseNanoTime("2015-02-12T21:22:05.131117533Z") ts2 := parseNanoTime("2015-02-12T21:22:05.232291187Z") containerStats := []*ContainerStats{ - {22400432, 1839104, ts1}, - {116499979, 3649536, ts2}, + {22400432, 1839104, uint64(0), uint64(0), ts1}, + {116499979, 3649536, uint64(0), uint64(0), ts2}, } dockerStats := []*types.StatsJSON{{}, {},} dockerStats[0].Read = ts1 diff --git a/agent/stats/queue.go b/agent/stats/queue.go index d0ac5416082..304c977eb51 100644 --- a/agent/stats/queue.go +++ b/agent/stats/queue.go @@ -82,6 +82,8 @@ func (queue *Queue) add(rawStat *ContainerStats) { stat := UsageStats{ CPUUsagePerc: float32(nan32()), MemoryUsageInMegs: uint32(rawStat.memoryUsage / BytesInMiB), + StorageReadBytes: rawStat.storageReadBytes, + StorageWriteBytes: rawStat.storageWriteBytes, Timestamp: rawStat.timestamp, cpuUsage: rawStat.cpuUsage, } @@ -125,6 +127,16 @@ func (queue *Queue) GetMemoryStatsSet() (*ecstcs.CWStatsSet, error) { return queue.getCWStatsSet(getMemoryUsagePerc) } +// GetStorageReadStatsSet gets the stats set for aggregate storage read +func (queue *Queue) GetStorageReadStatsSet() (*ecstcs.ULongStatsSet, error) { + return queue.getULongStatsSet(getStorageReadBytes) +} + +// GetStorageWriteStatsSet gets the stats set for aggregate storage written +func (queue *Queue) GetStorageWriteStatsSet() (*ecstcs.ULongStatsSet, error) { + return queue.getULongStatsSet(getStorageWriteBytes) +} + // GetRawUsageStats gets the array of most recent raw UsageStats, in descending // order of timestamps. func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) { @@ -147,6 +159,8 @@ func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) { usageStats[i] = UsageStats{ CPUUsagePerc: rawUsageStat.CPUUsagePerc, MemoryUsageInMegs: rawUsageStat.MemoryUsageInMegs, + StorageReadBytes: rawUsageStat.StorageReadBytes, + StorageWriteBytes: rawUsageStat.StorageWriteBytes, Timestamp: rawUsageStat.Timestamp, } } @@ -162,7 +176,26 @@ func getMemoryUsagePerc(s *UsageStats) float64 { return float64(s.MemoryUsageInMegs) } -type getUsageFunc func(*UsageStats) float64 +func getStorageReadBytes(s *UsageStats) uint64 { + return s.StorageReadBytes +} + +func getStorageWriteBytes(s *UsageStats) uint64 { + return s.StorageWriteBytes +} + +// getInt64WithOverflow truncates a uint64 to fit an int64 +// it returns overflow as a second int64 +func getInt64WithOverflow(uintStat uint64) (int64, int64) { + if uintStat > math.MaxInt64 { + overflow := int64(uintStat % uint64(math.MaxInt64)) + return math.MaxInt64, overflow + } + return int64(uintStat), int64(0) +} + +type getUsageFloatFunc func(*UsageStats) float64 +type getUsageIntFunc func(*UsageStats) uint64 func (queue *Queue) resetThresholdElapsed(timeout time.Duration) bool { queue.lock.RLock() @@ -179,14 +212,14 @@ func (queue *Queue) enoughDatapointsInBuffer() bool { // getCWStatsSet gets the stats set for either CPU or Memory based on the // function pointer. -func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) { +func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, error) { queue.lock.Lock() defer queue.lock.Unlock() queueLength := len(queue.buffer) if queueLength < 2 { // Need at least 2 data points to calculate this. - return nil, fmt.Errorf("No data in the queue") + return nil, fmt.Errorf("We need at least 2 data points in queue to calculate float stats") } var min, max, sum float64 @@ -197,15 +230,15 @@ func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) { sampleCount = 0 for _, stat := range queue.buffer { - perc := f(&stat) - if math.IsNaN(perc) { + thisStat := f(&stat) + if math.IsNaN(thisStat) { continue } - min = math.Min(min, perc) - max = math.Max(max, perc) + min = math.Min(min, thisStat) + max = math.Max(max, thisStat) sampleCount++ - sum += perc + sum += thisStat } return &ecstcs.CWStatsSet{ @@ -215,3 +248,51 @@ func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) { Sum: &sum, }, nil } + +// getULongStatsSet gets the stats set for the specified raw stat type +// stats come from docker as uint64 type, and by neccesity are packed into int64 type +// where there is overflow (math.MaxInt64 + 1 or greater) +// we capture the excess in optional overflow fields. +func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet, error) { + queue.lock.Lock() + defer queue.lock.Unlock() + + queueLength := len(queue.buffer) + if queueLength < 2 { + // Need at least 2 data points to calculate this. + return nil, fmt.Errorf("We need at least 2 data points in the queue to calculate int stats") + } + + var min, max, sum uint64 + var sampleCount int64 + min = math.MaxUint64 + max = 0 + sum = 0 + sampleCount = 0 + + for _, stat := range queue.buffer { + thisStat := f(&stat) + if thisStat < min { + min = thisStat + } + if thisStat > max { + max = thisStat + } + sum += thisStat + sampleCount++ + } + + baseMin, overflowMin := getInt64WithOverflow(min) + baseMax, overflowMax := getInt64WithOverflow(max) + baseSum, overflowSum := getInt64WithOverflow(sum) + + return &ecstcs.ULongStatsSet{ + Max: &baseMax, + OverflowMax: &overflowMax, + Min: &baseMin, + OverflowMin: &overflowMin, + SampleCount: &sampleCount, + Sum: &baseSum, + OverflowSum: &overflowSum, + }, nil +} diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go index 2ee219d5c89..19cad6c503a 100644 --- a/agent/stats/queue_test.go +++ b/agent/stats/queue_test.go @@ -30,6 +30,9 @@ const ( // predictableHighMemoryUtilizationInMiB is the expected Memory usage in MiB for // the "predictableHighMemoryUtilizationInBytes" value (7377772544 / (1024 * 1024)) predictableHighMemoryUtilizationInMiB = 7035 + // the "predictableInt64Overflow" requires 3 metrics to guarantee overflow + // note math.MaxInt64 is odd, so integer division will trim off 1 + predictableInt64Overflow = math.MaxInt64 / int64(2) ) func getTimestamps() []time.Time { @@ -60,7 +63,7 @@ func getTimestamps() []time.Time { } -func getCPUTimes() []uint64 { +func getUintStats() []uint64 { return []uint64{ 22400432, 116499979, @@ -122,18 +125,34 @@ func getPredictableHighMemoryUtilizationInBytes(size int) []uint64 { return memBytes } -func createQueue(size int, predictableHighMemoryUtilization bool) *Queue { +func getLargeInt64Stats(size int) []uint64 { + var uintStats []uint64 + for i := 0; i < size; i++ { + uintStats = append(uintStats, uint64(predictableInt64Overflow)) + } + return uintStats +} + +func createQueue(size int, predictableHighUtilization bool) *Queue { timestamps := getTimestamps() - cpuTimes := getCPUTimes() + cpuTimes := getUintStats() var memoryUtilizationInBytes []uint64 - if predictableHighMemoryUtilization { + var uintStats []uint64 + if predictableHighUtilization { memoryUtilizationInBytes = getPredictableHighMemoryUtilizationInBytes(len(cpuTimes)) + uintStats = getLargeInt64Stats(len(cpuTimes)) } else { memoryUtilizationInBytes = getRandomMemoryUtilizationInBytes() + uintStats = getUintStats() } queue := NewQueue(size) for i, time := range timestamps { - queue.add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time}) + queue.add(&ContainerStats{ + cpuUsage: cpuTimes[i], + memoryUsage: memoryUtilizationInBytes[i], + storageReadBytes: uintStats[i], + storageWriteBytes: uintStats[i], + timestamp: time}) } return queue } @@ -141,7 +160,7 @@ func createQueue(size int, predictableHighMemoryUtilization bool) *Queue { func TestQueueAddRemove(t *testing.T) { timestamps := getTimestamps() queueLength := 5 - // Set predictableHighMemoryUtilization to false, expect random values when aggregated. + // Set predictableHighUtilization to false, expect random values when aggregated. queue := createQueue(queueLength, false) buf := queue.buffer if len(buf) != queueLength { @@ -157,7 +176,7 @@ func TestQueueAddRemove(t *testing.T) { cpuStatsSet, err := queue.GetCPUStatsSet() if err != nil { - t.Error("Error gettting cpu stats set:", err) + t.Error("Error getting cpu stats set:", err) } if *cpuStatsSet.Min == math.MaxFloat64 || math.IsNaN(*cpuStatsSet.Min) { t.Error("Min value incorrectly set: ", *cpuStatsSet.Min) @@ -174,12 +193,12 @@ func TestQueueAddRemove(t *testing.T) { memStatsSet, err := queue.GetMemoryStatsSet() if err != nil { - t.Error("Error gettting memory stats set:", err) + t.Error("Error getting memory stats set:", err) } - if *memStatsSet.Min == float64(-math.MaxFloat32) { + if *memStatsSet.Min == math.MaxFloat64 || math.IsNaN(*memStatsSet.Min) { t.Error("Min value incorrectly set: ", *memStatsSet.Min) } - if *memStatsSet.Max == 0 { + if *memStatsSet.Max == -math.MaxFloat64 || math.IsNaN(*memStatsSet.Max) { t.Error("Max value incorrectly set: ", *memStatsSet.Max) } if *memStatsSet.SampleCount != int64(queueLength) { @@ -189,9 +208,45 @@ func TestQueueAddRemove(t *testing.T) { t.Error("Sum value incorrectly set: ", *memStatsSet.Sum) } + storageReadStatsSet, err := queue.GetStorageReadStatsSet() + if err != nil { + t.Error("Error getting storage read stats set:", err) + } + // assuming min is initialized to math.MaxUint64 then truncated + if *storageReadStatsSet.Min == int64(math.MaxInt64) && + *storageReadStatsSet.OverflowMin == int64(math.MaxInt64) { + t.Error("Min value incorrectly set: ", *storageReadStatsSet.Min) + } + if *storageReadStatsSet.Max == 0 { + t.Error("Max value incorrectly set: ", *storageReadStatsSet.Max) + } + if *storageReadStatsSet.SampleCount != int64(queueLength) { + t.Error("Expected samplecount: ", queueLength, " got: ", *storageReadStatsSet.SampleCount) + } + if *storageReadStatsSet.Sum == 0 { + t.Error("Sum value incorrectly set: ", *storageReadStatsSet.Sum) + } + + storageWriteStatsSet, err := queue.GetStorageWriteStatsSet() + if err != nil { + t.Error("Error getting storage read stats set:", err) + } + if *storageWriteStatsSet.Min == int64(math.MaxInt64) { + t.Error("Min value incorrectly set: ", *storageWriteStatsSet.Min) + } + if *storageWriteStatsSet.Max == 0 { + t.Error("Max value incorrectly set: ", *storageWriteStatsSet.Max) + } + if *storageWriteStatsSet.SampleCount != int64(queueLength) { + t.Error("Expected samplecount: ", queueLength, " got: ", *storageWriteStatsSet.SampleCount) + } + if *storageWriteStatsSet.Sum == 0 { + t.Error("Sum value incorrectly set: ", *storageWriteStatsSet.Sum) + } + rawUsageStats, err := queue.GetRawUsageStats(2 * queueLength) if err != nil { - t.Error("Error gettting raw usage stats: ", err) + t.Error("Error getting raw usage stats: ", err) } if len(rawUsageStats) != queueLength { @@ -215,10 +270,37 @@ func TestQueueAddRemove(t *testing.T) { } +func TestQueueUintStats(t *testing.T) { + queueLength := 3 + queue := createQueue(queueLength, true) + buf := queue.buffer + if len(buf) != queueLength { + t.Errorf("Buffer size is incorrect. Expected: %d, Got: %d", queueLength, len(buf)) + } + + storageReadStatsSet, err := queue.GetStorageReadStatsSet() + + if err != nil { + t.Error("Error getting storage read stats set:", err) + } + // assuming min is initialized to math.MaxUint64 then truncated + // min/max should be the same as predictableInt64Overflow + // their overflow should be 0 + assert.Equal(t, *storageReadStatsSet.Min, predictableInt64Overflow) + assert.Equal(t, *storageReadStatsSet.OverflowMin, int64(0)) + assert.Equal(t, *storageReadStatsSet.Max, predictableInt64Overflow) + assert.Equal(t, *storageReadStatsSet.OverflowMax, int64(0)) + // the sum of three predictableInt64Overflow should be equal to MaxInt64 + // with an overflow of predictableInt64Overflow - 1 + // (see the definition of predictableInt64Overflow for why -1) + assert.Equal(t, *storageReadStatsSet.Sum, int64(math.MaxInt64)) + assert.Equal(t, *storageReadStatsSet.OverflowSum, predictableInt64Overflow-1) +} + func TestQueueAddPredictableHighMemoryUtilization(t *testing.T) { timestamps := getTimestamps() queueLength := 5 - // Set predictableHighMemoryUtilization to true + // Set predictableHighUtilization to true // This lets us compare the computed values against pre-computed expected values queue := createQueue(queueLength, true) buf := queue.buffer @@ -235,7 +317,7 @@ func TestQueueAddPredictableHighMemoryUtilization(t *testing.T) { memStatsSet, err := queue.GetMemoryStatsSet() if err != nil { - t.Error("Error gettting memory stats set:", err) + t.Error("Error getting memory stats set:", err) } // Test if both min and max for memory utilization are set to 7035MiB @@ -257,6 +339,24 @@ func TestQueueAddPredictableHighMemoryUtilization(t *testing.T) { } } +// tests just below and just above the threshold (+/- 1) of int64 +func TestUintOverflow(t *testing.T) { + var underUint, overUint uint64 + underUint = uint64(math.MaxInt64 - 1) + overUint = uint64(math.MaxInt64 + 1) + + baseUnderUint, overflowUnderUint := getInt64WithOverflow(underUint) + baseMaxInt, overflowMaxInt := getInt64WithOverflow(uint64(math.MaxInt64)) + baseOverUint, overflowOverUint := getInt64WithOverflow(overUint) + + assert.Equal(t, baseUnderUint, int64(math.MaxInt64-1)) + assert.Equal(t, overflowUnderUint, int64(0)) + assert.Equal(t, baseMaxInt, int64(math.MaxInt64)) + assert.Equal(t, overflowMaxInt, int64(0)) + assert.Equal(t, baseOverUint, int64(math.MaxInt64)) + assert.Equal(t, overflowOverUint, int64(1)) +} + func TestCpuStatsSetNotSetToInfinity(t *testing.T) { // timestamps will be used to simulate +Inf CPU Usage // timestamps[0] = timestamps[1] diff --git a/agent/stats/types.go b/agent/stats/types.go index 4df1cadb059..afd1c7e9669 100644 --- a/agent/stats/types.go +++ b/agent/stats/types.go @@ -24,15 +24,19 @@ import ( // ContainerStats encapsulates the raw CPU and memory utilization from cgroup fs. type ContainerStats struct { - cpuUsage uint64 - memoryUsage uint64 - timestamp time.Time + cpuUsage uint64 + memoryUsage uint64 + storageReadBytes uint64 + storageWriteBytes uint64 + timestamp time.Time } // UsageStats abstracts the format in which the queue stores data. type UsageStats struct { CPUUsagePerc float32 `json:"cpuUsagePerc"` MemoryUsageInMegs uint32 `json:"memoryUsageInMegs"` + StorageReadBytes uint64 `json:"storageReadBytes"` + StorageWriteBytes uint64 `json:"storageWriteBytes"` Timestamp time.Time `json:"timestamp"` cpuUsage uint64 } diff --git a/agent/stats/unix_test_stats.json b/agent/stats/unix_test_stats.json new file mode 100644 index 00000000000..14b93c435b7 --- /dev/null +++ b/agent/stats/unix_test_stats.json @@ -0,0 +1,61 @@ +{ + "blkio_stats": { + "io_service_bytes_recursive": [ + { + "major": 202, + "minor": 192, + "op": "Read", + "value": 1 + }, + { + "major": 202, + "minor": 192, + "op": "Write", + "value": 5 + }, + { + "major": 202, + "minor": 26368, + "op": "Read", + "value": 1 + }, + { + "major": 202, + "minor": 26368, + "op": "Write", + "value": 5 + }, + { + "major": 253, + "minor": 1, + "op": "Read", + "value": 1 + }, + { + "major": 253, + "minor": 1, + "op": "Write", + "value": 5 + } + ] + }, + "cpu_stats": { + "cpu_usage": { + "total_usage": 262857821518, + "percpu_usage": [ + 129951031737, + 132906789781 + ] + }, + "online_cpus": 2 + }, + "memory_stats":{ + "usage": 30, + "max_usage": 100, + "stats": { + "cache": 20, + "rss": 10 + }, + "privateworkingset": 10 + } +} diff --git a/agent/stats/utils_unix.go b/agent/stats/utils_unix.go index 562ee8e8d58..c7c770d9b9d 100644 --- a/agent/stats/utils_unix.go +++ b/agent/stats/utils_unix.go @@ -31,9 +31,25 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, cpuUsage := dockerStats.CPUStats.CPUUsage.TotalUsage / numCores memoryUsage := dockerStats.MemoryStats.Usage - dockerStats.MemoryStats.Stats["cache"] + // initialize block io and loop over stats to aggregate + storageReadBytes := uint64(0) + storageWriteBytes := uint64(0) + for _, blockStat := range dockerStats.BlkioStats.IoServiceBytesRecursive { + switch op := blockStat.Op; op { + case "Read": + storageReadBytes += blockStat.Value + case "Write": + storageWriteBytes += blockStat.Value + default: + //ignoring "Async", "Total", "Sum", etc + continue + } + } return &ContainerStats{ - cpuUsage: cpuUsage, - memoryUsage: memoryUsage, - timestamp: dockerStats.Read, + cpuUsage: cpuUsage, + memoryUsage: memoryUsage, + storageReadBytes: storageReadBytes, + storageWriteBytes: storageWriteBytes, + timestamp: dockerStats.Read, }, nil } diff --git a/agent/stats/utils_unix_test.go b/agent/stats/utils_unix_test.go index f3d3c07f081..acba581db9c 100644 --- a/agent/stats/utils_unix_test.go +++ b/agent/stats/utils_unix_test.go @@ -16,7 +16,8 @@ package stats import ( "encoding/json" - "fmt" + "io/ioutil" + "path/filepath" "testing" "github.com/docker/docker/api/types" @@ -25,43 +26,40 @@ import ( ) func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) { - // doing this with json makes me sad, but is the easiest way to deal with - // the inner structs - jsonStat := fmt.Sprintf(` - { - "cpu_stats":{ - "cpu_usage":{ - "total_usage":%d - } - } - }`, 100) + inputJsonFile, _ := filepath.Abs("./unix_test_stats.json") + jsonBytes, _ := ioutil.ReadFile(inputJsonFile) dockerStat := &types.StatsJSON{} - json.Unmarshal([]byte(jsonStat), dockerStat) + json.Unmarshal([]byte(jsonBytes), dockerStat) + // empty the PercpuUsage array + dockerStat.CPUStats.CPUUsage.PercpuUsage = make([]uint64, 0) _, err := dockerStatsToContainerStats(dockerStat) assert.Error(t, err, "expected error converting container stats with empty PercpuUsage") } func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) { - // doing this with json makes me sad, but is the easiest way to deal with - // the inner structs - // numCores is a global variable in package agent/stats // which denotes the number of cpu cores + // TODO should we take this from the docker stats `online_cpus`? numCores = 4 - jsonStat := fmt.Sprintf(` - { - "cpu_stats":{ - "cpu_usage":{ - "percpu_usage":[%d, %d, %d, %d], - "total_usage":%d - } - } - }`, 1, 2, 3, 4, 100) + inputJsonFile, _ := filepath.Abs("./unix_test_stats.json") + jsonBytes, _ := ioutil.ReadFile(inputJsonFile) dockerStat := &types.StatsJSON{} - json.Unmarshal([]byte(jsonStat), dockerStat) + json.Unmarshal([]byte(jsonBytes), dockerStat) containerStats, err := dockerStatsToContainerStats(dockerStat) assert.NoError(t, err, "converting container stats failed") + require.NotNil(t, containerStats, "containerStats should not be nil") + assert.Equal(t, uint64(65714455379), containerStats.cpuUsage, "unexpected value for cpuUsage", containerStats.cpuUsage) +} +func TestDockerStatsToContainerStatsStorageBytes(t *testing.T) { + inputJsonFile, _ := filepath.Abs("./unix_test_stats.json") + jsonBytes, _ := ioutil.ReadFile(inputJsonFile) + dockerStat := &types.StatsJSON{} + json.Unmarshal([]byte(jsonBytes), dockerStat) + containerStats, err := dockerStatsToContainerStats(dockerStat) + assert.NoError(t, err, "converting container stats failed") require.NotNil(t, containerStats, "containerStats should not be nil") - assert.Equal(t, uint64(25), containerStats.cpuUsage, "unexpected value for cpuUsage", containerStats.cpuUsage) + + assert.Equal(t, uint64(3), containerStats.storageReadBytes, "unexpected value for storageReadBytes", containerStats.storageReadBytes) + assert.Equal(t, uint64(15), containerStats.storageWriteBytes, "Unexpected value for storageWriteBytes", containerStats.storageWriteBytes) } From f3827a68bc5fdc825cc512ff42d8a3f2cdda7cee Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Tue, 30 Apr 2019 22:02:30 +0000 Subject: [PATCH 04/17] stats: add network stats to container metrics --- agent/stats/common_test.go | 17 ++++- agent/stats/engine.go | 11 ++- agent/stats/engine_test.go | 16 ++++- agent/stats/queue.go | 101 +++++++++++++++++++++++++++- agent/stats/queue_test.go | 53 ++++++++++++++- agent/stats/types.go | 24 +++++-- agent/stats/unix_test_stats.json | 22 ++++++ agent/stats/utils.go | 20 ++++++ agent/stats/utils_test.go | 24 +++++++ agent/stats/utils_unix.go | 21 ++++-- agent/stats/utils_unix_test.go | 18 ++--- agent/stats/utils_windows.go | 8 ++- agent/stats/utils_windows_test.go | 16 +++++ agent/stats/windows_test_stats.json | 20 ++++++ 14 files changed, 334 insertions(+), 37 deletions(-) create mode 100644 agent/stats/windows_test_stats.json diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 8c561805760..68db421be24 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -154,6 +154,9 @@ func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expect if containerMetric.MemoryStatsSet == nil { return fmt.Errorf("MemoryStatsSet is nil") } + if containerMetric.NetworkStatsSet == nil { + return fmt.Errorf("NetworkStatsSet is nil") + } } return nil } @@ -247,9 +250,19 @@ func validateEmptyTaskHealthMetrics(t *testing.T, engine *DockerStatsEngine) { } func createFakeContainerStats() []*ContainerStats { + netStats := &NetworkStats{ + rxBytes: 796, + rxDropped: 6, + rxErrors: 0, + rxPackets: 10, + txBytes: 8192, + txDropped: 5, + txErrors: 0, + txPackets: 60, + } return []*ContainerStats{ - {22400432, 1839104, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.131117533Z")}, - {116499979, 3649536, uint64(0), uint64(0), parseNanoTime("2015-02-12T21:22:05.232291187Z")}, + {22400432, 1839104, uint64(0), uint64(0), netStats, parseNanoTime("2015-02-12T21:22:05.131117533Z")}, + {116499979, 3649536, uint64(0), uint64(0), netStats, parseNanoTime("2015-02-12T21:22:05.232291187Z")}, } } diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 540661f5fca..09dbbe0ca58 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -601,9 +601,16 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* continue } + networkStatsSet, err := container.statsQueue.GetNetworkStatsSet() + if err != nil { + // we log the error and still continue to publish cpu, memory stats and empty/partial network stats + seelog.Warnf("Error getting network stats: %v, container: %v", err, dockerID) + } + containerMetrics = append(containerMetrics, &ecstcs.ContainerMetric{ - CpuStatsSet: cpuStatsSet, - MemoryStatsSet: memoryStatsSet, + CpuStatsSet: cpuStatsSet, + MemoryStatsSet: memoryStatsSet, + NetworkStatsSet: networkStatsSet, }) } diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index 67c1b1eb2a3..f6a98d3490c 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -190,11 +190,21 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) { engine.addAndStartStatsContainer("c1") ts1 := parseNanoTime("2015-02-12T21:22:05.131117533Z") ts2 := parseNanoTime("2015-02-12T21:22:05.232291187Z") + netStats := &NetworkStats{ + rxBytes: 796, + rxDropped: 6, + rxErrors: 0, + rxPackets: 10, + txBytes: 8192, + txDropped: 5, + txErrors: 0, + txPackets: 60, + } containerStats := []*ContainerStats{ - {22400432, 1839104, uint64(0), uint64(0), ts1}, - {116499979, 3649536, uint64(0), uint64(0), ts2}, + {22400432, 1839104, uint64(0), uint64(0), netStats, ts1}, + {116499979, 3649536, uint64(0), uint64(0), netStats, ts2}, } - dockerStats := []*types.StatsJSON{{}, {},} + dockerStats := []*types.StatsJSON{{}, {}} dockerStats[0].Read = ts1 dockerStats[1].Read = ts2 containers, _ := engine.tasksToContainers["t1"] diff --git a/agent/stats/queue.go b/agent/stats/queue.go index 304c977eb51..a577eb841e8 100644 --- a/agent/stats/queue.go +++ b/agent/stats/queue.go @@ -84,6 +84,7 @@ func (queue *Queue) add(rawStat *ContainerStats) { MemoryUsageInMegs: uint32(rawStat.memoryUsage / BytesInMiB), StorageReadBytes: rawStat.storageReadBytes, StorageWriteBytes: rawStat.storageWriteBytes, + NetworkStats: rawStat.networkStats, Timestamp: rawStat.timestamp, cpuUsage: rawStat.cpuUsage, } @@ -137,6 +138,101 @@ func (queue *Queue) GetStorageWriteStatsSet() (*ecstcs.ULongStatsSet, error) { return queue.getULongStatsSet(getStorageWriteBytes) } +// GetNetworkStatsSet gets the stats set for network metrics. +func (queue *Queue) GetNetworkStatsSet() (*ecstcs.NetworkStatsSet, error) { + networkStatsSet := &ecstcs.NetworkStatsSet{} + var err error + networkStatsSet.RxBytes, err = queue.getULongStatsSet(getNetworkRxBytes) + if err != nil { + seelog.Warnf("Error getting network rx bytes: %v", err) + } + networkStatsSet.RxDropped, err = queue.getULongStatsSet(getNetworkRxDropped) + if err != nil { + seelog.Warnf("Error getting network rx dropped: %v", err) + } + networkStatsSet.RxErrors, err = queue.getULongStatsSet(getNetworkRxErrors) + if err != nil { + seelog.Warnf("Error getting network rx errors: %v", err) + } + networkStatsSet.RxPackets, err = queue.getULongStatsSet(getNetworkRxPackets) + if err != nil { + seelog.Warnf("Error getting network rx packets: %v", err) + } + networkStatsSet.TxBytes, err = queue.getULongStatsSet(getNetworkTxBytes) + if err != nil { + seelog.Warnf("Error getting network tx bytes: %v", err) + } + networkStatsSet.TxDropped, err = queue.getULongStatsSet(getNetworkTxDropped) + if err != nil { + seelog.Warnf("Error getting network tx dropped: %v", err) + } + networkStatsSet.TxErrors, err = queue.getULongStatsSet(getNetworkTxErrors) + if err != nil { + seelog.Warnf("Error getting network tx errors: %v", err) + } + networkStatsSet.TxPackets, err = queue.getULongStatsSet(getNetworkTxPackets) + if err != nil { + seelog.Warnf("Error getting network tx packets: %v", err) + } + return networkStatsSet, err +} + +func getNetworkRxBytes(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.rxBytes + } + return uint64(0) +} + +func getNetworkRxDropped(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.rxDropped + } + return uint64(0) +} + +func getNetworkRxErrors(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.rxErrors + } + return uint64(0) +} + +func getNetworkRxPackets(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.rxPackets + } + return uint64(0) +} + +func getNetworkTxBytes(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.txBytes + } + return uint64(0) +} + +func getNetworkTxDropped(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.txDropped + } + return uint64(0) +} + +func getNetworkTxErrors(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.txErrors + } + return uint64(0) +} + +func getNetworkTxPackets(s *UsageStats) uint64 { + if s.NetworkStats != nil { + return s.NetworkStats.txPackets + } + return uint64(0) +} + // GetRawUsageStats gets the array of most recent raw UsageStats, in descending // order of timestamps. func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) { @@ -161,6 +257,7 @@ func (queue *Queue) GetRawUsageStats(numStats int) ([]UsageStats, error) { MemoryUsageInMegs: rawUsageStat.MemoryUsageInMegs, StorageReadBytes: rawUsageStat.StorageReadBytes, StorageWriteBytes: rawUsageStat.StorageWriteBytes, + NetworkStats: rawUsageStat.NetworkStats, Timestamp: rawUsageStat.Timestamp, } } @@ -219,7 +316,7 @@ func (queue *Queue) getCWStatsSet(f getUsageFloatFunc) (*ecstcs.CWStatsSet, erro queueLength := len(queue.buffer) if queueLength < 2 { // Need at least 2 data points to calculate this. - return nil, fmt.Errorf("We need at least 2 data points in queue to calculate float stats") + return nil, fmt.Errorf("Need at least 2 data points in queue to calculate CW stats set") } var min, max, sum float64 @@ -260,7 +357,7 @@ func (queue *Queue) getULongStatsSet(f getUsageIntFunc) (*ecstcs.ULongStatsSet, queueLength := len(queue.buffer) if queueLength < 2 { // Need at least 2 data points to calculate this. - return nil, fmt.Errorf("We need at least 2 data points in the queue to calculate int stats") + return nil, fmt.Errorf("Need at least 2 data points in the queue to calculate int stats") } var min, max, sum uint64 diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go index 19cad6c503a..b738991fc44 100644 --- a/agent/stats/queue_test.go +++ b/agent/stats/queue_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs" + "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" ) @@ -152,7 +154,17 @@ func createQueue(size int, predictableHighUtilization bool) *Queue { memoryUsage: memoryUtilizationInBytes[i], storageReadBytes: uintStats[i], storageWriteBytes: uintStats[i], - timestamp: time}) + networkStats: &NetworkStats{ + rxBytes: uintStats[i], + rxDropped: 0, + rxErrors: uintStats[i], + rxPackets: uintStats[i], + txBytes: uintStats[i], + txDropped: uintStats[i], + txErrors: 0, + txPackets: uintStats[i], + }, + timestamp: time}) } return queue } @@ -244,6 +256,10 @@ func TestQueueAddRemove(t *testing.T) { t.Error("Sum value incorrectly set: ", *storageWriteStatsSet.Sum) } + netStatsSet, err := queue.GetNetworkStatsSet() + assert.NoError(t, err, "error getting network stats set") + validateNetStatsSet(t, netStatsSet, queueLength) + rawUsageStats, err := queue.GetRawUsageStats(2 * queueLength) if err != nil { t.Error("Error getting raw usage stats: ", err) @@ -270,6 +286,41 @@ func TestQueueAddRemove(t *testing.T) { } +func validateNetStatsSet(t *testing.T, netStats *ecstcs.NetworkStatsSet, queueLen int) { + // checking only the fields RxBytes, RxDropped, TxBytes, TxErrors since others are similar + assert.NotEqual(t, int64(math.MaxInt64), *netStats.RxBytes.Min, "incorrect rxbytes min") + assert.Equal(t, int64(0), *netStats.RxBytes.OverflowMin, "incorrect rxbytes overlfowMin") + assert.NotEqual(t, int64(0), *netStats.RxBytes.Max, "incorrect rxbytes max") + assert.Equal(t, int64(0), *netStats.RxBytes.OverflowMax, "incorrect rxbytes overlfowMax") + assert.Equal(t, int64(queueLen), *netStats.RxBytes.SampleCount, "incorrect rxbytes sampleCount") + assert.NotEqual(t, int64(0), *netStats.RxBytes.Sum, "incorrect rxbytes sum") + assert.Equal(t, int64(0), *netStats.RxBytes.OverflowSum, "incorrect rxbytes overlfowSum") + + assert.Equal(t, int64(0), *netStats.RxDropped.Min, "incorrect RxDropped min") + assert.Equal(t, int64(0), *netStats.RxDropped.OverflowMin, "incorrect RxDropped overlfowMin") + assert.Equal(t, int64(0), *netStats.RxDropped.Max, "incorrect RxDropped max") + assert.Equal(t, int64(0), *netStats.RxDropped.OverflowMax, "incorrect RxDropped overlfowMax") + assert.Equal(t, int64(queueLen), *netStats.RxDropped.SampleCount, "incorrect RxDropped sampleCount") + assert.Equal(t, int64(0), *netStats.RxDropped.Sum, "incorrect RxDropped sum") + assert.Equal(t, int64(0), *netStats.RxDropped.OverflowSum, "incorrect RxDropped overlfowSum") + + assert.NotEqual(t, int64(math.MaxInt64), *netStats.TxBytes.Min, "incorrect TxBytes min") + assert.Equal(t, int64(0), *netStats.TxBytes.OverflowMin, "incorrect TxBytes overlfowMin") + assert.NotEqual(t, int64(0), *netStats.TxBytes.Max, "incorrect TxBytes max") + assert.Equal(t, int64(0), *netStats.TxBytes.OverflowMax, "incorrect TxBytes overlfowMax") + assert.Equal(t, int64(queueLen), *netStats.TxBytes.SampleCount, "incorrect TxBytes sampleCount") + assert.NotEqual(t, int64(0), *netStats.TxBytes.Sum, "incorrect TxBytes sum") + assert.Equal(t, int64(0), *netStats.TxBytes.OverflowSum, "incorrect TxBytes overlfowSum") + + assert.Equal(t, int64(0), *netStats.TxErrors.Min, "incorrect TxErrors min") + assert.Equal(t, int64(0), *netStats.TxErrors.OverflowMin, "incorrect TxErrors overlfowMin") + assert.Equal(t, int64(0), *netStats.TxErrors.Max, "incorrect TxErrors max") + assert.Equal(t, int64(0), *netStats.TxErrors.OverflowMax, "incorrect TxErrors overlfowMax") + assert.Equal(t, int64(queueLen), *netStats.TxErrors.SampleCount, "incorrect TxErrors sampleCount") + assert.Equal(t, int64(0), *netStats.TxErrors.Sum, "incorrect TxErrors sum") + assert.Equal(t, int64(0), *netStats.TxErrors.OverflowSum, "incorrect TxErrors overlfowSum") +} + func TestQueueUintStats(t *testing.T) { queueLength := 3 queue := createQueue(queueLength, true) diff --git a/agent/stats/types.go b/agent/stats/types.go index afd1c7e9669..419455736f7 100644 --- a/agent/stats/types.go +++ b/agent/stats/types.go @@ -28,16 +28,30 @@ type ContainerStats struct { memoryUsage uint64 storageReadBytes uint64 storageWriteBytes uint64 + networkStats *NetworkStats timestamp time.Time } +// NetworkStats contains the network stats information for a container +type NetworkStats struct { + rxBytes uint64 + rxDropped uint64 + rxErrors uint64 + rxPackets uint64 + txBytes uint64 + txDropped uint64 + txErrors uint64 + txPackets uint64 +} + // UsageStats abstracts the format in which the queue stores data. type UsageStats struct { - CPUUsagePerc float32 `json:"cpuUsagePerc"` - MemoryUsageInMegs uint32 `json:"memoryUsageInMegs"` - StorageReadBytes uint64 `json:"storageReadBytes"` - StorageWriteBytes uint64 `json:"storageWriteBytes"` - Timestamp time.Time `json:"timestamp"` + CPUUsagePerc float32 `json:"cpuUsagePerc"` + MemoryUsageInMegs uint32 `json:"memoryUsageInMegs"` + StorageReadBytes uint64 `json:"storageReadBytes"` + StorageWriteBytes uint64 `json:"storageWriteBytes"` + NetworkStats *NetworkStats `json:"networkStats"` + Timestamp time.Time `json:"timestamp"` cpuUsage uint64 } diff --git a/agent/stats/unix_test_stats.json b/agent/stats/unix_test_stats.json index 14b93c435b7..8188d2251ea 100644 --- a/agent/stats/unix_test_stats.json +++ b/agent/stats/unix_test_stats.json @@ -57,5 +57,27 @@ "rss": 10 }, "privateworkingset": 10 + }, + "networks": { + "eth0": { + "rx_bytes": 796, + "rx_packets": 10, + "rx_errors": 0, + "rx_dropped": 0, + "tx_bytes": 8192, + "tx_packets": 112, + "tx_errors": 0, + "tx_dropped": 5 + }, + "eth1": { + "rx_bytes": 300, + "rx_packets": 4, + "rx_errors": 0, + "rx_dropped": 1, + "tx_bytes": 800, + "tx_packets": 11, + "tx_errors": 0, + "tx_dropped": 5 + } } } diff --git a/agent/stats/utils.go b/agent/stats/utils.go index e2ddd17a007..1e0fafeba58 100644 --- a/agent/stats/utils.go +++ b/agent/stats/utils.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cihub/seelog" + "github.com/docker/docker/api/types" ) // networkStatsErrorPattern defines the pattern that is used to evaluate @@ -50,3 +51,22 @@ func isNetworkStatsError(err error) bool { return matched } + +func getNetworkStats(dockerStats *types.StatsJSON) *NetworkStats { + if dockerStats.Networks == nil { + return nil + } + networkStats := &NetworkStats{} + for _, netStats := range dockerStats.Networks { + networkStats.rxBytes += netStats.RxBytes + networkStats.rxDropped += netStats.RxDropped + networkStats.rxErrors += netStats.RxErrors + networkStats.rxPackets += netStats.RxPackets + + networkStats.txBytes += netStats.TxBytes + networkStats.txDropped += netStats.TxDropped + networkStats.txErrors += netStats.TxErrors + networkStats.txPackets += netStats.TxPackets + } + return networkStats +} diff --git a/agent/stats/utils_test.go b/agent/stats/utils_test.go index 6adbb6a40a4..8feee783bf4 100644 --- a/agent/stats/utils_test.go +++ b/agent/stats/utils_test.go @@ -21,6 +21,19 @@ import ( "testing" "github.com/docker/docker/api/types" + "github.com/stretchr/testify/assert" +) + +const ( + // below is the sum of each field in each network interface json in unix_test_stats.json + expectedRxBytes = uint64(1096) + expectedRxPackets = uint64(14) + expectedRxDropped = uint64(1) + expectedRxErrors = uint64(0) + expectedTxBytes = uint64(8992) + expectedTxPackets = uint64(123) + expectedTxDropped = uint64(10) + expectedTxErrors = uint64(0) ) func TestIsNetworkStatsError(t *testing.T) { @@ -69,3 +82,14 @@ func TestDockerStatsToContainerStatsMemUsage(t *testing.T) { t.Error("Unexpected value for memoryUsage", containerStats.memoryUsage) } } + +func validateNetworkMetrics(t *testing.T, netStats *NetworkStats) { + assert.Equal(t, expectedRxBytes, netStats.rxBytes) + assert.Equal(t, expectedRxPackets, netStats.rxPackets) + assert.Equal(t, expectedRxDropped, netStats.rxDropped) + assert.Equal(t, expectedRxErrors, netStats.rxErrors) + assert.Equal(t, expectedTxBytes, netStats.txBytes) + assert.Equal(t, expectedTxPackets, netStats.txPackets) + assert.Equal(t, expectedTxDropped, netStats.txDropped) + assert.Equal(t, expectedTxErrors, netStats.txErrors) +} diff --git a/agent/stats/utils_unix.go b/agent/stats/utils_unix.go index c7c770d9b9d..9b53f5379cd 100644 --- a/agent/stats/utils_unix.go +++ b/agent/stats/utils_unix.go @@ -31,6 +31,19 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, cpuUsage := dockerStats.CPUStats.CPUUsage.TotalUsage / numCores memoryUsage := dockerStats.MemoryStats.Usage - dockerStats.MemoryStats.Stats["cache"] + storageReadBytes, storageWriteBytes := getStorageStats(dockerStats) + networkStats := getNetworkStats(dockerStats) + return &ContainerStats{ + cpuUsage: cpuUsage, + memoryUsage: memoryUsage, + storageReadBytes: storageReadBytes, + storageWriteBytes: storageWriteBytes, + networkStats: networkStats, + timestamp: dockerStats.Read, + }, nil +} + +func getStorageStats(dockerStats *types.StatsJSON) (uint64, uint64) { // initialize block io and loop over stats to aggregate storageReadBytes := uint64(0) storageWriteBytes := uint64(0) @@ -45,11 +58,5 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, continue } } - return &ContainerStats{ - cpuUsage: cpuUsage, - memoryUsage: memoryUsage, - storageReadBytes: storageReadBytes, - storageWriteBytes: storageWriteBytes, - timestamp: dockerStats.Read, - }, nil + return storageReadBytes, storageWriteBytes } diff --git a/agent/stats/utils_unix_test.go b/agent/stats/utils_unix_test.go index acba581db9c..980abd44532 100644 --- a/agent/stats/utils_unix_test.go +++ b/agent/stats/utils_unix_test.go @@ -36,7 +36,7 @@ func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) { assert.Error(t, err, "expected error converting container stats with empty PercpuUsage") } -func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) { +func TestDockerStatsToContainerStats(t *testing.T) { // numCores is a global variable in package agent/stats // which denotes the number of cpu cores // TODO should we take this from the docker stats `online_cpus`? @@ -49,17 +49,11 @@ func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) { assert.NoError(t, err, "converting container stats failed") require.NotNil(t, containerStats, "containerStats should not be nil") assert.Equal(t, uint64(65714455379), containerStats.cpuUsage, "unexpected value for cpuUsage", containerStats.cpuUsage) -} - -func TestDockerStatsToContainerStatsStorageBytes(t *testing.T) { - inputJsonFile, _ := filepath.Abs("./unix_test_stats.json") - jsonBytes, _ := ioutil.ReadFile(inputJsonFile) - dockerStat := &types.StatsJSON{} - json.Unmarshal([]byte(jsonBytes), dockerStat) - containerStats, err := dockerStatsToContainerStats(dockerStat) - assert.NoError(t, err, "converting container stats failed") - require.NotNil(t, containerStats, "containerStats should not be nil") - + // storage bytes check assert.Equal(t, uint64(3), containerStats.storageReadBytes, "unexpected value for storageReadBytes", containerStats.storageReadBytes) assert.Equal(t, uint64(15), containerStats.storageWriteBytes, "Unexpected value for storageWriteBytes", containerStats.storageWriteBytes) + // network stats check + netStats := containerStats.networkStats + assert.NotNil(t, netStats, "networkStats should not be nil") + validateNetworkMetrics(t, netStats) } diff --git a/agent/stats/utils_windows.go b/agent/stats/utils_windows.go index a38dde8b320..047dec45207 100644 --- a/agent/stats/utils_windows.go +++ b/agent/stats/utils_windows.go @@ -30,9 +30,11 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, cpuUsage := (dockerStats.CPUStats.CPUUsage.TotalUsage * 100) / numCores memoryUsage := dockerStats.MemoryStats.PrivateWorkingSet + networkStats := getNetworkStats(dockerStats) return &ContainerStats{ - cpuUsage: cpuUsage, - memoryUsage: memoryUsage, - timestamp: dockerStats.Read, + cpuUsage: cpuUsage, + memoryUsage: memoryUsage, + timestamp: dockerStats.Read, + networkStats: networkStats, }, nil } diff --git a/agent/stats/utils_windows_test.go b/agent/stats/utils_windows_test.go index 5d4159325a2..e5d3177bd1c 100644 --- a/agent/stats/utils_windows_test.go +++ b/agent/stats/utils_windows_test.go @@ -17,6 +17,8 @@ package stats import ( "encoding/json" "fmt" + "io/ioutil" + "path/filepath" "testing" "github.com/docker/docker/api/types" @@ -62,3 +64,17 @@ func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) { require.NotNil(t, containerStats, "containerStats should not be nil") assert.Equal(t, uint64(2500), containerStats.cpuUsage, "unexpected value for cpuUsage", containerStats.cpuUsage) } + +func TestDockerStatsToContainerStats(t *testing.T) { + inputJsonFile, _ := filepath.Abs("./windows_test_stats.json") + jsonBytes, _ := ioutil.ReadFile(inputJsonFile) + dockerStat := &types.StatsJSON{} + json.Unmarshal([]byte(jsonBytes), dockerStat) + containerStats, err := dockerStatsToContainerStats(dockerStat) + assert.NoError(t, err, "converting container stats failed") + require.NotNil(t, containerStats, "containerStats should not be nil") + // network stats check + netStats := containerStats.networkStats + assert.NotNil(t, netStats, "networkStats should not be nil") + validateNetworkMetrics(t, netStats) +} diff --git a/agent/stats/windows_test_stats.json b/agent/stats/windows_test_stats.json new file mode 100644 index 00000000000..38f40d8c3f2 --- /dev/null +++ b/agent/stats/windows_test_stats.json @@ -0,0 +1,20 @@ +{ + "networks": { + "eth0": { + "rx_bytes": 796, + "rx_packets": 10, + "rx_dropped": 0, + "tx_bytes": 8192, + "tx_packets": 112, + "tx_dropped": 5 + }, + "eth1": { + "rx_bytes": 300, + "rx_packets": 4, + "rx_dropped": 1, + "tx_bytes": 800, + "tx_packets": 11, + "tx_dropped": 5 + } + } +} From 2b02a4a3a47f53bff5c6dca8d562da9fc2717466 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Fri, 3 May 2019 18:09:59 +0000 Subject: [PATCH 05/17] remove network stats addition to task metrics --- agent/stats/common_test.go | 19 ++++++++----------- agent/stats/engine.go | 13 +++---------- agent/stats/engine_test.go | 15 +-------------- agent/stats/queue.go | 16 ++++++++-------- agent/stats/queue_test.go | 17 ++++++++--------- agent/stats/types.go | 16 ++++++++-------- agent/stats/utils.go | 17 +++++++++-------- agent/stats/utils_test.go | 16 ++++++++-------- 8 files changed, 53 insertions(+), 76 deletions(-) diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 68db421be24..4c19f71794d 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -154,9 +154,6 @@ func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expect if containerMetric.MemoryStatsSet == nil { return fmt.Errorf("MemoryStatsSet is nil") } - if containerMetric.NetworkStatsSet == nil { - return fmt.Errorf("NetworkStatsSet is nil") - } } return nil } @@ -251,14 +248,14 @@ func validateEmptyTaskHealthMetrics(t *testing.T, engine *DockerStatsEngine) { func createFakeContainerStats() []*ContainerStats { netStats := &NetworkStats{ - rxBytes: 796, - rxDropped: 6, - rxErrors: 0, - rxPackets: 10, - txBytes: 8192, - txDropped: 5, - txErrors: 0, - txPackets: 60, + RxBytes: 796, + RxDropped: 6, + RxErrors: 0, + RxPackets: 10, + TxBytes: 8192, + TxDropped: 5, + TxErrors: 0, + TxPackets: 60, } return []*ContainerStats{ {22400432, 1839104, uint64(0), uint64(0), netStats, parseNanoTime("2015-02-12T21:22:05.131117533Z")}, diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 09dbbe0ca58..43824d48bc3 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -594,23 +594,16 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* continue } - // Get memory stats set. + // Get memory stats set memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet() if err != nil { seelog.Warnf("Error getting memory stats, err: %v, container: %v", err, dockerID) continue } - networkStatsSet, err := container.statsQueue.GetNetworkStatsSet() - if err != nil { - // we log the error and still continue to publish cpu, memory stats and empty/partial network stats - seelog.Warnf("Error getting network stats: %v, container: %v", err, dockerID) - } - containerMetrics = append(containerMetrics, &ecstcs.ContainerMetric{ - CpuStatsSet: cpuStatsSet, - MemoryStatsSet: memoryStatsSet, - NetworkStatsSet: networkStatsSet, + CpuStatsSet: cpuStatsSet, + MemoryStatsSet: memoryStatsSet, }) } diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index f6a98d3490c..02ce097cc27 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -190,20 +190,7 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) { engine.addAndStartStatsContainer("c1") ts1 := parseNanoTime("2015-02-12T21:22:05.131117533Z") ts2 := parseNanoTime("2015-02-12T21:22:05.232291187Z") - netStats := &NetworkStats{ - rxBytes: 796, - rxDropped: 6, - rxErrors: 0, - rxPackets: 10, - txBytes: 8192, - txDropped: 5, - txErrors: 0, - txPackets: 60, - } - containerStats := []*ContainerStats{ - {22400432, 1839104, uint64(0), uint64(0), netStats, ts1}, - {116499979, 3649536, uint64(0), uint64(0), netStats, ts2}, - } + containerStats := createFakeContainerStats() dockerStats := []*types.StatsJSON{{}, {}} dockerStats[0].Read = ts1 dockerStats[1].Read = ts2 diff --git a/agent/stats/queue.go b/agent/stats/queue.go index a577eb841e8..8c57b6c2a7b 100644 --- a/agent/stats/queue.go +++ b/agent/stats/queue.go @@ -179,56 +179,56 @@ func (queue *Queue) GetNetworkStatsSet() (*ecstcs.NetworkStatsSet, error) { func getNetworkRxBytes(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.rxBytes + return s.NetworkStats.RxBytes } return uint64(0) } func getNetworkRxDropped(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.rxDropped + return s.NetworkStats.RxDropped } return uint64(0) } func getNetworkRxErrors(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.rxErrors + return s.NetworkStats.RxErrors } return uint64(0) } func getNetworkRxPackets(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.rxPackets + return s.NetworkStats.RxPackets } return uint64(0) } func getNetworkTxBytes(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.txBytes + return s.NetworkStats.TxBytes } return uint64(0) } func getNetworkTxDropped(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.txDropped + return s.NetworkStats.TxDropped } return uint64(0) } func getNetworkTxErrors(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.txErrors + return s.NetworkStats.TxErrors } return uint64(0) } func getNetworkTxPackets(s *UsageStats) uint64 { if s.NetworkStats != nil { - return s.NetworkStats.txPackets + return s.NetworkStats.TxPackets } return uint64(0) } diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go index b738991fc44..da60f486ee8 100644 --- a/agent/stats/queue_test.go +++ b/agent/stats/queue_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs" - "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" ) @@ -155,14 +154,14 @@ func createQueue(size int, predictableHighUtilization bool) *Queue { storageReadBytes: uintStats[i], storageWriteBytes: uintStats[i], networkStats: &NetworkStats{ - rxBytes: uintStats[i], - rxDropped: 0, - rxErrors: uintStats[i], - rxPackets: uintStats[i], - txBytes: uintStats[i], - txDropped: uintStats[i], - txErrors: 0, - txPackets: uintStats[i], + RxBytes: uintStats[i], + RxDropped: 0, + RxErrors: uintStats[i], + RxPackets: uintStats[i], + TxBytes: uintStats[i], + TxDropped: uintStats[i], + TxErrors: 0, + TxPackets: uintStats[i], }, timestamp: time}) } diff --git a/agent/stats/types.go b/agent/stats/types.go index 419455736f7..3b277aa3404 100644 --- a/agent/stats/types.go +++ b/agent/stats/types.go @@ -34,14 +34,14 @@ type ContainerStats struct { // NetworkStats contains the network stats information for a container type NetworkStats struct { - rxBytes uint64 - rxDropped uint64 - rxErrors uint64 - rxPackets uint64 - txBytes uint64 - txDropped uint64 - txErrors uint64 - txPackets uint64 + RxBytes uint64 `json:"rxBytes"` + RxDropped uint64 `json:"rxDropped"` + RxErrors uint64 `json:"rxErrors"` + RxPackets uint64 `json:"rxPackets"` + TxBytes uint64 `json:"txBytes"` + TxDropped uint64 `json:"txDropped"` + TxErrors uint64 `json:"txErrors"` + TxPackets uint64 `json:"txPackets"` } // UsageStats abstracts the format in which the queue stores data. diff --git a/agent/stats/utils.go b/agent/stats/utils.go index 1e0fafeba58..45880131d80 100644 --- a/agent/stats/utils.go +++ b/agent/stats/utils.go @@ -54,19 +54,20 @@ func isNetworkStatsError(err error) bool { func getNetworkStats(dockerStats *types.StatsJSON) *NetworkStats { if dockerStats.Networks == nil { + seelog.Debug("Network stats not reported for container") return nil } networkStats := &NetworkStats{} for _, netStats := range dockerStats.Networks { - networkStats.rxBytes += netStats.RxBytes - networkStats.rxDropped += netStats.RxDropped - networkStats.rxErrors += netStats.RxErrors - networkStats.rxPackets += netStats.RxPackets + networkStats.RxBytes += netStats.RxBytes + networkStats.RxDropped += netStats.RxDropped + networkStats.RxErrors += netStats.RxErrors + networkStats.RxPackets += netStats.RxPackets - networkStats.txBytes += netStats.TxBytes - networkStats.txDropped += netStats.TxDropped - networkStats.txErrors += netStats.TxErrors - networkStats.txPackets += netStats.TxPackets + networkStats.TxBytes += netStats.TxBytes + networkStats.TxDropped += netStats.TxDropped + networkStats.TxErrors += netStats.TxErrors + networkStats.TxPackets += netStats.TxPackets } return networkStats } diff --git a/agent/stats/utils_test.go b/agent/stats/utils_test.go index 8feee783bf4..9294b72141a 100644 --- a/agent/stats/utils_test.go +++ b/agent/stats/utils_test.go @@ -84,12 +84,12 @@ func TestDockerStatsToContainerStatsMemUsage(t *testing.T) { } func validateNetworkMetrics(t *testing.T, netStats *NetworkStats) { - assert.Equal(t, expectedRxBytes, netStats.rxBytes) - assert.Equal(t, expectedRxPackets, netStats.rxPackets) - assert.Equal(t, expectedRxDropped, netStats.rxDropped) - assert.Equal(t, expectedRxErrors, netStats.rxErrors) - assert.Equal(t, expectedTxBytes, netStats.txBytes) - assert.Equal(t, expectedTxPackets, netStats.txPackets) - assert.Equal(t, expectedTxDropped, netStats.txDropped) - assert.Equal(t, expectedTxErrors, netStats.txErrors) + assert.Equal(t, expectedRxBytes, netStats.RxBytes) + assert.Equal(t, expectedRxPackets, netStats.RxPackets) + assert.Equal(t, expectedRxDropped, netStats.RxDropped) + assert.Equal(t, expectedRxErrors, netStats.RxErrors) + assert.Equal(t, expectedTxBytes, netStats.TxBytes) + assert.Equal(t, expectedTxPackets, netStats.TxPackets) + assert.Equal(t, expectedTxDropped, netStats.TxDropped) + assert.Equal(t, expectedTxErrors, netStats.TxErrors) } From c970da135d0806caf494e2a2cdeed6eda6615b61 Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Fri, 3 May 2019 21:20:21 +0000 Subject: [PATCH 06/17] add StorageStatsSet and windows stats collection --- agent/stats/engine.go | 1 - agent/stats/queue.go | 21 ++++++++------ agent/stats/queue_test.go | 13 ++++----- agent/stats/utils_unix_test.go | 2 +- agent/stats/utils_windows.go | 12 +++++--- agent/stats/utils_windows_test.go | 33 ++++++---------------- agent/stats/windows_test_stats.json | 44 +++++++++++++++++++++++++++++ 7 files changed, 81 insertions(+), 45 deletions(-) diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 43824d48bc3..e9f40a4027e 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -594,7 +594,6 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* continue } - // Get memory stats set memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet() if err != nil { seelog.Warnf("Error getting memory stats, err: %v, container: %v", err, dockerID) diff --git a/agent/stats/queue.go b/agent/stats/queue.go index 8c57b6c2a7b..1b6bfa733e7 100644 --- a/agent/stats/queue.go +++ b/agent/stats/queue.go @@ -128,14 +128,19 @@ func (queue *Queue) GetMemoryStatsSet() (*ecstcs.CWStatsSet, error) { return queue.getCWStatsSet(getMemoryUsagePerc) } -// GetStorageReadStatsSet gets the stats set for aggregate storage read -func (queue *Queue) GetStorageReadStatsSet() (*ecstcs.ULongStatsSet, error) { - return queue.getULongStatsSet(getStorageReadBytes) -} - -// GetStorageWriteStatsSet gets the stats set for aggregate storage written -func (queue *Queue) GetStorageWriteStatsSet() (*ecstcs.ULongStatsSet, error) { - return queue.getULongStatsSet(getStorageWriteBytes) +// GetStorageStatsSet gets the stats set for aggregate storage +func (queue *Queue) GetStorageStatsSet() (*ecstcs.StorageStatsSet, error) { + storageStatsSet := &ecstcs.StorageStatsSet{} + var err error + storageStatsSet.ReadSizeBytes, err = queue.getULongStatsSet(getStorageReadBytes) + if err != nil { + seelog.Warnf("Error getting storage read size bytes: %v", err) + } + storageStatsSet.WriteSizeBytes, err = queue.getULongStatsSet(getStorageWriteBytes) + if err != nil { + seelog.Warnf("Error getting storage write size bytes: %v", err) + } + return storageStatsSet, err } // GetNetworkStatsSet gets the stats set for network metrics. diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go index da60f486ee8..407cac2ac34 100644 --- a/agent/stats/queue_test.go +++ b/agent/stats/queue_test.go @@ -219,11 +219,12 @@ func TestQueueAddRemove(t *testing.T) { t.Error("Sum value incorrectly set: ", *memStatsSet.Sum) } - storageReadStatsSet, err := queue.GetStorageReadStatsSet() + storageStatsSet, err := queue.GetStorageStatsSet() if err != nil { - t.Error("Error getting storage read stats set:", err) + t.Error("Error getting storage stats set:", err) } // assuming min is initialized to math.MaxUint64 then truncated + storageReadStatsSet := storageStatsSet.ReadSizeBytes if *storageReadStatsSet.Min == int64(math.MaxInt64) && *storageReadStatsSet.OverflowMin == int64(math.MaxInt64) { t.Error("Min value incorrectly set: ", *storageReadStatsSet.Min) @@ -238,10 +239,7 @@ func TestQueueAddRemove(t *testing.T) { t.Error("Sum value incorrectly set: ", *storageReadStatsSet.Sum) } - storageWriteStatsSet, err := queue.GetStorageWriteStatsSet() - if err != nil { - t.Error("Error getting storage read stats set:", err) - } + storageWriteStatsSet := storageStatsSet.WriteSizeBytes if *storageWriteStatsSet.Min == int64(math.MaxInt64) { t.Error("Min value incorrectly set: ", *storageWriteStatsSet.Min) } @@ -328,7 +326,8 @@ func TestQueueUintStats(t *testing.T) { t.Errorf("Buffer size is incorrect. Expected: %d, Got: %d", queueLength, len(buf)) } - storageReadStatsSet, err := queue.GetStorageReadStatsSet() + storageStatsSet, err := queue.GetStorageStatsSet() + storageReadStatsSet := storageStatsSet.ReadSizeBytes if err != nil { t.Error("Error getting storage read stats set:", err) diff --git a/agent/stats/utils_unix_test.go b/agent/stats/utils_unix_test.go index 980abd44532..a44c3fc7f3b 100644 --- a/agent/stats/utils_unix_test.go +++ b/agent/stats/utils_unix_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) { +func TestDockerStatsToContainerStatsEmptyCpuUsageGeneratesError(t *testing.T) { inputJsonFile, _ := filepath.Abs("./unix_test_stats.json") jsonBytes, _ := ioutil.ReadFile(inputJsonFile) dockerStat := &types.StatsJSON{} diff --git a/agent/stats/utils_windows.go b/agent/stats/utils_windows.go index 047dec45207..08562d4198f 100644 --- a/agent/stats/utils_windows.go +++ b/agent/stats/utils_windows.go @@ -31,10 +31,14 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, cpuUsage := (dockerStats.CPUStats.CPUUsage.TotalUsage * 100) / numCores memoryUsage := dockerStats.MemoryStats.PrivateWorkingSet networkStats := getNetworkStats(dockerStats) + storageReadBytes := dockerStats.StorageStats.ReadSizeBytes + storageWriteBytes := dockerStats.StorageStats.WriteSizeBytes return &ContainerStats{ - cpuUsage: cpuUsage, - memoryUsage: memoryUsage, - timestamp: dockerStats.Read, - networkStats: networkStats, + cpuUsage: cpuUsage, + memoryUsage: memoryUsage, + timestamp: dockerStats.Read, + storageReadBytes: storageReadBytes, + storageWriteBytes: storageWriteBytes, + networkStats: networkStats, }, nil } diff --git a/agent/stats/utils_windows_test.go b/agent/stats/utils_windows_test.go index e5d3177bd1c..aace5e44f20 100644 --- a/agent/stats/utils_windows_test.go +++ b/agent/stats/utils_windows_test.go @@ -28,6 +28,7 @@ import ( func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) { numCores = uint64(0) + // not using windows_test_stats.json here to save file open/read time jsonStat := fmt.Sprintf(` { "cpu_stats":{ @@ -42,30 +43,8 @@ func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) { assert.Error(t, err, "expected error converting container stats with zero cpu cores") } -func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) { - // doing this with json makes me sad, but is the easiest way to deal with - // the inner structs - - // numCores is a global variable in package agent/stats - // which denotes the number of cpu cores - numCores = 4 - jsonStat := fmt.Sprintf(` - { - "cpu_stats":{ - "cpu_usage":{ - "total_usage":%d - } - } - }`, 100) - dockerStat := &types.StatsJSON{} - json.Unmarshal([]byte(jsonStat), dockerStat) - containerStats, err := dockerStatsToContainerStats(dockerStat) - assert.NoError(t, err, "converting container stats failed") - require.NotNil(t, containerStats, "containerStats should not be nil") - assert.Equal(t, uint64(2500), containerStats.cpuUsage, "unexpected value for cpuUsage", containerStats.cpuUsage) -} - func TestDockerStatsToContainerStats(t *testing.T) { + numCores = 4 inputJsonFile, _ := filepath.Abs("./windows_test_stats.json") jsonBytes, _ := ioutil.ReadFile(inputJsonFile) dockerStat := &types.StatsJSON{} @@ -73,8 +52,14 @@ func TestDockerStatsToContainerStats(t *testing.T) { containerStats, err := dockerStatsToContainerStats(dockerStat) assert.NoError(t, err, "converting container stats failed") require.NotNil(t, containerStats, "containerStats should not be nil") - // network stats check netStats := containerStats.networkStats assert.NotNil(t, netStats, "networkStats should not be nil") validateNetworkMetrics(t, netStats) + assert.Equal(t, uint64(2500), containerStats.cpuUsage, + "unexpected value for cpuUsage", containerStats.cpuUsage) + assert.Equal(t, uint64(3), containerStats.storageReadBytes, + "unexpected value for storageReadBytes", containerStats.storageReadBytes) + assert.Equal(t, uint64(15), containerStats.storageWriteBytes, + "Unexpected value for storageWriteBytes", containerStats.storageWriteBytes) + } diff --git a/agent/stats/windows_test_stats.json b/agent/stats/windows_test_stats.json index 38f40d8c3f2..5a1ddc7a484 100644 --- a/agent/stats/windows_test_stats.json +++ b/agent/stats/windows_test_stats.json @@ -1,4 +1,48 @@ { + "blkio_stats": { + "io_service_bytes_recursive": null, + "io_serviced_recursive": null, + "io_queue_recursive": null, + "io_service_time_recursive": null, + "io_wait_time_recursive": null, + "io_merged_recursive": null, + "io_time_recursive": null, + "sectors_recursive": null + }, + "num_procs": 8, + "storage_stats": { + "read_count_normalized": 1, + "read_size_bytes": 3, + "write_count_normalized": 1, + "write_size_bytes": 15 + }, + "cpu_stats": { + "cpu_usage": { + "total_usage": 100 + }, + "throttling_data": { + "periods": 0, + "throttled_periods": 0, + "throttled_time": 0 + } + }, + "precpu_stats": { + "cpu_usage": { + "total_usage": 0, + "usage_in_kernelmode": 0, + "usage_in_usermode": 0 + }, + "throttling_data": { + "periods": 0, + "throttled_periods": 0, + "throttled_time": 0 + } + }, + "memory_stats": { + "commitbytes": 114061312, + "commitpeakbytes": 139227136, + "privateworkingset": 85532672 + }, "networks": { "eth0": { "rx_bytes": 796, From d1beea4de18c6fc9cbec5487500e89c1f0cd8585 Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Tue, 7 May 2019 20:52:47 +0000 Subject: [PATCH 07/17] add nil check for iobytes --- agent/stats/utils_unix.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/agent/stats/utils_unix.go b/agent/stats/utils_unix.go index 9b53f5379cd..1e2cfa2d34b 100644 --- a/agent/stats/utils_unix.go +++ b/agent/stats/utils_unix.go @@ -45,6 +45,10 @@ func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, func getStorageStats(dockerStats *types.StatsJSON) (uint64, uint64) { // initialize block io and loop over stats to aggregate + if dockerStats.BlkioStats.IoServiceBytesRecursive == nil { + seelog.Debug("Storage stats not reported for container") + return uint64(0), uint64(0) + } storageReadBytes := uint64(0) storageWriteBytes := uint64(0) for _, blockStat := range dockerStats.BlkioStats.IoServiceBytesRecursive { From ef394ae6233cad62bfcf1d7421c387089461be45 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Thu, 9 May 2019 00:26:05 +0000 Subject: [PATCH 08/17] add container name to metrics --- agent/tcs/model/api/api-2.json | 1 + agent/tcs/model/ecstcs/api.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/agent/tcs/model/api/api-2.json b/agent/tcs/model/api/api-2.json index b46e3770c38..3e8656183b5 100644 --- a/agent/tcs/model/api/api-2.json +++ b/agent/tcs/model/api/api-2.json @@ -143,6 +143,7 @@ "type":"structure", "members":{ "containerArn":{"shape":"String"}, + "containerName":{"shape":"String"}, "cpuStatsSet":{"shape":"CWStatsSet"}, "memoryStatsSet":{"shape":"CWStatsSet"}, "networkStatsSet":{"shape":"NetworkStatsSet"}, diff --git a/agent/tcs/model/ecstcs/api.go b/agent/tcs/model/ecstcs/api.go index 7566dee170d..9685812bc8f 100644 --- a/agent/tcs/model/ecstcs/api.go +++ b/agent/tcs/model/ecstcs/api.go @@ -116,6 +116,8 @@ type ContainerMetric struct { ContainerArn *string `locationName:"containerArn" type:"string"` + ContainerName *string `locationName:"containerName" type:"string"` + CpuStatsSet *CWStatsSet `locationName:"cpuStatsSet" type:"structure"` MemoryStatsSet *CWStatsSet `locationName:"memoryStatsSet" type:"structure"` From 131040fb35dfd3611f3000b0eade1ce31194d6d4 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Thu, 9 May 2019 00:04:16 +0000 Subject: [PATCH 09/17] stats: add container name to stats container --- agent/stats/container.go | 9 +++++++-- agent/stats/engine.go | 6 +++++- agent/stats/engine_test.go | 24 ++++++++++++++++-------- agent/stats/types.go | 1 + 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/agent/stats/container.go b/agent/stats/container.go index bb5b29287d0..38be75bab13 100644 --- a/agent/stats/container.go +++ b/agent/stats/container.go @@ -33,17 +33,22 @@ const ( ContainerStatsBufferLength = 120 ) -func newStatsContainer(dockerID string, client dockerapi.DockerClient, resolver resolver.ContainerMetadataResolver) *StatsContainer { +func newStatsContainer(dockerID string, client dockerapi.DockerClient, resolver resolver.ContainerMetadataResolver) (*StatsContainer, error) { + dockerContainer, err := resolver.ResolveContainer(dockerID) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(context.Background()) return &StatsContainer{ containerMetadata: &ContainerMetadata{ DockerID: dockerID, + Name: dockerContainer.Container.Name, }, ctx: ctx, cancel: cancel, client: client, resolver: resolver, - } + }, nil } func (container *StatsContainer) StartStatsCollection() { diff --git a/agent/stats/engine.go b/agent/stats/engine.go index e9f40a4027e..f5bb4c11760 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -246,8 +246,12 @@ func (engine *DockerStatsEngine) addContainerUnsafe(dockerID string) (*StatsCont return nil, errors.Errorf("stats add container: task is terminal, ignoring container: %s, task: %s", dockerID, task.Arn) } + statsContainer, err := newStatsContainer(dockerID, engine.client, engine.resolver) + if err != nil { + return nil, errors.Wrapf(err, "could not map docker container ID to container, ignoring container: %s", dockerID) + } + seelog.Debugf("Adding container to stats watch list, id: %s, task: %s", dockerID, task.Arn) - statsContainer := newStatsContainer(dockerID, engine.client, engine.resolver) engine.tasksToDefinitions[task.Arn] = &taskDefinition{family: task.Family, version: task.Version} watchStatsContainer := false diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index 02ce097cc27..642e8761f36 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -43,6 +43,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { t1 := &apitask.Task{Arn: "t1", Family: "f1"} t2 := &apitask.Task{Arn: "t2", Family: "f2"} t3 := &apitask.Task{Arn: "t3"} + name := "testContainer" resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t1, nil) resolver.EXPECT().ResolveTask("c3").AnyTimes().Return(t2, nil) @@ -50,7 +51,9 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { resolver.EXPECT().ResolveTask("c5").AnyTimes().Return(t2, nil) resolver.EXPECT().ResolveTask("c6").AnyTimes().Return(t3, nil) resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ - Container: &apicontainer.Container{}, + Container: &apicontainer.Container{ + Name: name, + }, }, nil) mockStatsChannel := make(chan *types.StatsJSON) defer close(mockStatsChannel) @@ -90,6 +93,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { } for _, statsContainer := range containers { + assert.Equal(t, name, statsContainer.containerMetadata.Name) for _, fakeContainerStats := range createFakeContainerStats() { statsContainer.statsQueue.add(fakeContainerStats) } @@ -289,7 +293,7 @@ func TestGetTaskHealthMetrics(t *testing.T) { Since: aws.Time(time.Now()), }, }, - }, nil).Times(2) + }, nil).Times(3) engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestGetTaskHealthMetrics")) ctx, cancel := context.WithCancel(context.TODO()) @@ -298,7 +302,9 @@ func TestGetTaskHealthMetrics(t *testing.T) { engine.containerInstanceArn = "container_instance" containerToStats := make(map[string]*StatsContainer) - containerToStats[containerID] = newStatsContainer(containerID, nil, resolver) + var err error + containerToStats[containerID], err = newStatsContainer(containerID, nil, resolver) + assert.NoError(t, err) engine.tasksToHealthCheckContainers["t1"] = containerToStats engine.tasksToDefinitions["t1"] = &taskDefinition{ family: "f1", @@ -331,7 +337,7 @@ func TestGetTaskHealthMetricsStoppedContainer(t *testing.T) { Since: aws.Time(time.Now()), }, }, - }, nil) + }, nil).Times(2) engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestGetTaskHealthMetrics")) ctx, cancel := context.WithCancel(context.TODO()) @@ -340,7 +346,9 @@ func TestGetTaskHealthMetricsStoppedContainer(t *testing.T) { engine.containerInstanceArn = "container_instance" containerToStats := make(map[string]*StatsContainer) - containerToStats[containerID] = newStatsContainer(containerID, nil, resolver) + var err error + containerToStats[containerID], err = newStatsContainer(containerID, nil, resolver) + assert.NoError(t, err) engine.tasksToHealthCheckContainers["t1"] = containerToStats engine.tasksToDefinitions["t1"] = &taskDefinition{ family: "f1", @@ -348,7 +356,7 @@ func TestGetTaskHealthMetricsStoppedContainer(t *testing.T) { } engine.resolver = resolver - _, _, err := engine.GetTaskHealthMetrics() + _, _, err = engine.GetTaskHealthMetrics() assert.Error(t, err, "empty metrics should cause an error") } @@ -372,7 +380,7 @@ func TestMetricsDisabled(t *testing.T) { Container: &apicontainer.Container{ HealthCheckType: "docker", }, - }, nil) + }, nil).Times(2) engine := NewDockerStatsEngine(&disableMetricsConfig, nil, eventStream("TestMetricsDisabled")) ctx, cancel := context.WithCancel(context.TODO()) @@ -418,7 +426,7 @@ func TestSynchronizeOnRestart(t *testing.T) { Container: &apicontainer.Container{ HealthCheckType: "docker", }, - }, nil) + }, nil).Times(2) err := engine.synchronizeState() assert.NoError(t, err) diff --git a/agent/stats/types.go b/agent/stats/types.go index 3b277aa3404..5d63d34aeea 100644 --- a/agent/stats/types.go +++ b/agent/stats/types.go @@ -58,6 +58,7 @@ type UsageStats struct { // ContainerMetadata contains meta-data information for a container. type ContainerMetadata struct { DockerID string `json:"-"` + Name string `json:"-"` } // StatsContainer abstracts methods to gather and aggregate utilization data for a container. From df47b1b41fb12218fb471608a6794820f8df42fb Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Wed, 15 May 2019 21:05:15 +0000 Subject: [PATCH 10/17] stats: add network and storage stats to container metrics The new stats added as part of the container metrics struct is sent to TCS as part of `PublishMetricsInput` --- agent/stats/common_test.go | 10 ++++++++-- agent/stats/engine.go | 18 ++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 4c19f71794d..0ec0ce469ec 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -154,6 +154,12 @@ func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expect if containerMetric.MemoryStatsSet == nil { return fmt.Errorf("MemoryStatsSet is nil") } + if containerMetric.NetworkStatsSet == nil { + return fmt.Errorf("NetworkStatsSet is nil") + } + if containerMetric.StorageStatsSet == nil { + return fmt.Errorf("StorageStatsSet is nil") + } } return nil } @@ -258,8 +264,8 @@ func createFakeContainerStats() []*ContainerStats { TxPackets: 60, } return []*ContainerStats{ - {22400432, 1839104, uint64(0), uint64(0), netStats, parseNanoTime("2015-02-12T21:22:05.131117533Z")}, - {116499979, 3649536, uint64(0), uint64(0), netStats, parseNanoTime("2015-02-12T21:22:05.232291187Z")}, + {22400432, 1839104, uint64(100), uint64(200), netStats, parseNanoTime("2015-02-12T21:22:05.131117533Z")}, + {116499979, 3649536, uint64(300), uint64(400), netStats, parseNanoTime("2015-02-12T21:22:05.232291187Z")}, } } diff --git a/agent/stats/engine.go b/agent/stats/engine.go index f5bb4c11760..4b6dfa7be41 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -604,9 +604,23 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* continue } + networkStatsSet, err := container.statsQueue.GetNetworkStatsSet() + if err != nil { + // we log the error and still continue to publish cpu, memory stats + seelog.Warnf("Error getting network stats: %v, container: %v", err, dockerID) + } + + storageStatsSet, err := container.statsQueue.GetStorageStatsSet() + if err != nil { + seelog.Warnf("Error getting storage stats, err: %v, container: %v", err, dockerID) + continue + } + containerMetrics = append(containerMetrics, &ecstcs.ContainerMetric{ - CpuStatsSet: cpuStatsSet, - MemoryStatsSet: memoryStatsSet, + CpuStatsSet: cpuStatsSet, + MemoryStatsSet: memoryStatsSet, + NetworkStatsSet: networkStatsSet, + StorageStatsSet: storageStatsSet, }) } From f30c90fe8315513b1941933f76bd6d54d5985978 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Thu, 16 May 2019 18:55:24 +0000 Subject: [PATCH 11/17] stats:add container's name to container metrics --- agent/stats/common_test.go | 3 +++ agent/stats/engine.go | 1 + agent/stats/engine_test.go | 4 +++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index 0ec0ce469ec..e3066d26056 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -148,6 +148,9 @@ func validateContainerMetrics(containerMetrics []*ecstcs.ContainerMetric, expect return fmt.Errorf("Mismatch in number of ContainerStatsSet elements. Expected: %d, Got: %d", expected, len(containerMetrics)) } for _, containerMetric := range containerMetrics { + if *containerMetric.ContainerName == "" { + return fmt.Errorf("ContainerName is empty") + } if containerMetric.CpuStatsSet == nil { return fmt.Errorf("CPUStatsSet is nil") } diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 4b6dfa7be41..9171d4798f2 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -617,6 +617,7 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* } containerMetrics = append(containerMetrics, &ecstcs.ContainerMetric{ + ContainerName: &container.containerMetadata.Name, CpuStatsSet: cpuStatsSet, MemoryStatsSet: memoryStatsSet, NetworkStatsSet: networkStatsSet, diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index 642e8761f36..ce39a34445d 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -179,7 +179,9 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) { t1 := &apitask.Task{Arn: "t1", Family: "f1"} resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ - Container: &apicontainer.Container{}, + Container: &apicontainer.Container{ + Name: "test", + }, }, nil) mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() From b2fe45ef750c6123f681ce85817d042ba052fd02 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Fri, 17 May 2019 20:13:27 +0000 Subject: [PATCH 12/17] stats: empty network stats for none,host and awsvpc mode Currently, Docker does not return Network stats for ECS network modes - awsvpc, host, none. Do not send zero value network metrics to TCS for those network modes. --- agent/stats/container.go | 5 ++- agent/stats/engine.go | 34 ++++++++++------ agent/stats/engine_test.go | 70 ++++++++++++++++++++++++++++++++- agent/stats/engine_unix_test.go | 38 ++++++++++++++++++ agent/stats/types.go | 5 ++- 5 files changed, 135 insertions(+), 17 deletions(-) create mode 100644 agent/stats/engine_unix_test.go diff --git a/agent/stats/container.go b/agent/stats/container.go index 38be75bab13..26ed6c637eb 100644 --- a/agent/stats/container.go +++ b/agent/stats/container.go @@ -41,8 +41,9 @@ func newStatsContainer(dockerID string, client dockerapi.DockerClient, resolver ctx, cancel := context.WithCancel(context.Background()) return &StatsContainer{ containerMetadata: &ContainerMetadata{ - DockerID: dockerID, - Name: dockerContainer.Container.Name, + DockerID: dockerID, + Name: dockerContainer.Container.Name, + NetworkMode: dockerContainer.Container.GetNetworkMode(), }, ctx: ctx, cancel: cancel, diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 9171d4798f2..31945911f04 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -42,6 +42,8 @@ import ( const ( containerChangeHandler = "DockerStatsEngineDockerEventsHandler" queueResetThreshold = 2 * dockerclient.StatsInactivityTimeout + hostNetworkMode = "host" + noneNetworkMode = "none" ) var ( @@ -604,26 +606,34 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* continue } - networkStatsSet, err := container.statsQueue.GetNetworkStatsSet() + containerMetric := &ecstcs.ContainerMetric{ + ContainerName: &container.containerMetadata.Name, + CpuStatsSet: cpuStatsSet, + MemoryStatsSet: memoryStatsSet, + } + + task, err := engine.resolver.ResolveTask(dockerID) if err != nil { - // we log the error and still continue to publish cpu, memory stats - seelog.Warnf("Error getting network stats: %v, container: %v", err, dockerID) + seelog.Warnf("Task not found for container ID: %s", dockerID) + } else { + // send network stats for default/bridge/nat network modes + if task.ENI == nil && container.containerMetadata.NetworkMode != hostNetworkMode && container.containerMetadata.NetworkMode != noneNetworkMode { + networkStatsSet, err := container.statsQueue.GetNetworkStatsSet() + if err != nil { + // we log the error and still continue to publish cpu, memory stats + seelog.Warnf("Error getting network stats: %v, container: %v", err, dockerID) + } + containerMetric.NetworkStatsSet = networkStatsSet + } } storageStatsSet, err := container.statsQueue.GetStorageStatsSet() if err != nil { seelog.Warnf("Error getting storage stats, err: %v, container: %v", err, dockerID) - continue } + containerMetric.StorageStatsSet = storageStatsSet - containerMetrics = append(containerMetrics, &ecstcs.ContainerMetric{ - ContainerName: &container.containerMetadata.Name, - CpuStatsSet: cpuStatsSet, - MemoryStatsSet: memoryStatsSet, - NetworkStatsSet: networkStatsSet, - StorageStatsSet: storageStatsSet, - }) - + containerMetrics = append(containerMetrics, containerMetric) } return containerMetrics, nil diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index ce39a34445d..19942f8efd0 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -23,6 +23,7 @@ import ( apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status" + apieni "github.com/aws/amazon-ecs-agent/agent/api/eni" apitask "github.com/aws/amazon-ecs-agent/agent/api/task" apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" @@ -44,6 +45,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { t2 := &apitask.Task{Arn: "t2", Family: "f2"} t3 := &apitask.Task{Arn: "t3"} name := "testContainer" + networkMode := "bridge" resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t1, nil) resolver.EXPECT().ResolveTask("c3").AnyTimes().Return(t2, nil) @@ -52,7 +54,8 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { resolver.EXPECT().ResolveTask("c6").AnyTimes().Return(t3, nil) resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ Container: &apicontainer.Container{ - Name: name, + Name: name, + NetworkModeUnsafe: networkMode, }, }, nil) mockStatsChannel := make(chan *types.StatsJSON) @@ -94,6 +97,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) { for _, statsContainer := range containers { assert.Equal(t, name, statsContainer.containerMetadata.Name) + assert.Equal(t, networkMode, statsContainer.containerMetadata.NetworkMode) for _, fakeContainerStats := range createFakeContainerStats() { statsContainer.statsQueue.add(fakeContainerStats) } @@ -440,3 +444,67 @@ func TestSynchronizeOnRestart(t *testing.T) { <-statsStarted statsContainer.StopStatsCollection() } + +func TestTaskNetworkStatsSet(t *testing.T) { + var networkModes = []struct { + ENI *apieni.ENI + NetworkMode string + StatsEmpty bool + }{ + {nil, "default", false}, + } + for _, tc := range networkModes { + testNetworkModeStats(t, tc.NetworkMode, tc.ENI, tc.StatsEmpty) + } +} + +func testNetworkModeStats(t *testing.T, netMode string, eni *apieni.ENI, emptyStats bool) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + resolver := mock_resolver.NewMockContainerMetadataResolver(mockCtrl) + mockDockerClient := mock_dockerapi.NewMockDockerClient(mockCtrl) + t1 := &apitask.Task{ + Arn: "t1", + Family: "f1", + ENI: eni, + } + resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ + Container: &apicontainer.Container{ + Name: "test", + NetworkModeUnsafe: netMode, + }, + }, nil) + mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestTaskNetworkStatsSet")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + engine.client = mockDockerClient + engine.addAndStartStatsContainer("c1") + ts1 := parseNanoTime("2015-02-12T21:22:05.131117533Z") + containerStats := createFakeContainerStats() + dockerStats := []*types.StatsJSON{{}, {}} + dockerStats[0].Read = ts1 + containers, _ := engine.tasksToContainers["t1"] + for _, statsContainer := range containers { + for i := 0; i < 2; i++ { + statsContainer.statsQueue.add(containerStats[i]) + statsContainer.statsQueue.setLastStat(dockerStats[i]) + } + } + _, taskMetrics, err := engine.GetInstanceMetrics() + assert.NoError(t, err) + assert.Len(t, taskMetrics, 1) + for _, containerMetric := range taskMetrics[0].ContainerMetrics { + if emptyStats { + assert.Nil(t, containerMetric.NetworkStatsSet, "network stats should be empty") + } else { + assert.NotNil(t, containerMetric.NetworkStatsSet, "network stats should be non-empty") + } + } +} diff --git a/agent/stats/engine_unix_test.go b/agent/stats/engine_unix_test.go new file mode 100644 index 00000000000..117c85a2f3b --- /dev/null +++ b/agent/stats/engine_unix_test.go @@ -0,0 +1,38 @@ +//+build unit + +// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package stats + +import ( + "testing" + + apieni "github.com/aws/amazon-ecs-agent/agent/api/eni" +) + +func TestLinuxTaskNetworkStatsSet(t *testing.T) { + var networkModes = []struct { + ENI *apieni.ENI + NetworkMode string + StatsEmpty bool + }{ + {&apieni.ENI{ID: "ec2Id"}, "", true}, + {nil, "host", true}, + {nil, "bridge", false}, + {nil, "none", true}, + } + for _, tc := range networkModes { + testNetworkModeStats(t, tc.NetworkMode, tc.ENI, tc.StatsEmpty) + } +} diff --git a/agent/stats/types.go b/agent/stats/types.go index 5d63d34aeea..e96ffcbf9f6 100644 --- a/agent/stats/types.go +++ b/agent/stats/types.go @@ -57,8 +57,9 @@ type UsageStats struct { // ContainerMetadata contains meta-data information for a container. type ContainerMetadata struct { - DockerID string `json:"-"` - Name string `json:"-"` + DockerID string `json:"-"` + Name string `json:"-"` + NetworkMode string `json:"-"` } // StatsContainer abstracts methods to gather and aggregate utilization data for a container. From f07a9ebcae6fdcd5da049f4ba77dad50a5378bf2 Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Tue, 21 May 2019 21:57:34 +0000 Subject: [PATCH 13/17] network stats integration tests --- agent/stats/common_test.go | 6 +- agent/stats/engine_integ_test.go | 91 ++++++++++++++++++++++++++- agent/stats/engine_unix_integ_test.go | 34 ++++++++++ 3 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 agent/stats/engine_unix_integ_test.go diff --git a/agent/stats/common_test.go b/agent/stats/common_test.go index e3066d26056..0f85574a8dc 100644 --- a/agent/stats/common_test.go +++ b/agent/stats/common_test.go @@ -75,12 +75,14 @@ func eventStream(name string) *eventstream.EventStream { // createGremlin creates the gremlin container using the docker client. // It is used only in the test code. -func createGremlin(client *sdkClient.Client) (*dockercontainer.ContainerCreateCreatedBody, error) { +func createGremlin(client *sdkClient.Client, netMode string) (*dockercontainer.ContainerCreateCreatedBody, error) { containerGremlin, err := client.ContainerCreate(context.TODO(), &dockercontainer.Config{ Image: testImageName, }, - &dockercontainer.HostConfig{}, + &dockercontainer.HostConfig{ + NetworkMode: dockercontainer.NetworkMode(netMode), + }, &network.NetworkingConfig{}, "") diff --git a/agent/stats/engine_integ_test.go b/agent/stats/engine_integ_test.go index 78e7dac7b69..31dd5316dd2 100644 --- a/agent/stats/engine_integ_test.go +++ b/agent/stats/engine_integ_test.go @@ -65,7 +65,7 @@ func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) { timeout := defaultDockerTimeoutSeconds // Create a container to get the container id. - container, err := createGremlin(client) + container, err := createGremlin(client, "default") require.NoError(t, err, "creating container failed") defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) @@ -128,7 +128,7 @@ func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) { // Assign ContainerStop timeout to addressable variable timeout := defaultDockerTimeoutSeconds - container, err := createGremlin(client) + container, err := createGremlin(client, "default") require.NoError(t, err, "creating container failed") defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) @@ -345,7 +345,7 @@ func TestStatsEngineWithNewContainersWithPolling(t *testing.T) { cfg.PollMetrics = true cfg.PollingMetricsWaitDuration = 1 * time.Second // Create a new docker client with new config - dockerClientForNewContainersWithPolling, _ := dockerapi.NewDockerGoClient(sdkClientFactory, &cfg, ctx) + dockerClientForNewContainersWithPolling, _ := dockerapi.NewDockerGoClient(sdkClientFactory, &cfg, ctx) // Create a new docker stats engine engine := NewDockerStatsEngine(&cfg, dockerClientForNewContainersWithPolling, eventStream("TestStatsEngineWithNewContainers")) defer engine.removeAll() @@ -574,3 +574,88 @@ func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) { validateIdleContainerMetrics(t, statsEngine) validateEmptyTaskHealthMetrics(t, statsEngine) } + +func TestStatsEngineWithNetworkStatsDefaultMode(t *testing.T) { + testNetworkModeStats(t, "default", false) +} + +func testNetworkModeStats(t *testing.T, networkMode string, statsEmpty bool) { + // Create a new docker stats engine + engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNetworkStats")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Assign ContainerStop timeout to addressable variable + timeout := defaultDockerTimeoutSeconds + + // Create a container to get the container id. + container, err := createGremlin(client, networkMode) + require.NoError(t, err, "creating container failed") + defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + + err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{}) + require.NoError(t, err, "starting container failed") + defer client.ContainerStop(ctx, container.ID, &timeout) + + containerChangeEventStream := eventStream("TestStatsEngineWithNetworkStats") + taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, + nil, dockerstate.NewTaskEngineState(), nil, nil) + testTask := createRunningTask() + + // Populate Tasks and Container map in the engine. + dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) + dockerTaskEngine.State().AddTask(testTask) + dockerTaskEngine.State().AddContainer( + &apicontainer.DockerContainer{ + DockerID: container.ID, + DockerName: "gremlin", + Container: testTask.Containers[0], + }, + testTask) + + // Inspect the container and populate the container's network mode + // This is done as part of Task Engine + // https://github.com/aws/amazon-ecs-agent/blob/d2456beb048d36bfe18159ad7f35ca6b78bb9ee9/agent/engine/docker_task_engine.go#L364 + dockerContainer, err := client.ContainerInspect(ctx, container.ID) + require.NoError(t, err, "inspecting container failed") + netMode := string(dockerContainer.HostConfig.NetworkMode) + testTask.Containers[0].SetNetworkMode(netMode) + + // Simulate container start prior to listener initialization. + time.Sleep(checkPointSleep) + err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance) + require.NoError(t, err, "initializing stats engine failed") + defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler) + + // Wait for the stats collection go routine to start. + time.Sleep(checkPointSleep) + _, taskMetrics, err := engine.GetInstanceMetrics() + assert.NoError(t, err, "gettting instance metrics failed") + taskMetric := taskMetrics[0] + for _, containerMetric := range taskMetric.ContainerMetrics { + if statsEmpty { + assert.Nil(t, containerMetric.NetworkStatsSet, "network stats should be empty for %s network mode", networkMode) + } else { + assert.NotNil(t, containerMetric.NetworkStatsSet, "network stats should be non-empty for %s network mode", networkMode) + } + } + + err = client.ContainerStop(ctx, container.ID, &timeout) + require.NoError(t, err, "stopping container failed") + + err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ + Status: apicontainerstatus.ContainerStopped, + DockerContainerMetadata: dockerapi.DockerContainerMetadata{ + DockerID: container.ID, + }, + }) + assert.NoError(t, err, "failed to write to container change event stream") + time.Sleep(waitForCleanupSleep) + + // Should not contain any metrics after cleanup. + validateIdleContainerMetrics(t, engine) + validateEmptyTaskHealthMetrics(t, engine) +} diff --git a/agent/stats/engine_unix_integ_test.go b/agent/stats/engine_unix_integ_test.go new file mode 100644 index 00000000000..87081b6a163 --- /dev/null +++ b/agent/stats/engine_unix_integ_test.go @@ -0,0 +1,34 @@ +// +build !windows,integration + +// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package stats + +import ( + "testing" +) + +func TestStatsEngineWithNetworkStatsDifferentModes(t *testing.T) { + var networkModes = []struct { + NetworkMode string + StatsEmpty bool + }{ + {"bridge", false}, + {"host", true}, + {"none", true}, + } + for _, tc := range networkModes { + testNetworkModeStats(t, tc.NetworkMode, tc.StatsEmpty) + } +} From fd1492f8e85c8dce5017bcbaedf25a62e089a8ce Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Thu, 23 May 2019 23:17:04 +0000 Subject: [PATCH 14/17] updated with storage stats check --- agent/stats/engine_integ_test.go | 78 +++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/agent/stats/engine_integ_test.go b/agent/stats/engine_integ_test.go index 31dd5316dd2..8a9daae86bb 100644 --- a/agent/stats/engine_integ_test.go +++ b/agent/stats/engine_integ_test.go @@ -633,7 +633,7 @@ func testNetworkModeStats(t *testing.T, networkMode string, statsEmpty bool) { // Wait for the stats collection go routine to start. time.Sleep(checkPointSleep) _, taskMetrics, err := engine.GetInstanceMetrics() - assert.NoError(t, err, "gettting instance metrics failed") + assert.NoError(t, err, "getting instance metrics failed") taskMetric := taskMetrics[0] for _, containerMetric := range taskMetric.ContainerMetrics { if statsEmpty { @@ -659,3 +659,79 @@ func testNetworkModeStats(t *testing.T, networkMode string, statsEmpty bool) { validateIdleContainerMetrics(t, engine) validateEmptyTaskHealthMetrics(t, engine) } + +func TestStorageStats(t *testing.T) { + // Create a new docker stats engine + engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithStorageStats")) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // Assign ContainerStop timeout to addressable variable + timeout := defaultDockerTimeoutSeconds + + // Create a container to get the container id. + container, err := createGremlin(client, "default") + require.NoError(t, err, "creating container failed") + defer client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + + err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{}) + require.NoError(t, err, "starting container failed") + defer client.ContainerStop(ctx, container.ID, &timeout) + + containerChangeEventStream := eventStream("TestStatsEngineWithStorageStats") + taskEngine := ecsengine.NewTaskEngine(&config.Config{}, nil, nil, containerChangeEventStream, + nil, dockerstate.NewTaskEngineState(), nil, nil) + testTask := createRunningTask() + + // Populate Tasks and Container map in the engine. + dockerTaskEngine := taskEngine.(*ecsengine.DockerTaskEngine) + dockerTaskEngine.State().AddTask(testTask) + dockerTaskEngine.State().AddContainer( + &apicontainer.DockerContainer{ + DockerID: container.ID, + DockerName: "gremlin", + Container: testTask.Containers[0], + }, + testTask) + + // Inspect the container and populate the container's network mode + dockerContainer, err := client.ContainerInspect(ctx, container.ID) + require.NoError(t, err, "inspecting container failed") + // Using default network mode + netMode := string(dockerContainer.HostConfig.NetworkMode) + testTask.Containers[0].SetNetworkMode(netMode) + + // Simulate container start prior to listener initialization. + time.Sleep(checkPointSleep) + err = engine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance) + require.NoError(t, err, "initializing stats engine failed") + defer engine.containerChangeEventStream.Unsubscribe(containerChangeHandler) + + // Wait for the stats collection go routine to start. + time.Sleep(checkPointSleep) + _, taskMetrics, err := engine.GetInstanceMetrics() + assert.NoError(t, err, "getting instance metrics failed") + taskMetric := taskMetrics[0] + for _, containerMetric := range taskMetric.ContainerMetrics { + assert.NotNil(t, containerMetric.StorageStatsSet, "storage stats should be non-empty") + } + + err = client.ContainerStop(ctx, container.ID, &timeout) + require.NoError(t, err, "stopping container failed") + + err = engine.containerChangeEventStream.WriteToEventStream(dockerapi.DockerContainerChangeEvent{ + Status: apicontainerstatus.ContainerStopped, + DockerContainerMetadata: dockerapi.DockerContainerMetadata{ + DockerID: container.ID, + }, + }) + assert.NoError(t, err, "failed to write to container change event stream") + time.Sleep(waitForCleanupSleep) + + // Should not contain any metrics after cleanup. + validateIdleContainerMetrics(t, engine) + validateEmptyTaskHealthMetrics(t, engine) +} From 538c163459cc98632809fcb849c599dcf06ebcf9 Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Fri, 24 May 2019 22:03:11 +0000 Subject: [PATCH 15/17] add storage-stats task for functional testing --- Makefile | 11 ++++-- misc/storage-stats/Dockerfile | 22 ++++++++++++ misc/storage-stats/Makefile | 6 ++++ misc/storage-stats/main.go | 66 +++++++++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 misc/storage-stats/Dockerfile create mode 100644 misc/storage-stats/Makefile create mode 100644 misc/storage-stats/main.go diff --git a/Makefile b/Makefile index ff1b7ec654d..fdb14e64cc5 100644 --- a/Makefile +++ b/Makefile @@ -208,13 +208,13 @@ test-in-docker: # Privileged needed for docker-in-docker so integ tests pass docker run --net=none -v "$(PWD):/go/src/github.com/aws/amazon-ecs-agent" --privileged "amazon/amazon-ecs-agent-test:make" -run-functional-tests: testnnp test-registry ecr-execution-role-image telemetry-test-image +run-functional-tests: testnnp test-registry ecr-execution-role-image telemetry-test-image storage-stats-test-image . ./scripts/shared_env && go test -tags functional -timeout=60m -v ./agent/functional_tests/... .PHONY: build-image-for-ecr ecr-execution-role-image-for-upload upload-images replicate-images build-image-for-ecr: netkitten volumes-test squid awscli image-cleanup-test-images fluentd taskmetadata-validator \ - testnnp container-health-check-image telemetry-test-image ecr-execution-role-image-for-upload + testnnp container-health-check-image telemetry-test-image storage-stats-test-image ecr-execution-role-image-for-upload ecr-execution-role-image-for-upload: $(MAKE) -C misc/ecr-execution-role-upload $(MFLAGS) @@ -321,7 +321,8 @@ namespace-tests: # TODO, replace this with a build on dockerhub or a mechanism for the # functional tests themselves to build this -.PHONY: squid awscli fluentd gremlin agent-introspection-validator taskmetadata-validator v3-task-endpoint-validator container-metadata-file-validator elastic-inference-validator image-cleanup-test-images ecr-execution-role-image container-health-check-image telemetry-test-image +.PHONY: squid awscli fluentd gremlin agent-introspection-validator taskmetadata-validator v3-task-endpoint-validator container-metadata-file-validator elastic-inference-validator image-cleanup-test-images ecr-execution-role-image container-health-check-image telemetry-test-image storage-stats-test-image + squid: $(MAKE) -C misc/squid $(MFLAGS) @@ -364,6 +365,9 @@ ecr-execution-role-image: telemetry-test-image: $(MAKE) -C misc/telemetry $(MFLAGS) +storage-stats-test-image: + $(MAKE) -C misc/storage-stats $(MFLAGS) + container-health-check-image: $(MAKE) -C misc/container-health $(MFLAGS) @@ -438,6 +442,7 @@ clean: -$(MAKE) -C misc/elastic-inference-validator $(MFLAGS) clean -$(MAKE) -C misc/container-health $(MFLAGS) clean -$(MAKE) -C misc/telemetry $(MFLAGS) clean + -$(MAKE) -C misc/storage-stats $(MFLAGS) clean -$(MAKE) -C misc/appmesh-plugin-validator $(MFLAGS) clean -$(MAKE) -C misc/eni-trunking-validator $(MFLAGS) clean -rm -f .get-deps-stamp diff --git a/misc/storage-stats/Dockerfile b/misc/storage-stats/Dockerfile new file mode 100644 index 00000000000..e504cb51751 --- /dev/null +++ b/misc/storage-stats/Dockerfile @@ -0,0 +1,22 @@ +# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the +# License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +FROM golang:1.12 + +WORKDIR /gopath + +COPY main.go . + +RUN go build -o storagestats main.go + +ENTRYPOINT ["./storagestats"] +CMD [ "-sleep", "5000", "-bytecount", "1073741823"] diff --git a/misc/storage-stats/Makefile b/misc/storage-stats/Makefile new file mode 100644 index 00000000000..6d99293a0d0 --- /dev/null +++ b/misc/storage-stats/Makefile @@ -0,0 +1,6 @@ +.PHONY: all +all: + docker build -t amazon/amazon-ecs-storage-stats-test:make . + +clean: + -docker rmi -f "amazon/amazon-ecs-storage-stats-test:make" diff --git a/misc/storage-stats/main.go b/misc/storage-stats/main.go new file mode 100644 index 00000000000..5e94ead4b4e --- /dev/null +++ b/misc/storage-stats/main.go @@ -0,0 +1,66 @@ +// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "math/rand" + "os" + "time" +) + +func check(e error) { + if e != nil { + fmt.Printf("error: %v\n", e) + } +} + +func writeBytes(byteCount int64) error { + tmpFile, err := ioutil.TempFile(os.TempDir(), "blocktest-") + defer func() { + err = tmpFile.Close() + check(err) + err = os.Remove(tmpFile.Name()) + check(err) + }() + //populate content with random bytes + writeBytes := make([]byte, byteCount) + rand.Read(writeBytes) + // write and flush to disk to force block write + bytesWritten, err := tmpFile.Write(writeBytes) + if err != nil { + return err + } + err = tmpFile.Sync() + if err != nil { + return err + } + fmt.Printf("wrote %d bytes\n", bytesWritten) + return nil +} + +func main() { + sleepInterval := flag.Int("sleep", 1000, "length of sleep interval") + byteCount := flag.Int64("bytecount", 1024, "size in bytes to be written per interval") + flag.Parse() + for { + // Storage stats are cumulative. + // We do incremental writes with sleep to create + // a predictable increase over time. + writeBytes(*byteCount) + time.Sleep(time.Duration(int32(*sleepInterval)) * time.Millisecond) + } +} From add879260118e6a4a2aa4aae5b3314fc8985bbab Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Wed, 5 Jun 2019 16:56:22 +0000 Subject: [PATCH 16/17] update read/write storage-stats functional task --- misc/storage-stats/Dockerfile | 2 +- misc/storage-stats/main.go | 41 +++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/misc/storage-stats/Dockerfile b/misc/storage-stats/Dockerfile index e504cb51751..5112c6fcb31 100644 --- a/misc/storage-stats/Dockerfile +++ b/misc/storage-stats/Dockerfile @@ -19,4 +19,4 @@ COPY main.go . RUN go build -o storagestats main.go ENTRYPOINT ["./storagestats"] -CMD [ "-sleep", "5000", "-bytecount", "1073741823"] +CMD [ "-sleep", "5000", "-bytecount", "8192"] diff --git a/misc/storage-stats/main.go b/misc/storage-stats/main.go index 5e94ead4b4e..e1d1e2abf8c 100644 --- a/misc/storage-stats/main.go +++ b/misc/storage-stats/main.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "syscall" "time" ) @@ -28,23 +29,16 @@ func check(e error) { } } -func writeBytes(byteCount int64) error { - tmpFile, err := ioutil.TempFile(os.TempDir(), "blocktest-") - defer func() { - err = tmpFile.Close() - check(err) - err = os.Remove(tmpFile.Name()) - check(err) - }() +func writeBytes(byteCount int64, toFile *os.File) error { //populate content with random bytes writeBytes := make([]byte, byteCount) rand.Read(writeBytes) // write and flush to disk to force block write - bytesWritten, err := tmpFile.Write(writeBytes) + bytesWritten, err := toFile.Write(writeBytes) if err != nil { return err } - err = tmpFile.Sync() + err = toFile.Sync() if err != nil { return err } @@ -52,15 +46,38 @@ func writeBytes(byteCount int64) error { return nil } +func readBytes(readFilePath string) error { + // O_DIRECT flag skips the filesystem cache + f, err := os.OpenFile(readFilePath, os.O_RDONLY|syscall.O_DIRECT, 0777) + if err != nil { + return err + } + bytes, err := ioutil.ReadAll(f) + if err != nil { + return err + } + fmt.Printf("read %d bytes\n", len(bytes)) + return nil +} + func main() { sleepInterval := flag.Int("sleep", 1000, "length of sleep interval") byteCount := flag.Int64("bytecount", 1024, "size in bytes to be written per interval") flag.Parse() for { + // create a temp file to write to then read from + tmpFile, err := ioutil.TempFile(os.TempDir(), "blockwrite-") + defer func() { + err = tmpFile.Close() + check(err) + err = os.Remove(tmpFile.Name()) + check(err) + }() // Storage stats are cumulative. - // We do incremental writes with sleep to create + // We do incremental reads/writes with sleep to create // a predictable increase over time. - writeBytes(*byteCount) + writeBytes(*byteCount, tmpFile) + readBytes(tmpFile.Name()) time.Sleep(time.Duration(int32(*sleepInterval)) * time.Millisecond) } } From ecc92334c1b1d17d6246a15799b617058be5fc9f Mon Sep 17 00:00:00 2001 From: Sharanya Devaraj Date: Wed, 5 Jun 2019 02:06:02 +0000 Subject: [PATCH 17/17] stats:remove unwanted debug log --- agent/stats/utils.go | 1 - 1 file changed, 1 deletion(-) diff --git a/agent/stats/utils.go b/agent/stats/utils.go index 45880131d80..84387ef3281 100644 --- a/agent/stats/utils.go +++ b/agent/stats/utils.go @@ -54,7 +54,6 @@ func isNetworkStatsError(err error) bool { func getNetworkStats(dockerStats *types.StatsJSON) *NetworkStats { if dockerStats.Networks == nil { - seelog.Debug("Network stats not reported for container") return nil } networkStats := &NetworkStats{}