in source/code/actions/ec2_replace_instance_action.py [0:0]
def is_completed(self, start_data):
def task_is_triggered_by_tag_event():
task_change_events = self._events_.get(handlers.ec2_tag_event_handler.EC2_TAG_EVENT_SOURCE, {}).get(
handlers.TAG_CHANGE_EVENT, [])
return handlers.ec2_tag_event_handler.EC2_CHANGED_INSTANCE_TAGS_EVENT in task_change_events
def set_tags_on_new_instance(new_instance_type, original_type):
# tags copied from replaced instance
copied_tags_filter_str = self.get(PARAM_COPIED_INSTANCE_TAGS, "*")
copied_tags_filter = TagFilterSet(copied_tags_filter_str)
tags = copied_tags_filter.pairs_matching_any_filter(start_data.get("source-tags", {}))
# tags set by action
tags.update(self.build_tags_from_template(parameter_name=PARAM_NEW_INSTANCE_TAGS,
tag_variables={
TAG_PLACEHOLDER_NEW_INSTANCE_TYPE: new_instance_type,
TAG_PLACEHOLDER_ORG_INSTANCE_TYPE: original_type,
TAG_PLACEHOLDER_ORG_INSTANCE_ID: self.instance_id
}))
try:
# if task is triggered by tagging event
if task_is_triggered_by_tag_event():
# up or down tags filters should not match new tags as it would re-trigger execution of the task
if self.replace_mode == REPLACE_BY_STEP:
for t in list(tags.keys()):
# remove tags that match up or down tag filters
if (self.scale_up_tagfilter and t in self.scale_up_tagfilter.get_filter_keys()) or \
(self.scale_down_tagfilter and t in self.scale_down_tagfilter.get_filter_keys()):
self._logger_.info(INF_TAGS_NOT_SET_STEP.format({t: tags[t]}, self.instance_id))
del tags[t]
else:
# new tags should not match the tag filter or task name should not be in task list
if self._tagfilter_ is not None:
# check again tag filter if any
check_filter = TagFilterExpression(self._tagfilter_)
for t in list(tags.keys()):
if t in check_filter.get_filter_keys():
self._logger_.info(INF_TAGS_NOT_SET_TYPE.format({t: tags[t]}, self.instance_id))
del tags[t]
else:
# check if name of the task is not in the new task list
tag_list_tag_name = os.getenv(handlers.ENV_AUTOMATOR_TAG_NAME)
for t in list(tags.keys()):
if t == tag_list_tag_name and self._task_ in tagging.split_task_list(tags[t]):
self._logger_.info(INF_TAGS_NOT_SET_TYPE.format({t: tags[t]}, self.instance_id))
del tags[t]
tagging.set_ec2_tags(ec2_client=self.ec2_client,
resource_ids=[self.new_instance_id],
logger=self._logger_,
tags=tags)
except Exception as tag_ex:
raise_exception(ERR_SET_TAGS, self.new_instance_id, tag_ex)
def get_scoped_elb_data(registrations):
result = {}
if registrations not in [{}, None]:
result = self._get_elb_data({}, [e["LoadBalancerName"] for e in registrations if e["Version"] == 1])
result = self._get_elbv2_data(result, [e["TargetGroupArn"] for e in registrations if e["Version"] == 2])
return result
elb_registrations = start_data.get("elb-registrations")
current_elb_data = get_scoped_elb_data(elb_registrations)
if start_data.get("not-replaced", False):
return self.result
# test if there any registrations left for replaced instance
instance_registrations = current_elb_data.get(self.instance_id)
if instance_registrations is not None:
self._logger_.info(INF_WAIT_DEREGISTER_LOAD_BALANCER, self.instance_id, self.load_balancing_str(instance_registrations))
return None
# get current state of instance
self.new_instance_id = start_data["new-instance"]
instance = self._get_instance(self.new_instance_id)
self._logger_.debug("Instance data is {}", safe_json(instance, indent=3))
state_code = instance["State"]["Code"] & 0xFF
# new instance is running, done...
if state_code == EC2_STATE_RUNNING:
# instance is running
self._logger_.info(INF_INSTANCE_RUNNING, self.new_instance_id)
if elb_registrations is not None and len(elb_registrations) != len(current_elb_data.get(self.new_instance_id, [])):
self._logger_.info(INF_WAIT_REGISTER_LOAD_BALANCER, self.new_instance_id)
return None
set_tags_on_new_instance(instance["InstanceType"], start_data.get("org-instance-type", ""))
self._logger_.info(INF_TERMINATING_INSTANCE, self.instance_id)
try:
self.ec2_client.terminate_instances_with_retries(InstanceIds=[self.instance_id])
except Exception as ex:
self._logger_.error(ERR_TERMINATING_INSTANCE, self.instance_id, ex)
for s in ["source-tags", "not-replaced"]:
if s in self.result:
del self.result[s]
return self.result
# in pending state, wait for next completion check
if state_code == EC2_STATE_PENDING:
return None
raise_exception(ERR_STARTING_NEW_INSTANCE, self.new_instance_id, safe_json(instance, indent=3))