diff --git a/es-cleanup.py b/es-cleanup.py index a2a8e4e..7d306b0 100644 --- a/es-cleanup.py +++ b/es-cleanup.py @@ -58,6 +58,10 @@ def __init__(self, event, context): self.cfg["index_format"] = self.get_parameter( "index_format", "%Y.%m.%d") + self.cfg["snapshot_enabled"] = bool(self.get_parameter("snapshot_enabled", False)) + self.cfg["snapshot_delete_after"] = int(self.get_parameter("snapshot_delete_after", 15)) + self.cfg["snapshot_repository"] = self.get_parameter("snapshot_repository", "es_snapshots") + if not self.cfg["es_endpoint"]: raise Exception("[es_endpoint] OS variable is not set") @@ -143,6 +147,41 @@ def delete_index(self, index_name): """ return self.send_to_es(index_name, "DELETE") + def snapshot_index(self, index_name, snapshot_repository): + """ES PUT specific index snapshot + + Args: + index_name (str): Index name + + Returns: + dict: ES answer + """ + + snapshot_payload = { + "indices": index_name, + "ignore_unavailable": True, + "include_global_state": False + } + + # Append a timestamp to deduplicate multiple snapshots for same index + now = str(datetime.datetime.now().timestamp()) + snapshot_name = "{}_{}".format(index_name, now) + + # create snapshot + snapshot = self.send_to_es("_snapshot/{}/{}".format(snapshot_repository, snapshot_name), method="PUT", payload=snapshot_payload) + + # Wait for snapshot to be sucessful + retries = 0 + while retries < int(self.cfg["es_max_retry"]): + if retries > 0: + seconds = (5**retries) * .1 + time.sleep(seconds) + snapshots = self.get_snapshot(snapshot_repository, snapshot_name) + if snapshots["snapshots"][0]["state"] == "SUCCESS": + break + return snapshot + + def get_indices(self): """ES Get indices @@ -151,6 +190,30 @@ def get_indices(self): """ return self.send_to_es("/_cat/indices") + def get_snapshots(self, snapshot_repository): + """ES Get snapshots of a repository + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/_all".format(snapshot_repository)) + + def get_snapshot(self, snapshot_repository, snapshot_name): + """ES Get snapshot + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/{}".format(snapshot_repository, snapshot_name)) + + def delete_snapshot(self, snapshot_repository, snapshot_name): + """ES Get snapshot + + Returns: + dict: ES answer + """ + return self.send_to_es("/_snapshot/{}/{}".format(snapshot_repository, snapshot_name), method="DELETE") + def lambda_handler(event, context): """Main Lambda function @@ -178,6 +241,12 @@ def lambda_handler(event, context): if re.search(es.cfg["index"], index["index"]): if idx_date <= earliest_to_keep.strftime(es.cfg["index_format"]): + # Create snapshot named as the index if snapshots are enabled + if es.cfg["snapshot_enabled"]: + print("Creating snapshot for index: {} in repository {}".format(index["index"], es.cfg["snapshot_repository"])) + es.snapshot_index(index["index"], es.cfg["snapshot_repository"]) + + # Delete index print("Deleting index: {}".format(index["index"])) es.delete_index(index["index"]) else: @@ -185,6 +254,26 @@ def lambda_handler(event, context): else: print("Index '{}' name '{}' did not match pattern '{}'".format(index["index"], idx_name, es.cfg["index"])) + if es.cfg["snapshot_enabled"]: + # Snapshot cutoff definition, remove older than this date + snapshot_earliest_to_keep = datetime.date.today() - datetime.timedelta( + days=int(es.cfg["snapshot_delete_after"])) + for snapshot in es.get_snapshots(es.cfg["snapshot_repository"])["snapshots"]: + # split by "-", ignoring the timestamp part of the snapshot name: + timestamp_pos=snapshot["snapshot"].rfind("_") + timestamp_pos=timestamp_pos+1 if timestamp_pos >= 0 else len(snapshot["snapshot"]) + snapshot_split = snapshot["snapshot"][:timestamp_pos].rsplit("-", + 1 + es.cfg["index_format"].count("-")) + snapshot_name = snapshot_split[0] + snapshot_date = '-'.join(word for word in snapshot_split[1:]) + + if snapshot_date <= snapshot_earliest_to_keep.strftime(es.cfg["index_format"]): + # Delete snapshot + print("Deleting snapshot: {}".format(snapshot["snapshot"])) + es.delete_snapshot(es.cfg["snapshot_repository"], snapshot["snapshot"]) + else: + print("Keeping snapshot: {}".format(snapshot["snapshot"])) + if __name__ == '__main__': event = { @@ -199,3 +288,4 @@ def lambda_handler(event, context): ['arn:aws:events:us-east-1:123456789012:rule/my-schedule'] } lambda_handler(event, "") + diff --git a/terraform/lambda.tf b/terraform/lambda.tf index f047d37..1dd35ae 100644 --- a/terraform/lambda.tf +++ b/terraform/lambda.tf @@ -26,11 +26,14 @@ resource "aws_lambda_function" "es_cleanup" { environment { variables = { - es_endpoint = var.es_endpoint - index = var.index - skip_index = var.skip_index - delete_after = var.delete_after - index_format = var.index_format + es_endpoint = var.es_endpoint + index = var.index + skip_index = var.skip_index + delete_after = var.delete_after + index_format = var.index_format + snapshot_enabled = var.snapshot_enabled + snapshot_repository = var.snapshot_repository + snapshot_delete_after = var.snapshot_delete_after } } diff --git a/terraform/variables.tf b/terraform/variables.tf index 0971a03..6cff97d 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -28,6 +28,21 @@ variable "delete_after" { default = 15 } +variable "snapshot_enabled" { + type = bool + default = false +} + +variable "snapshot_repository" { + type = string + default = "" +} + +variable "snapshot_delete_after" { + description = "Numbers of days to preserve snapshots" + default = 15 +} + variable "index_format" { description = "Combined with 'index' varible is used to evaluate the index age" default = "%Y.%m.%d"