Skip to content

Commit

Permalink
ec2_instance - Apply retry decorators more consistently. (#373)
Browse files Browse the repository at this point in the history
* ec2_instance: build results inside find_instances and add backoff

* Add retry decorator to ec2 clients
  • Loading branch information
tremble authored Feb 3, 2021
1 parent b7afd18 commit bbb4707
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 42 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/373-ec2_instance-retry-pagination.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- ec2_instance - add automatic retries on all paginated queries for temporary errors (https://github.com/ansible-collections/community.aws/pull/373).
89 changes: 47 additions & 42 deletions plugins/modules/ec2_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,6 @@ def tower_callback_script(tower_conf, windows=False, passwd=None):
raise NotImplementedError("Only windows with remote-prep or non-windows with tower job callback supported so far.")


@AWSRetry.jittered_backoff()
def manage_tags(match, new_tags, purge_tags, ec2):
changed = False
old_tags = boto3_tag_list_to_ansible_dict(match['Tags'])
Expand All @@ -896,12 +895,14 @@ def manage_tags(match, new_tags, purge_tags, ec2):
return bool(tags_to_delete or tags_to_set)
if tags_to_set:
ec2.create_tags(
aws_retry=True,
Resources=[match['InstanceId']],
Tags=ansible_dict_to_boto3_tag_list(tags_to_set))
changed |= True
if tags_to_delete:
delete_with_current_values = dict((k, old_tags.get(k)) for k in tags_to_delete)
ec2.delete_tags(
aws_retry=True,
Resources=[match['InstanceId']],
Tags=ansible_dict_to_boto3_tag_list(delete_with_current_values))
changed |= True
Expand Down Expand Up @@ -929,14 +930,17 @@ def add_or_update_instance_profile(instance, desired_profile_name):
if instance_profile_setting.get('Arn') == desired_arn:
return False
# update association
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())
try:
association = ec2.describe_iam_instance_profile_associations(Filters=[{'Name': 'instance-id', 'Values': [instance['InstanceId']]}])
association = ec2.describe_iam_instance_profile_associations(
aws_retry=True,
Filters=[{'Name': 'instance-id', 'Values': [instance['InstanceId']]}])
except botocore.exceptions.ClientError as e:
# check for InvalidAssociationID.NotFound
module.fail_json_aws(e, "Could not find instance profile association")
try:
resp = ec2.replace_iam_instance_profile_association(
aws_retry=True,
AssociationId=association['IamInstanceProfileAssociations'][0]['AssociationId'],
IamInstanceProfile={'Arn': determine_iam_role(desired_profile_name)}
)
Expand All @@ -946,9 +950,10 @@ def add_or_update_instance_profile(instance, desired_profile_name):

if not instance_profile_setting and desired_profile_name:
# create association
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())
try:
resp = ec2.associate_iam_instance_profile(
aws_retry=True,
IamInstanceProfile={'Arn': determine_iam_role(desired_profile_name)},
InstanceId=instance['InstanceId']
)
Expand Down Expand Up @@ -989,7 +994,7 @@ def build_network_spec(params, ec2=None):
},
"""
if ec2 is None:
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

interfaces = []
network = params.get('network') or {}
Expand Down Expand Up @@ -1109,11 +1114,11 @@ def warn_if_cpu_options_changed(instance):

def discover_security_groups(group, groups, parent_vpc_id=None, subnet_id=None, ec2=None):
if ec2 is None:
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

if subnet_id is not None:
try:
sub = ec2.describe_subnets(SubnetIds=[subnet_id])
sub = ec2.describe_subnets(aws_retry=True, SubnetIds=[subnet_id])
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidGroup.NotFound':
module.fail_json(
Expand Down Expand Up @@ -1168,14 +1173,17 @@ def discover_security_groups(group, groups, parent_vpc_id=None, subnet_id=None,
found_groups = []
for f_set in (id_filters, name_filters):
if len(f_set) > 1:
found_groups.extend(ec2.get_paginator(
'describe_security_groups'
).paginate(
Filters=f_set
).search('SecurityGroups[]'))
found_groups.extend(describe_security_groups(ec2, Filters=f_set))
return list(dict((g['GroupId'], g) for g in found_groups).values())


@AWSRetry.jittered_backoff()
def describe_security_groups(ec2, **params):
    """Return every security group matched by a paginated query as a list.

    ``params`` are forwarded unchanged to the ``describe_security_groups``
    paginator of ``ec2``; the ``SecurityGroups[]`` search over the pages is
    materialised into a list before returning.
    """
    # Consume the search results here, inside the decorated function, rather
    # than handing a lazy iterator back to the caller.
    # NOTE(review): presumably this is what lets the retry decorator cover the
    # per-page API calls - confirm against AWSRetry's behaviour.
    page_iterator = ec2.get_paginator('describe_security_groups').paginate(**params)
    security_groups = page_iterator.search('SecurityGroups[]')
    return list(security_groups)


def build_top_level_options(params):
spec = {}
if params.get('image_id'):
Expand Down Expand Up @@ -1257,7 +1265,7 @@ def build_instance_tags(params, propagate_tags_to_volumes=True):

def build_run_instance_spec(params, ec2=None):
if ec2 is None:
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

spec = dict(
ClientToken=uuid.uuid4().hex,
Expand Down Expand Up @@ -1296,7 +1304,7 @@ def await_instances(ids, state='OK'):
}
if state not in state_opts:
module.fail_json(msg="Cannot wait for state {0}, invalid state".format(state))
waiter = module.client('ec2').get_waiter(state_opts[state])
waiter = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff()).get_waiter(state_opts[state])
try:
waiter.wait(
InstanceIds=ids,
Expand All @@ -1316,7 +1324,7 @@ def await_instances(ids, state='OK'):
def diff_instance_and_params(instance, params, ec2=None, skip=None):
"""boto3 instance obj, module params"""
if ec2 is None:
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

if skip is None:
skip = []
Expand All @@ -1342,7 +1350,7 @@ def value_wrapper(v):
if mapping.instance_key in skip:
continue

value = AWSRetry.jittered_backoff()(ec2.describe_instance_attribute)(Attribute=mapping.attribute_name, InstanceId=id_)
value = ec2.describe_instance_attribute(aws_retry=True, Attribute=mapping.attribute_name, InstanceId=id_)
if value[mapping.instance_key]['Value'] != params.get(mapping.param_key):
arguments = dict(
InstanceId=instance['InstanceId'],
Expand All @@ -1352,7 +1360,7 @@ def value_wrapper(v):
changes_to_apply.append(arguments)

if params.get('security_group') or params.get('security_groups'):
value = AWSRetry.jittered_backoff()(ec2.describe_instance_attribute)(Attribute="groupSet", InstanceId=id_)
value = ec2.describe_instance_attribute(aws_retry=True, Attribute="groupSet", InstanceId=id_)
# managing security groups
if params.get('vpc_subnet_id'):
subnet_id = params.get('vpc_subnet_id')
Expand Down Expand Up @@ -1404,6 +1412,7 @@ def change_network_attachments(instance, params, ec2):
to_attach = set(new_ids) - set(old_ids)
for eni_id in to_attach:
ec2.attach_network_interface(
aws_retry=True,
DeviceIndex=new_ids.index(eni_id),
InstanceId=instance['InstanceId'],
NetworkInterfaceId=eni_id,
Expand All @@ -1412,35 +1421,35 @@ def change_network_attachments(instance, params, ec2):
return False


@AWSRetry.jittered_backoff()
def find_instances(ec2, ids=None, filters=None):
paginator = ec2.get_paginator('describe_instances')
if ids:
return list(paginator.paginate(
InstanceIds=ids,
).search('Reservations[].Instances[]'))
params = dict(InstanceIds=ids)
elif filters is None:
module.fail_json(msg="No filters provided when they were required")
elif filters is not None:
else:
for key in list(filters.keys()):
if not key.startswith("tag:"):
filters[key.replace("_", "-")] = filters.pop(key)
return list(paginator.paginate(
Filters=ansible_dict_to_boto3_filter_list(filters)
).search('Reservations[].Instances[]'))
return []
params = dict(Filters=ansible_dict_to_boto3_filter_list(filters))

results = paginator.paginate(**params).search('Reservations[].Instances[]')
return list(results)


@AWSRetry.jittered_backoff()
def get_default_vpc(ec2):
vpcs = ec2.describe_vpcs(Filters=ansible_dict_to_boto3_filter_list({'isDefault': 'true'}))
vpcs = ec2.describe_vpcs(
aws_retry=True,
Filters=ansible_dict_to_boto3_filter_list({'isDefault': 'true'}))
if len(vpcs.get('Vpcs', [])):
return vpcs.get('Vpcs')[0]
return None


@AWSRetry.jittered_backoff()
def get_default_subnet(ec2, vpc, availability_zone=None):
subnets = ec2.describe_subnets(
aws_retry=True,
Filters=ansible_dict_to_boto3_filter_list({
'vpc-id': vpc['VpcId'],
'state': 'available',
Expand All @@ -1462,7 +1471,7 @@ def get_default_subnet(ec2, vpc, availability_zone=None):

def ensure_instance_state(state, ec2=None):
if ec2 is None:
module.client('ec2')
module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())
if state in ('running', 'started'):
changed, failed, instances, failure_reason = change_instance_state(filters=module.params.get('filters'), desired_state='RUNNING')

Expand Down Expand Up @@ -1537,11 +1546,10 @@ def ensure_instance_state(state, ec2=None):
)


@AWSRetry.jittered_backoff()
def change_instance_state(filters, desired_state, ec2=None):
"""Takes STOPPED/RUNNING/TERMINATED"""
if ec2 is None:
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

changed = set()
instances = find_instances(ec2, filters=filters)
Expand All @@ -1558,7 +1566,7 @@ def change_instance_state(filters, desired_state, ec2=None):

# TODO use a client-token to prevent double-sends of these start/stop/terminate commands
# https://docs.aws.amazon.com/AWSEC2/latest/APIReference/Run_Instance_Idempotency.html
resp = ec2.terminate_instances(InstanceIds=[inst['InstanceId']])
resp = ec2.terminate_instances(aws_retry=True, InstanceIds=[inst['InstanceId']])
[changed.add(i['InstanceId']) for i in resp['TerminatingInstances']]
if desired_state == 'STOPPED':
if inst['State']['Name'] in ('stopping', 'stopped'):
Expand All @@ -1569,14 +1577,14 @@ def change_instance_state(filters, desired_state, ec2=None):
changed.add(inst['InstanceId'])
continue

resp = ec2.stop_instances(InstanceIds=[inst['InstanceId']])
resp = ec2.stop_instances(aws_retry=True, InstanceIds=[inst['InstanceId']])
[changed.add(i['InstanceId']) for i in resp['StoppingInstances']]
if desired_state == 'RUNNING':
if module.check_mode:
changed.add(inst['InstanceId'])
continue

resp = ec2.start_instances(InstanceIds=[inst['InstanceId']])
resp = ec2.start_instances(aws_retry=True, InstanceIds=[inst['InstanceId']])
[changed.add(i['InstanceId']) for i in resp['StartingInstances']]
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
try:
Expand Down Expand Up @@ -1625,7 +1633,7 @@ def handle_existing(existing_matches, changed, ec2, state):
)
changes = diff_instance_and_params(existing_matches[0], module.params)
for c in changes:
AWSRetry.jittered_backoff()(ec2.modify_instance_attribute)(**c)
ec2.modify_instance_attribute(aws_retry=True, **c)
changed |= bool(changes)
changed |= add_or_update_instance_profile(existing_matches[0], module.params.get('instance_role'))
changed |= change_network_attachments(existing_matches[0], module.params, ec2)
Expand Down Expand Up @@ -1664,7 +1672,7 @@ def ensure_present(existing_matches, changed, ec2, state):
changes = diff_instance_and_params(ins, module.params, skip=['UserData', 'EbsOptimized'])
for c in changes:
try:
AWSRetry.jittered_backoff()(ec2.modify_instance_attribute)(**c)
ec2.modify_instance_attribute(aws_retry=True, **c)
except botocore.exceptions.ClientError as e:
module.fail_json_aws(e, msg="Could not apply change {0} to new instance.".format(str(c)))

Expand All @@ -1675,9 +1683,7 @@ def ensure_present(existing_matches, changed, ec2, state):
spec=instance_spec,
)
await_instances(instance_ids)
instances = ec2.get_paginator('describe_instances').paginate(
InstanceIds=instance_ids
).search('Reservations[].Instances[]')
instances = find_instances(ec2, ids=instance_ids)

module.exit_json(
changed=True,
Expand All @@ -1689,10 +1695,9 @@ def ensure_present(existing_matches, changed, ec2, state):
module.fail_json_aws(e, msg="Failed to create new EC2 instance")


@AWSRetry.jittered_backoff()
def run_instances(ec2, **instance_spec):
try:
return ec2.run_instances(**instance_spec)
return ec2.run_instances(aws_retry=True, **instance_spec)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterValue' and "Invalid IAM Instance Profile ARN" in e.response['Error']['Message']:
# If the instance profile has just been created, it takes some time to be visible by ec2
Expand Down Expand Up @@ -1762,7 +1767,7 @@ def main():
module.fail_json(msg="Parameter network.interfaces can't be used with security_groups")

state = module.params.get('state')
ec2 = module.client('ec2')
ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())
if module.params.get('filters') is None:
filters = {
# all states except shutting-down and terminated
Expand Down

0 comments on commit bbb4707

Please sign in to comment.