in aws/lambda/github-webhook-rds-sync/utils.py [0:0]
def get_column(key: str, value: Any, type_name: str) -> Column:
    """
    Choose the column definition for one field of a webhook payload.

    Resolution order:

    1. If 'key' is listed for the webhook type 'type_name' in the hardcoded
       TYPE_MAP, call that entry and use the result.
    2. If 'key' is listed in NAME_MAP, call the item stored at index 1 of
       that entry and use the result.
    3. Otherwise guess from the value: dates (as decided by is_date), then
       str, bool, int and list.
    4. If nothing matched and 'key' ends with "_node_id", retry with that
       suffix stripped from the key.

    Returns None (despite the annotation) when no column type can be
    determined, so that the caller can collect and report every unknown
    field at once instead of failing on the first one.

    Raises RuntimeError if 'value' is a dict.
    """
    if isinstance(value, dict):
        raise RuntimeError(f"Value cannot be a dict: {key}: {value}")

    # Hardcoded overrides win over any guess made from the runtime value.
    type_overrides = TYPE_MAP.get(type_name, {})
    if key in type_overrides:
        return type_overrides[key]()
    if key in NAME_MAP:
        return NAME_MAP[key][1]()

    # Dates are checked before plain strings so they are not stored as text.
    if is_date(key, value):
        return Column(DateTime)
    if isinstance(value, str):
        # Leave headroom beyond the sample value: ten times its length, and
        # never fewer than 30 characters.
        return Column(String(max(30, len(value) * 10)))
    # bool must be tested before int: bool is a subclass of int, so
    # isinstance(True, int) is True and an int-first check would give every
    # boolean an Integer column.
    if isinstance(value, bool):
        return Column(Boolean)
    if isinstance(value, int):
        return Column(Integer)
    if isinstance(value, list):
        return Column(JSON)

    # The value gave no hint. For "<name>_node_id" keys, retry the lookups
    # above under "<name>".
    if key.endswith("_node_id"):
        return get_column(key[: -len("_node_id")], value, type_name)

    # Don't error out immediately, but bubble this up so we can report all
    # errors at once later
    return None