rostran/handlers/basic.py (144 lines of code) (raw):
import json
##############################
# Common Handlers
##############################
def json_load(content, resolved=False):
    """
    Decode a JSON string once its value has been resolved.

    Returns None when content is empty or resolved is false; otherwise
    returns whatever json.loads produces for content.
    """
    if resolved and content:
        return json.loads(content)
    return None
def tags_dict_to_list(tags_dict, resolved=False):
    """
    Turn a tag mapping into a list of {"Key": ..., "Value": ...} entries.

    Returns None for an empty/missing mapping. When resolved is true the
    list is built directly; otherwise an Fn::Jq expression performing the
    same conversion is returned.
    """
    if not tags_dict:
        return None

    if not resolved:
        # The value is not known yet, so defer the conversion to Fn::Jq.
        jq_filter = 'to_entries | map({"Key": .key, "Value": .value})'
        return {"Fn::Jq": ["First", jq_filter, tags_dict]}

    entries = []
    for name, value in tags_dict.items():
        entries.append({"Key": name, "Value": value})
    return entries
def tags_list_to_dict(tags_list, resolved=False):
    """
    Convert list-formatted tags to dict format.

    For example,
    [{"Key": "env", "Value": "prod"}, {"Key": "app", "Value": "web"}]
    will be converted to {"env": "prod", "app": "web"}.

    Returns None for an empty/missing list. When resolved is true the
    mapping is built directly; otherwise an Fn::Jq expression performing
    the same conversion is returned.
    """
    if not tags_list:
        return None

    if resolved:
        # Merge every entry into ONE mapping, matching the result of the
        # "map({(.Key): .Value})|add" expression used when unresolved.
        return {tag["Key"]: tag["Value"] for tag in tags_list}

    return {
        "Fn::Jq": [
            "First",
            "map({(.Key): .Value})|add",
            tags_list,
        ]
    }
def to_boolean(data, resolved=False):
    """
    Coerce a resolved value to a boolean.

    Returns None when data is None or resolved is false. Otherwise the
    result is bool(data), i.e. plain Python truthiness: any non-empty
    string (including "false") yields True.
    """
    if resolved and data is not None:
        return bool(data)
    return None
def to_int(data, resolved=False):
    """
    Convert data to integer.

    Returns None when data is None or resolved is false. Conversion is
    best-effort: a value that int() cannot convert is returned unchanged.
    """
    if data is None or not resolved:
        return None
    try:
        return int(data)
    except (TypeError, ValueError, OverflowError):
        # ValueError: non-numeric string; TypeError: unsupported type such
        # as a list or dict; OverflowError: infinite float. In every case
        # fall back to the original value instead of aborting.
        return data
def kv_list_to_map(kv_list, k_name, v_name):
    """
    Collapse a list of key/value dictionaries into a single dictionary.

    Each element contributes one pair: element[k_name] becomes the key and
    element[v_name] the value. For example, with k_name="parameter_key"
    and v_name="parameter_value",
    [{"parameter_key": "Name", "parameter_value": "test"}] becomes
    {"Name": "test"}.

    Returns None when kv_list is empty or missing.
    """
    if not kv_list:
        return None

    converted = {}
    for entry in kv_list:
        key = entry[k_name]
        converted[key] = entry[v_name]
    return converted
def select_first(resource_ids, resolved=False):
    """
    Pick the first entry of a list of resource IDs.

    Returns None for an empty/missing value. When resolved is true the
    first element is returned directly; otherwise an Fn::Select expression
    selecting index "0" is returned.
    """
    if not resource_ids:
        return None
    return resource_ids[0] if resolved else {"Fn::Select": ["0", resource_ids]}
def kv_list_to_map_wrapper(k_name, v_name):
    """
    Build a handler that converts key/value lists using fixed field names.

    The returned function takes (kv_list, resolved=False) and reads the
    key from element[k_name] and the value from element[v_name].
    """

    def kv_list_to_map(kv_list, resolved=False):
        """
        Collapse a list of key/value dictionaries into a single dictionary.
        For example,
        [{"parameter_key": "Name", "parameter_value": "test"}] becomes
        {"Name": "test"}.

        Returns None when kv_list is empty or resolved is false.
        """
        if not (kv_list and resolved):
            return None

        converted = {}
        for entry in kv_list:
            key = entry[k_name]
            converted[key] = entry[v_name]
        return converted

    return kv_list_to_map
def join_wrapper(splitter=":"):
    """
    Build a handler that joins a list of items with the given delimiter.

    The returned function takes (items, resolved=False). It returns None
    for an empty/missing value, the joined string when resolved is true,
    and an Fn::Join expression otherwise.
    """

    def join(items, resolved=False):
        if not items:
            return None
        if not resolved:
            # The value is not known yet, so defer the join to Fn::Join.
            return {"Fn::Join": [splitter, items]}
        return splitter.join(items)

    return join


