Skip to content

Commit

Permalink
Merge pull request #21 from rapid7/use-zap-for-logging-in-v2-s3-watcher
Browse files Browse the repository at this point in the history
Use zap instead of logrus for the logger in v2 S3 watcher
  • Loading branch information
asebastian-r7 authored Oct 3, 2019
2 parents 2b4f83c + 8ceea51 commit 8924296
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 25 deletions.
42 changes: 33 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 78 additions & 16 deletions watchers/v2/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package s3
import (
"encoding/json"
"io/ioutil"
"os"
"reflect"
"regexp"
"strings"
Expand All @@ -17,7 +16,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"

log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/rapid7/cps/pkg/index"
"github.com/rapid7/cps/pkg/kv"
Expand All @@ -38,6 +38,8 @@ var (
Config config
isJSON = regexp.MustCompile(".json$")
mu = sync.Mutex{}

log *zap.Logger
)

type config struct {
Expand All @@ -46,8 +48,24 @@ type config struct {
}

func init() {
log.SetFormatter(&log.JSONFormatter{})
log.SetOutput(os.Stdout)
log, _ = zap.Config{
Encoding: "json",
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
EncoderConfig: zapcore.EncoderConfig{
MessageKey: "message",

LevelKey: "level",
EncodeLevel: zapcore.CapitalLevelEncoder,

TimeKey: "time",
EncodeTime: zapcore.ISO8601TimeEncoder,

CallerKey: "caller",
EncodeCaller: zapcore.ShortCallerEncoder,
},
}.Build()
}

// Poll polls every 60 seconds, kicking off an S3 sync.
Expand Down Expand Up @@ -78,15 +96,20 @@ func Poll(bucket, bucketRegion string) {
// AWS session, lists all items in the bucket, finally
// parsing all files and putting them in the kv store.
func Sync() {
log.Print("s3 sync begun")
log.Info("s3 sync begun")

bucket := Config.bucket
region := Config.bucketRegion

svc := setUpAwsSession(region)
resp, err := listBucket(bucket, region, svc)
if err != nil {
log.Error(err)
log.Error("Failed to list bucket",
zap.Error(err),
zap.String("bucket", bucket),
zap.String("region", region),
)

return
}

Expand All @@ -99,7 +122,7 @@ func Sync() {
Up = true
Health = true

log.Print("S3 sync finished")
log.Info("S3 sync finished")
}

func setUpAwsSession(region string) s3iface.S3API {
Expand Down Expand Up @@ -130,8 +153,15 @@ func listBucket(bucket, region string, svc s3iface.S3API) ([]*s3.ListObjectsOutp

resp, err := svc.ListObjects(params)
if err != nil {
log.Errorf("Error listing s3 objects %v:", err)
log.Error("Error listing s3 objects:",
zap.Error(err),
zap.String("bucket", bucket),
zap.String("region", region),
zap.String("prefix", prefix),
)

Health = false

return nil, err
}

Expand Down Expand Up @@ -168,7 +198,12 @@ func getPropertyFiles(files []string, b string, svc s3iface.S3API) error {
serviceProperties := make(map[string]interface{})
err := json.Unmarshal(body, &serviceProperties)
if err != nil {
log.Errorf("There was an error unmarshalling properties for %v: %v", serviceName, err)
log.Error("There was an error unmarshalling properties",
zap.Error(err),
zap.String("service_name", serviceName),
zap.String("file", f),
)

return err
}

Expand All @@ -177,11 +212,14 @@ func getPropertyFiles(files []string, b string, svc s3iface.S3API) error {

s, err := injectSecrets(services)
if err != nil {
log.Errorf("%v", err)
log.Error("There was an error injecting secrets",
zap.Error(err),
zap.Any("services", services),
)

return err
}

log.Debugf("All services found: %v", s)
for k, v := range s {
serviceBytes, _ := json.Marshal(v)
kv.WriteProperty(k, serviceBytes)
Expand All @@ -207,7 +245,11 @@ func injectSecrets(data interface{}) (map[string]interface{}, error) {
secretBytes, _ := json.Marshal(d.MapIndex(k).Interface())
s, err := secret.GetSSMSecret(k.String(), secretBytes)
if err != nil {
log.Error(err)
log.Error("Failed to get secret from SSM",
zap.Error(err),
zap.String("key", k.String()),
)

return nil, err
}

Expand Down Expand Up @@ -263,25 +305,45 @@ func getFile(k, b string, svc s3iface.S3API) ([]byte, error) {

if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == request.CanceledErrorCode {
log.Errorf("Download canceled due to timeout %v\n", err)
log.Error("Download canceled due to timeout",
zap.Error(err),
zap.String("key", k),
zap.String("bucket", b),
)

Health = false

return nil, err
}

log.Errorf("Failed to download object: %v", err)
log.Error("Failed to download object",
zap.Error(err),
zap.String("key", k),
zap.String("bucket", b),
)

Health = false

return nil, err
}

body, err = ioutil.ReadAll(result.Body)
defer result.Body.Close()
if err != nil {
log.Errorf("Failure to read body: %v\n", err)
log.Error("Failure to read body:",
zap.Error(err),
zap.String("key", k),
zap.String("bucket", b),
)

Health = false

return nil, err
}
} else {
log.Printf("Skipping: %v.\n", k)
log.Info("Skipping key",
zap.String("key", k),
)
}

return body, nil
Expand Down

0 comments on commit 8924296

Please sign in to comment.