Skip to content

Commit

Permalink
Use logger.Contextual for custom struct logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Dec 21, 2019
1 parent 882c359 commit 1a0c755
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 94 deletions.
6 changes: 5 additions & 1 deletion agent/api/task/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (t *Task) UnmarshalJSON(data []byte) error {
if err != nil {
return err
}
t.initLog()
t.log.SetContext(map[string]string{
"taskARN": t.Arn,
"taskFamily": t.Family,
"taskVersion": t.Version,
})
return nil
}
36 changes: 14 additions & 22 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type Task struct {
lock sync.RWMutex

// log is a custom logger with extra context specific to the task struct
log seelog.LoggerInterface
log logger.Contextual
}

// TaskFromACS translates ecsacs.Task to apitask.Task by first marshaling the received
Expand All @@ -270,6 +270,11 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
if err := json.Unmarshal(data, task); err != nil {
return nil, err
}
task.log.SetContext(map[string]string{
"taskARN": task.Arn,
"taskFamily": task.Family,
"taskVersion": task.Version,
})
if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil {
task.StartSequenceNumber = *envelope.SeqNum
} else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil {
Expand All @@ -286,21 +291,9 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,

//initialize resources map for task
task.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
task.initLog()
return task, nil
}

func (task *Task) initLog() {
if task.log == nil {
task.log = logger.InitLogger()
task.log.SetContext(map[string]string{
"taskARN": task.Arn,
"taskFamily": task.Family,
"taskVersion": task.Version,
})
}
}