# Ready-made join handlers for the commonly used delimiters.
colon_join = join_wrapper(":")
slash_join = join_wrapper("/")
comma_join = join_wrapper(",")
def replace_wrapper(old, new):
    """
    Build a handler that replaces every occurrence of old with new.

    The returned function takes (string, resolved=False). It returns None
    for an empty/missing value, the replaced string when resolved is true,
    and an Fn::Replace expression otherwise.
    """

    def replace(string, resolved=False):
        if not string:
            return None
        if not resolved:
            # The value is not known yet, so defer to Fn::Replace.
            return {"Fn::Replace": [{old: new}, string]}
        return string.replace(old, new)

    return replace


# Handler that swaps "/" for ":".
replace_slash_to_colon = replace_wrapper("/", ":")
def jq_to_string(data, resolved=False):
    """
    Pair data with the jq filter that renders it as a string.

    Returns a two-item list: the ". | tostring" filter followed by data.
    The resolved flag is accepted but not consulted.
    """
    jq_filter = ". | tostring"
    return [jq_filter, data]
##############################
# Product Handlers
##############################
def slb_x_forwarded_for(x_forwarded_for, resolved=False):
    """
    Translate an x_forwarded_for settings block into "on" switch properties.

    A list input is reduced to its first element. Returns None when the
    input is empty, resolved is false, or the settings are not a dict.
    Otherwise returns a dict holding an "on" entry for every truthy flag,
    plus "XForwardedFor": "on" when at least one flag is set.
    """
    if not (x_forwarded_for and resolved):
        return None

    settings = x_forwarded_for
    if isinstance(settings, list):
        settings = settings[0]
    if not isinstance(settings, dict):
        return None

    # (source flag, target property) pairs; the source spelling is the
    # literal key read from the settings and must stay as written.
    flag_to_prop = (
        ("retrive_slb_ip", "XForwardedFor_SLBIP"),
        ("retrive_slb_id", "XForwardedFor_SLBID"),
        ("retrive_slb_proto", "XForwardedFor_proto"),
    )
    props = {prop: "on" for flag, prop in flag_to_prop if settings.get(flag)}
    if props:
        # The master switch is only added when some sub-switch is enabled.
        props["XForwardedFor"] = "on"
    return props
# Handler mapping each element's "parameter_key" to its "parameter_value".
ros_parameters = kv_list_to_map_wrapper("parameter_key", "parameter_value")
def ots_search_index_sorters(sorters, resolved=False):
    """
    Rewrite search-index sorter dicts as {sorter type: renamed fields}.

    Each sorter's type comes from its "sorter_type" entry, defaulting to
    "PrimaryKeySort" when missing or falsy. Sorters of an unknown type are
    skipped, as are sorters for which no recognised field has a non-None
    value. Returns None when sorters is empty or resolved is false.
    """
    if not (sorters and resolved):
        return None

    # Source field name -> target field name, per sorter type.
    order_only = {"order": "SortOrder"}
    field_based = {
        "mode": "SortMode",
        "order": "SortOrder",
        "field_name": "FieldName",
    }
    fields_by_type = {
        "FieldSort": field_based,
        "PrimaryKeySort": order_only,
        "ScoreSort": order_only,
        "GeoDistanceSort": field_based,
    }

    converted = []
    for sorter in sorters:
        kind = sorter.get("sorter_type") or "PrimaryKeySort"
        renames = fields_by_type.get(kind)
        if not renames:
            continue

        candidates = ((dst, sorter.get(src)) for src, dst in renames.items())
        # Only None is dropped; falsy values such as "" or 0 are kept.
        target = {dst: value for dst, value in candidates if value is not None}
        if target:
            converted.append({kind: target})
    return converted
def ec2_network_interface_ipv6_addresses(ipv6_addresses, resolved=False):
    """
    Extract the "Ipv6Address" value from every entry of a resolved list.

    Returns None when ipv6_addresses is empty or resolved is false.
    """
    if not (ipv6_addresses and resolved):
        return None

    extracted = []
    for entry in ipv6_addresses:
        extracted.append(entry["Ipv6Address"])
    return extracted
def ec2_network_interface_private_addresses(private_addresses, resolved=False):
    """
    Extract the "PrivateIpAddress" value from every entry of a resolved list.

    Returns None when private_addresses is empty or resolved is false.
    """
    if not (private_addresses and resolved):
        return None

    extracted = []
    for entry in private_addresses:
        extracted.append(entry["PrivateIpAddress"])
    return extracted