Skip to content

Commit

Permalink
Support structured logs from lambda (requires latest TF AWS provider)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesJJ committed Dec 23, 2023
1 parent cfa462c commit ec9e516
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions src/lambda.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
from pythonjsonlogger import jsonlogger
import urllib.parse
import botocore
import boto3
Expand All @@ -10,21 +12,34 @@
import time
import json

# JSON Logs
logger = logging.getLogger()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

# Logging Levels
boto3.set_stream_logger(name="botocore", level=logging.WARN)
boto3.set_stream_logger(name="boto3", level=logging.WARN)


def logjson(metric, message):
    """Emit a structured (JSON) info-level log line.

    :param metric: short metric/event name, becomes the log message.
    :param message: dict of structured fields, or any other value which is
        stringified into a single ``text`` field.
    """
    # Normalize non-dict payloads into a dict: logging's `extra` must be a
    # mapping (Logger.makeRecord iterates it as key -> value), so passing a
    # raw string here would raise at log time.
    v = message if isinstance(message, dict) else {"text": "{0}".format(message)}
    # BUG FIX: previously passed `extra=message`, leaving `v` dead and
    # crashing on non-dict messages; the normalized dict must be used.
    logger.info(metric, extra=v)


def logerror(when, error):
    """Emit a structured error-level log line.

    :param when: short description of the operation that failed.
    :param error: the exception (or message) to report; stringified.
    """
    # Use the module's configured `logger` (which carries the JSON handler)
    # rather than the root-level `logging.error(...)` convenience function,
    # for consistency with logjson() and to avoid logging's implicit
    # basicConfig fallback path.
    logger.error("error", extra={"text": str(error), "when": str(when)})


def create_log_stream(log_group_name, log_stream_name):
for attempt in range(2):
try:
logjson("create_log_stream", {"attempt": attempt})
logs.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
logs.create_log_stream(
logGroupName=log_group_name, logStreamName=log_stream_name
)
return True
except (
botocore.exceptions.SSOError,
Expand All @@ -41,7 +56,9 @@ def create_log_stream(log_group_name, log_stream_name):

def extract_timestamp(line):
    """Parse the leading date and time fields of a tab-separated log line
    and return the event time as epoch milliseconds.

    NOTE(review): strptime yields a naive datetime, so .timestamp() applies
    the process-local timezone — presumably the log lines are UTC and the
    Lambda runtime runs in UTC; confirm against the log producer.
    """
    date_field, time_field = line.split(sep="\t")[:2]
    parsed = datetime.strptime(
        "{} {}".format(date_field, time_field), "%Y-%m-%d %H:%M:%S"
    )
    return int(parsed.timestamp() * 1000)

Expand Down Expand Up @@ -82,7 +99,9 @@ def cfl_data_to_cwl(data):
continue
if earliest_event <= 0:
earliest_event = line_timestamp
if batch_at_limits(len(records), batch_bytes + line_bytes, line_timestamp - earliest_event):
if batch_at_limits(
len(records), batch_bytes + line_bytes, line_timestamp - earliest_event
):
logjson("put_batch", {"count": len(records), "data_size": batch_bytes})
sequence_token = put_records_to_cwl(records, sequence_token)
records = []
Expand Down Expand Up @@ -143,7 +162,9 @@ def put_records_to_cwl(records, outgoing_sequence_token):
if "sequenceToken" in put_log_events_kwargs:
del put_log_events_kwargs["sequenceToken"]
if "expectedSequenceToken" in e.response:
put_log_events_kwargs["sequenceToken"] = e.response["expectedSequenceToken"]
put_log_events_kwargs["sequenceToken"] = e.response[
"expectedSequenceToken"
]
logjson(
"put_batch_retry",
{"msg": "sequence token fixed", "attempt": attempt},
Expand All @@ -152,7 +173,7 @@ def put_records_to_cwl(records, outgoing_sequence_token):
# unexpected, so log & raise
logerror("put log events", e)
raise e
except (logs.exceptions.DataAlreadyAcceptedException) as e:
except logs.exceptions.DataAlreadyAcceptedException as e:
if e.response["Error"]["Code"] == "DataAlreadyAcceptedException":
logjson(
"put_batch_already_accepted",
Expand All @@ -179,10 +200,11 @@ def put_records_to_cwl(records, outgoing_sequence_token):


def lambda_handler(event, context):

# Get our S3 bucket and key from the event context, URL decode the keyname
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = urllib.parse.unquote_plus(event["Records"][0]["s3"]["object"]["key"], encoding="utf-8")
key = urllib.parse.unquote_plus(
event["Records"][0]["s3"]["object"]["key"], encoding="utf-8"
)

# Get S3 object
logjson("s3_get", {"bucket": bucket, "key": key})
Expand Down Expand Up @@ -248,9 +270,6 @@ def lambda_handler(event, context):
sequence_token = None
log_stream_created = False

# Debug logging
# boto3.set_stream_logger(name='botocore')

# re-use this lambda instance's log stream name in our target log group
log_stream_name = os.getenv(
"AWS_LAMBDA_LOG_STREAM_NAME",
Expand Down

0 comments on commit ec9e516

Please sign in to comment.