func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error {
err := task.initializeDockerLocalVolumes(dockerClient, ctx)
if err != nil {
Expand All @@ -325,7 +318,6 @@ func (task *Task) PostUnmarshalTask(cfg *config.Config,
dockerClient dockerapi.DockerClient, ctx context.Context) error {
// TODO, add rudimentary plugin support and call any plugins that want to
// hook into this
task.initLog()
task.adjustForPlatform(cfg)
if task.MemoryCPULimitsEnabled {
if err := task.initializeCgroupResourceSpec(cfg.CgroupPath, cfg.CgroupCPUPeriod, resourceFields); err != nil {
Expand Down Expand Up @@ -1311,7 +1303,7 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) {
}
}
if earliestKnownStatusContainer == nil {
task.log.Criticalf(
task.log.Errorf(
"Impossible state found while updating tasks's known status, earliest state recorded as %s",
containerEarliestKnownStatus.String())
return apitaskstatus.TaskStatusNone
Expand Down Expand Up @@ -1342,7 +1334,7 @@ func (task *Task) updateTaskKnownStatus() (newStatus apitaskstatus.TaskStatus) {
// based on the known statuses of all containers in the task
func (task *Task) getEarliestKnownTaskStatusForContainers() apitaskstatus.TaskStatus {
if len(task.Containers) == 0 {
task.log.Criticalf("No containers in the task")
task.log.Errorf("No containers in the task")
return apitaskstatus.TaskStatusNone
}
// Set earliest container status to an impossible to reach 'high' task status
Expand Down Expand Up @@ -1571,13 +1563,13 @@ func (task *Task) shouldOverrideNetworkMode(container *apicontainer.Container, d
}
}
if pauseContName == "" {
task.log.Critical("Pause container required, but not found in the task")
task.log.Error("Pause container required, but not found in the task")
return false, ""
}
pauseContainer, ok := dockerContainerMap[pauseContName]
if !ok || pauseContainer == nil {
// This should never be the case and implies a code-bug.
task.log.Criticalf("Pause container required, but not found in container map for container: [%s]",
task.log.Errorf("Pause container required, but not found in container map for container: [%s]",
container.String())
return false, ""
}
Expand Down Expand Up @@ -1660,14 +1652,14 @@ func (task *Task) shouldOverridePIDMode(container *apicontainer.Container, docke
case pidModeTask:
pauseCont, ok := task.ContainerByName(NamespacePauseContainerName)
if !ok {
task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.SetDesiredStatus(apitaskstatus.TaskStopped)
return false, ""
}
pauseDockerID, ok := dockerContainerMap[pauseCont.Name]
if !ok || pauseDockerID == nil {
// Docker container shouldn't be nil or not exist if the Container definition within task exists; implies code-bug
task.log.Criticalf("Namespace Pause docker container not found in the task; Setting Task's Desired Status to Stopped")
task.log.Errorf("Namespace Pause docker container not found in the task; Setting Task's Desired Status to Stopped")
task.SetDesiredStatus(apitaskstatus.TaskStopped)
return false, ""
}
Expand Down Expand Up @@ -1713,14 +1705,14 @@ func (task *Task) shouldOverrideIPCMode(container *apicontainer.Container, docke
case ipcModeTask:
pauseCont, ok := task.ContainerByName(NamespacePauseContainerName)
if !ok {
task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.SetDesiredStatus(apitaskstatus.TaskStopped)
return false, ""
}
pauseDockerID, ok := dockerContainerMap[pauseCont.Name]
if !ok || pauseDockerID == nil {
// Docker container shouldn't be nill or not exist if the Container definition within task exists; implies code-bug
task.log.Criticalf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.log.Errorf("Namespace Pause container not found in the task; Setting Task's Desired Status to Stopped")
task.SetDesiredStatus(apitaskstatus.TaskStopped)
return false, ""
}
Expand Down
27 changes: 10 additions & 17 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"

"github.com/cihub/seelog"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -127,7 +126,7 @@ type managedTask struct {
*apitask.Task
ctx context.Context
cancel context.CancelFunc
log seelog.LoggerInterface
log logger.Contextual

engine *DockerTaskEngine
cfg *config.Config
Expand Down Expand Up @@ -167,18 +166,11 @@ type managedTask struct {
// This method must only be called when the engine.processTasks write lock is
// already held.
func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask {
log := logger.InitLogger()
log.SetContext(map[string]string{
"taskARN": task.Arn,
"taskFamily": task.Family,
"taskVersion": task.Version,
})
ctx, cancel := context.WithCancel(engine.ctx)
t := &managedTask{
ctx: ctx,
cancel: cancel,
Task: task,
log: log,
acsMessages: make(chan acsTransition),
dockerMessages: make(chan dockerContainerChange),
resourceStateChangeEvent: make(chan resourceStateChange),
Expand All @@ -192,6 +184,11 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask
taskStopWG: engine.taskStopGroup,
steadyStatePollInterval: engine.taskSteadyStatePollInterval,
}
t.log.SetContext(map[string]string{
"taskARN": task.Arn,
"taskFamily": task.Family,
"taskVersion": task.Version,
})
engine.managedTasks[task.Arn] = t
return t
}
Expand Down Expand Up @@ -502,8 +499,7 @@ func (mtask *managedTask) emitResourceChange(change resourceStateChange) {
func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
event, err := api.NewTaskStateChangeEvent(task, reason)
if err != nil {
mtask.log.Infof("unable to create task state change event [%s]: %v",
task.Arn, reason, err)
mtask.log.Infof("unable to create task state change event [%s]: %v", reason, err)
return
}
mtask.log.Infof("sending task change event [%s]", event.String())
Expand All @@ -516,8 +512,7 @@ func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
func (mtask *managedTask) emitContainerEvent(task *apitask.Task, cont *apicontainer.Container, reason string) {
event, err := api.NewContainerStateChangeEvent(task, cont, reason)
if err != nil {
mtask.log.Infof("unable to create state change event for container [%s]: %v",
task.Arn, cont.Name, err)
mtask.log.Infof("unable to create state change event for container [%s]: %v", cont.Name, err)
return
}

Expand Down Expand Up @@ -575,14 +570,12 @@ func (mtask *managedTask) releaseIPInIPAM() {
MinSupportedCNIVersion: config.DefaultMinSupportedCNIVersion,
})
if err != nil {
mtask.log.Errorf("failed to release ip; unable to build cni configuration: %v",
err)
mtask.log.Errorf("failed to release ip; unable to build cni configuration: %v", err)
return
}
err = mtask.cniClient.ReleaseIPResource(mtask.ctx, cfg, ipamCleanupTmeout)
if err != nil {
mtask.log.Errorf("failed to release ip; IPAM error: %v",
err)
mtask.log.Errorf("failed to release ip; IPAM error: %v", err)
return
}
}
Expand Down
119 changes: 119 additions & 0 deletions agent/logger/contextual_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package logger

import (
"path/filepath"
"runtime"

"github.com/cihub/seelog"
)

// Contextual is a logger that can have custom context added to it. Once
// SetContext is called, it will print log messages with the additional context
// appended. Before SetContext is called, it will print messages using the
// default agent logger.
type Contextual struct {
log seelog.LoggerInterface
context map[string]string
}

// Debugf formats message according to format specifier
// and writes to log with level = Debug.
func (c *Contextual) Debugf(format string, params ...interface{}) {
if c.log == nil {
seelog.Debugf(format, params...)
} else {
c.log.Debugf(format, params...)
}
}

// Infof formats message according to format specifier
// and writes to log with level = Info.
func (c *Contextual) Infof(format string, params ...interface{}) {
if c.log == nil {
seelog.Infof(format, params...)
} else {
c.log.Infof(format, params...)
}
}

// Warnf formats message according to format specifier
// and writes to log with level = Warn.
func (c *Contextual) Warnf(format string, params ...interface{}) error {
if c.log == nil {
return seelog.Warnf(format, params...)
} else {
return c.log.Warnf(format, params...)
}
}

// Errorf formats message according to format specifier
// and writes to log with level = Error.
func (c *Contextual) Errorf(format string, params ...interface{}) error {
if c.log == nil {
return seelog.Errorf(format, params...)
} else {
return c.log.Errorf(format, params...)
}
}

// Debug formats message using the default formats for its operands
// and writes to log with level = Debug
func (c *Contextual) Debug(v ...interface{}) {
if c.log == nil {
seelog.Debug(v...)
} else {
c.log.Debug(v...)
}
}

// Info formats message using the default formats for its operands
// and writes to log with level = Info
func (c *Contextual) Info(v ...interface{}) {
if c.log == nil {
seelog.Info(v...)
} else {
c.log.Info(v...)
}
}

// Warn formats message using the default formats for its operands
// and writes to log with level = Warn
func (c *Contextual) Warn(v ...interface{}) error {
if c.log == nil {
return seelog.Warn(v...)
} else {
return c.log.Warn(v...)
}
}

// Error formats message using the default formats for its operands
// and writes to log with level = Error
func (c *Contextual) Error(v ...interface{}) error {
if c.log == nil {
return seelog.Error(v...)
} else {
return c.log.Error(v...)
}
}

func (c *Contextual) SetContext(context map[string]string) {
if c.log == nil {
c.log = InitLogger()
_, f, _, _ := runtime.Caller(1)
context["module"] = filepath.Base(f)
c.log.SetContext(context)
}
}
43 changes: 26 additions & 17 deletions agent/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,40 @@ var Config *logConfig
func logfmtFormatter(params string) seelog.FormatterFunc {
return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} {
cc, ok := context.CustomContext().(map[string]string)
var customContext string
if ok && len(cc) > 0 {
var sortedContext []string
for k, v := range cc {
sortedContext = append(sortedContext, k+"="+v)
}
sort.Strings(sortedContext)
customContext = " " + strings.Join(sortedContext, " ")
if !ok {
cc = map[string]string{}
}
return fmt.Sprintf(`level=%s time=%s msg=%q module=%s%s
`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext)
if _, ok = cc["module"]; !ok {
cc["module"] = context.FileName()
}

var ccStr string
var ccSorted []string
for k, v := range cc {
ccSorted = append(ccSorted, k+"="+v)
}
sort.Strings(ccSorted)
ccStr = " " + strings.Join(ccSorted, " ")
return fmt.Sprintf(`level=%s time=%s msg=%q%s
`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr)
}
}

func jsonFormatter(params string) seelog.FormatterFunc {
return func(message string, level seelog.LogLevel, context seelog.LogContextInterface) interface{} {
cc, ok := context.CustomContext().(map[string]string)
var customContext string
if ok && len(cc) > 0 {
for k, v := range cc {
customContext += fmt.Sprintf(", %q: %q", k, v)
}
if !ok {
cc = map[string]string{}
}
if _, ok = cc["module"]; !ok {
cc["module"] = context.FileName()
}
var ccStr string
for k, v := range cc {
ccStr += fmt.Sprintf(", %q: %q", k, v)
}
return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q, "module": %q%s}
`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, context.FileName(), customContext)
return fmt.Sprintf(`{"level": %q, "time": %q, "msg": %q%s}
`, level.String(), context.CallTime().UTC().Format(time.RFC3339), message, ccStr)
}
}

Expand Down
Loading

0 comments on commit 1a0c755

Please sign in to comment.