From 456db61190b59cca7179e307ce3f14290c93093e Mon Sep 17 00:00:00 2001 From: Fernando Llaca Date: Thu, 5 Dec 2019 17:36:26 +0100 Subject: [PATCH 1/4] Added snapshot of index before removing it. --- es-cleanup.py | 69 +++++++++++++++++++++++++++++++++++++++++- terraform/lambda.tf | 13 +++++--- terraform/variables.tf | 15 +++++++++ 3 files changed, 91 insertions(+), 6 deletions(-) diff --git a/es-cleanup.py b/es-cleanup.py index a2a8e4e..1167a9f 100644 --- a/es-cleanup.py +++ b/es-cleanup.py @@ -58,6 +58,10 @@ def __init__(self, event, context): self.cfg["index_format"] = self.get_parameter( "index_format", "%Y.%m.%d") + self.cfg["snapshot_enabled"] = bool(self.get_parameter("snapshot_enabled", False)) + self.cfg["snapshot_delete_after"] = int(self.get_parameter("snapshot_delete_after", 15)) + self.cfg["snapshot_repository"] = self.get_parameter("snapshot_repository", "es_snapshots") + if not self.cfg["es_endpoint"]: raise Exception("[es_endpoint] OS variable is not set") @@ -143,6 +147,37 @@ def delete_index(self, index_name): """ return self.send_to_es(index_name, "DELETE") + def snapshot_index(self, index_name, snapshot_repository): + """ES PUT specific index snapshot + + Args: + index_name (str): Index name + + Returns: + dict: ES answer + """ + + snapshot_payload = { + "indices": index_name, + "ignore_unavailable": True, + "include_global_state": False + } + + # create snapshot + snapshot = self.send_to_es("_snapshot/{}/{}".format(snapshot_repository, index_name), method="PUT", payload=snapshot_payload) + + # Wait for snapshot to be sucessful + retries = 0 + while retries < int(self.cfg["es_max_retry"]): + if retries > 0: + seconds = (5**retries) * .1 + time.sleep(seconds) + snapshots = self.get_snapshot(snapshot_repository, index_name) + if snapshots["snapshots"][0]["state"] == "SUCCESS": + break + return snapshot + + def get_indices(self): """ES Get indices @@ -151,6 +186,30 @@ def get_indices(self): """ return self.send_to_es("/_cat/indices") + def get_snapshots(self, snapshot_repository): + """ES Get snapshots of a repository + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/_all".format(snapshot_repository)) + + def get_snapshot(self, snapshot_repository, snapshot_name): + """ES Get snapshot + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/{}".format(snapshot_repository, snapshot_name)) + + def delete_snapshot(self, snapshot_repository, snapshot_name): + """ES Get snapshot + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/{}".format(snapshot_repository, snapshot_name), method="DELETE") + def lambda_handler(event, context): """Main Lambda function @@ -178,8 +237,15 @@ def lambda_handler(event, context): if re.search(es.cfg["index"], index["index"]): if idx_date <= earliest_to_keep.strftime(es.cfg["index_format"]): + # Create snapshot if enabled + if es.cfg["snapshot_enabled"]: + print("Creating snapshot for index: {} in repository {}".format(index["index"], es.cfg["snapshot_repository"])) + es.snapshot_index(index["index"], es.cfg["snapshot_repository"]) + + # Delete index print("Deleting index: {}".format(index["index"])) - es.delete_index(index["index"]) + # TODO: do not delete while testing the snapshot feature + # es.delete_index(index["index"]) else: print("Keeping index: {}".format(index["index"])) else: @@ -199,3 +265,4 @@ def lambda_handler(event, context): ['arn:aws:events:us-east-1:123456789012:rule/my-schedule'] } lambda_handler(event, "") + diff --git a/terraform/lambda.tf b/terraform/lambda.tf index f047d37..1dd35ae 100644 --- a/terraform/lambda.tf +++ b/terraform/lambda.tf @@ -26,11 +26,14 @@ resource "aws_lambda_function" "es_cleanup" { environment { variables = { - es_endpoint = var.es_endpoint - index = var.index - skip_index = var.skip_index - delete_after = var.delete_after - index_format = var.index_format + es_endpoint = var.es_endpoint + index = var.index + skip_index = var.skip_index + delete_after = var.delete_after + index_format = var.index_format + snapshot_enabled = var.snapshot_enabled + snapshot_repository = var.snapshot_repository + snapshot_delete_after = var.snapshot_delete_after } } diff --git a/terraform/variables.tf b/terraform/variables.tf index 0971a03..6cff97d 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -28,6 +28,21 @@ variable "delete_after" { default = 15 } +variable "snapshot_enabled" { + type = bool + default = false +} + +variable "snapshot_repository" { + type = string + default = "" +} + +variable "snapshot_delete_after" { + description = "Numbers of days to preserve snapshots" + default = 15 +} + variable "index_format" { description = "Combined with 'index' varible is used to evaluate the index age" default = "%Y.%m.%d" From 03ec523864d277f460c58aa36d65f148440fd181 Mon Sep 17 00:00:00 2001 From: Fernando Llaca Date: Thu, 5 Dec 2019 19:16:50 +0100 Subject: [PATCH 2/4] Implemented snapshots cleanup --- es-cleanup.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/es-cleanup.py b/es-cleanup.py index 1167a9f..09240ae 100644 --- a/es-cleanup.py +++ b/es-cleanup.py @@ -237,20 +237,36 @@ def lambda_handler(event, context): if re.search(es.cfg["index"], index["index"]): if idx_date <= earliest_to_keep.strftime(es.cfg["index_format"]): - # Create snapshot if enabled + # Create snapshot named as the index if snapshots are enabled if es.cfg["snapshot_enabled"]: print("Creating snapshot for index: {} in repository {}".format(index["index"], es.cfg["snapshot_repository"])) es.snapshot_index(index["index"], es.cfg["snapshot_repository"]) # Delete index print("Deleting index: {}".format(index["index"])) - # TODO: do not delete while testing the snapshot feature - # es.delete_index(index["index"]) + es.delete_index(index["index"]) else: print("Keeping index: {}".format(index["index"])) else: print("Index '{}' name '{}' did not match pattern '{}'".format(index["index"], idx_name, es.cfg["index"])) + if es.cfg["snapshot_enabled"]: + # Snapshot cutoff definition, remove older than this date + snapshot_earliest_to_keep = datetime.date.today() - datetime.timedelta( + days=int(es.cfg["snapshot_delete_after"])) + for snapshot in es.get_snapshots(es.cfg["snapshot_repository"])["snapshots"]: + snapshot_split = snapshot["snapshot"].rsplit("-", + 1 + es.cfg["index_format"].count("-")) + snapshot_name = snapshot_split[0] + snapshot_date = '-'.join(word for word in snapshot_split[1:]) + + if snapshot_date <= snapshot_earliest_to_keep.strftime(es.cfg["index_format"]): + # Delete snapshot + print("Deleting snapshot: {}".format(snapshot["snapshot"])) + es.delete_snapshot(es.cfg["snapshot_repository"], snapshot["snapshot"]) + else: + print("Keeping snapshot: {}".format(snapshot["snapshot"])) + if __name__ == '__main__': event = { From b5c231510ad04e058a9af14244e0a3e9adb8d033 Mon Sep 17 00:00:00 2001 From: Fernando Llaca Date: Wed, 8 Jan 2020 12:29:51 +0100 Subject: [PATCH 3/4] Use timestamp to deduplicate snapshots of same index --- es-cleanup.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/es-cleanup.py b/es-cleanup.py index 09240ae..866a2e3 100644 --- a/es-cleanup.py +++ b/es-cleanup.py @@ -163,8 +163,12 @@ def snapshot_index(self, index_name, snapshot_repository): "include_global_state": False } + # Append a timestamp to deduplicate multiple snapshots for same index + now = str(datetime.datetime.now()) + snapshot_name = "{}_{}".format(index_name, now) + # create snapshot - snapshot = self.send_to_es("_snapshot/{}/{}".format(snapshot_repository, index_name), method="PUT", payload=snapshot_payload) + snapshot = self.send_to_es("_snapshot/{}/{}".format(snapshot_repository, snapshot_name), method="PUT", payload=snapshot_payload) # Wait for snapshot to be sucessful retries = 0 @@ -172,7 +176,7 @@ def snapshot_index(self, index_name, snapshot_repository): if retries > 0: seconds = (5**retries) * .1 time.sleep(seconds) - snapshots = self.get_snapshot(snapshot_repository, index_name) + snapshots = self.get_snapshot(snapshot_repository, snapshot_name) if snapshots["snapshots"][0]["state"] == "SUCCESS": break return snapshot @@ -255,7 +259,10 @@ def lambda_handler(event, context): snapshot_earliest_to_keep = datetime.date.today() - datetime.timedelta( days=int(es.cfg["snapshot_delete_after"])) for snapshot in es.get_snapshots(es.cfg["snapshot_repository"])["snapshots"]: - snapshot_split = snapshot["snapshot"].rsplit("-", + # split by "-", ignoring the timestamp part of the snapshot name: + timestamp_pos=snapshot["snapshot"].rfind("_") + timestamp_pos=timestamp_pos+1 if timestamp_pos >= 0 else len(snapshot["snapshot"]) + snapshot_split = snapshot["snapshot"][:timestamp_pos].rsplit("-", 1 + es.cfg["index_format"].count("-")) snapshot_name = snapshot_split[0] snapshot_date = '-'.join(word for word in snapshot_split[1:]) From addd79a7d3bb6812bd54100fb58a3c89757d59db Mon Sep 17 00:00:00 2001 From: Fernando Llaca Date: Wed, 8 Jan 2020 17:03:31 +0100 Subject: [PATCH 4/4] Fix timestamp format --- es-cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/es-cleanup.py b/es-cleanup.py index 866a2e3..7d306b0 100644 --- a/es-cleanup.py +++ b/es-cleanup.py @@ -164,7 +164,7 @@ def snapshot_index(self, index_name, snapshot_repository): } # Append a timestamp to deduplicate multiple snapshots for same index - now = str(datetime.datetime.now()) + now = str(datetime.datetime.now().timestamp()) snapshot_name = "{}_{}".format(index_name, now) # create snapshot