in source/code/tagging/__init__.py [0:0]
def build_tags_from_template(tags_str,
                             task, task_id,
                             timezone="UTC",
                             account=None,
                             region=None,
                             tag_variables=None,
                             restricted_value_set=False,
                             include_deleted_tags=True):
    """
    Builds a dictionary of tags from a tag template string.

    The template is a comma separated list of ``name=value`` items. An item without a "=" is treated
    as a continuation of the value of the preceding tag and is appended to it with a comma. Placeholders
    (formatted with TAG_VAL_STR) in both tag names and tag values are replaced by the values of the
    variables built here (date/time parts, account, region, task, stack) and of SSM parameters that are
    referenced in the template as ``{ssm:<parameter-name>}``.

    :param tags_str: Tag template string
    :param task: Value for the task variable
    :param task_id: Value for the task id variable
    :param timezone: Name of the timezone used for the date/time variables, "UTC" if empty or None
    :param account: Value for the account variable, empty string if None
    :param region: Value for the region variable, empty string if None
    :param tag_variables: Optional dictionary of additional variables. A shallow copy is used, so the
    passed dictionary is not modified. Built-in variables with the same name take precedence.
    :param restricted_value_set: If True, clean_tag_set is called on the resulting tags
    (NOTE(review): presumably restricts names/values to a limited character set - confirm)
    :param include_deleted_tags: If False, tags with a value equal to TAG_DELETE are removed from the result
    :return: Dictionary of tag names and values

    Exceptions raised by the timezone lookup or by the SSM client are not handled here.
    """
    # maximum number of names per ssm GetParameters request
    ssm_max_names_per_call = 10

    tag_vars = {} if tag_variables is None else copy.copy(tag_variables)

    tz = timezone if timezone not in ["", None] else "UTC"
    dt = date_time_provider().now(tz=pytz.timezone(tz))
    dt = dt.replace(microsecond=0)

    # variables used in tag names/values
    tag_vars.update({
        TAG_VAL_ACCOUNT: account if account is not None else "",
        TAG_VAL_AUTOMATOR_STACK: os.getenv(handlers.ENV_STACK_NAME, ""),
        TAG_VAL_DATE: "{:0>4d}{:0>2d}{:0>2d}".format(dt.year, dt.month, dt.day),
        TAG_VAL_DATE_TIME: "{:0>4d}{:0>2d}{:0>2d}{:0>2d}{:0>2d}{:0>2d}".format(dt.year, dt.month, dt.day,
                                                                             dt.hour, dt.minute, dt.second),
        TAG_VAL_DAY: "{:0>2d}".format(dt.day),
        TAG_VAL_HOUR: "{:0>2d}".format(dt.hour),
        TAG_VAL_ISO_DATE: dt.date().isoformat(),
        TAG_VAL_ISO_DATETIME: dt.isoformat(),
        TAG_VAL_ISO_TIME: dt.time().isoformat(),
        TAG_VAL_ISO_WEEKDAY: dt.isoweekday(),
        TAG_VAL_MINUTE: "{:0>2d}".format(dt.minute),
        TAG_VAL_MONTH: "{:0>2d}".format(dt.month),
        TAG_VAL_MONTH_NAME: dt.strftime("%b"),
        TAG_VAL_MONTH_NAME_LONG: dt.strftime("%B"),
        TAG_VAL_REGION: region if region is not None else "",
        TAG_VAL_SECOND: "{:0>2d}".format(dt.second),
        TAG_VAL_TASK_TAG: os.getenv(handlers.ENV_AUTOMATOR_TAG_NAME),
        TAG_VAL_TASK: task,
        TAG_VAL_TASK_ID: task_id,
        TAG_VAL_TIME: "{:0>2d}{:0>2d}{:0>2d}".format(dt.hour, dt.minute, dt.second),
        TAG_VAL_TIMEZONE: dt.tzname(),
        TAG_VAL_WEEKDAY: dt.strftime("%a"),
        TAG_VAL_WEEKDAY_LONG: dt.strftime("%A"),
        TAG_VAL_YEAR: "{:0>4d}".format(dt.year)
    })

    # get ssm parameter values and add to variables
    ssm_names = sorted(set(re.findall(r"\{ssm:(.+?)\}", tags_str)))
    if ssm_names:
        ssm = boto3.client("ssm")
        # the names are retrieved in batches as a single request only accepts a limited number of names
        for start in range(0, len(ssm_names), ssm_max_names_per_call):
            resp = ssm.get_parameters(Names=ssm_names[start:start + ssm_max_names_per_call])
            # parameters that could not be retrieved are not in the response, their placeholders are left as is
            for p in resp.get("Parameters", []):
                is_list = p["Type"] == "StringList"
                tag_vars["ssm:{}".format(p["Name"])] = p["Value"].split(",") if is_list else p["Value"]

    # variables as strings
    for name, raw in list(tag_vars.items()):
        if raw is None:
            value = ""
        elif isinstance(raw, list):
            # items are converted to strings so lists with non-string items can be joined
            value = ",".join(str(item) for item in raw)
        elif isinstance(raw, dict):
            value = safe_json(raw)
        else:
            value = str(raw)
        tag_vars[name] = value

    # placeholders are built once, in the same order as the variables
    placeholders = [(TAG_VAL_STR.format(name), value) for name, value in tag_vars.items()]

    def expand(text):
        # replaces all placeholders in text by the values of their variables
        for placeholder, value in placeholders:
            text = text.replace(placeholder, value)
        return text

    # build tag names with unprocessed values
    lastkey = None
    tags = {}
    for item in tags_str.split(","):
        item = item.strip()
        if "=" in item:
            raw_key, _, raw_value = item.partition("=")
            key = expand(raw_key.strip())
            tags[key] = raw_value.strip()
            lastkey = key
        elif lastkey is not None:
            # item without "=" is part of a value that contains commas
            tags[lastkey] = ",".join([tags[lastkey], item])

    # process values, these are always strings here, iterating a snapshot keeps the update of the dictionary safe
    for key, value in list(tags.items()):
        if value != "":
            tags[key] = expand(value)

    if restricted_value_set:
        clean_tag_set(tags)

    if not include_deleted_tags:
        for key in list(tags.keys()):
            if tags[key] == TAG_DELETE:
                del tags[key]

    return tags