in backend/lambdas/tasks/generate_queries.py [0:0]
def generate_athena_queries(data_mapper, deletion_items, job_id):
"""
For each Data Mapper, it generates a list of parameters needed for each
query execution. The matches for the given column are saved in an external
S3 object (aka manifest) to allow its size to grow into the thousands without
incurring in DDB Document size limit, SQS message size limit, or Athena query
size limit. The manifest S3 Path is finally referenced as part of the SQS message.
"""
manifest_key = MANIFEST_KEY.format(
job_id=job_id, data_mapper_id=data_mapper["DataMapperId"]
)
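    # Look up the table metadata so match values can be cast to the column
    # types and the table's partition keys can be discovered.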
db = data_mapper["QueryExecutorParameters"]["Database"]
table_name = data_mapper["QueryExecutorParameters"]["Table"]
table = get_table(db, table_name)
all_partition_keys = [p["Name"] for p in table.get("PartitionKeys", [])]
partition_keys = data_mapper["QueryExecutorParameters"].get(
"PartitionKeys", all_partition_keys
)
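    # Base message shared by every query for this data mapper;
    # PartitionKeys is populated per partition further down.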
    columns = list(data_mapper["Columns"])
msg = {
"DataMapperId": data_mapper["DataMapperId"],
"QueryExecutor": data_mapper["QueryExecutor"],
"Format": data_mapper["Format"],
"Database": db,
"Table": table_name,
"Columns": columns,
"PartitionKeys": [],
"DeleteOldVersions": data_mapper.get("DeleteOldVersions", True),
"IgnoreObjectNotFoundExceptions": data_mapper.get(
"IgnoreObjectNotFoundExceptions", False
),
}
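    # Forward the optional RoleArn if one is configured for the data mapper.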
if data_mapper.get("RoleArn", None):
msg["RoleArn"] = data_mapper["RoleArn"]
    # Work out which deletion items should be included in this query
applicable_match_ids = [
item
for item in deletion_items
if msg["DataMapperId"] in item.get("DataMappers", [])
or len(item.get("DataMappers", [])) == 0
]
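    # Nothing to do for this data mapper if no deletion item targets it.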
if len(applicable_match_ids) == 0:
return []
# Compile a list of MatchIds grouped by Column
columns_with_matches = {}
manifest = ""
for item in applicable_match_ids:
mid, item_id, item_createdat = itemgetter(
"MatchId", "DeletionQueueItemId", "CreatedAt"
)(item)
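        # A "simple" item is a single value matched against every configured
        # column; a list value is a composite match of specific columns.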
is_simple = not isinstance(mid, list)
if is_simple:
for column in msg["Columns"]:
casted = cast_to_type(mid, column, table)
if column not in columns_with_matches:
columns_with_matches[column] = {
"Column": column,
"Type": "Simple",
}
manifest += build_manifest_row(
[column], casted, item_id, item_createdat
)
else:
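            # Composite match: a list of {"Column", "Value"} pairs that must
            # all match together. Sort by column so equivalent combinations
            # share one grouping key.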
sorted_mid = sorted(mid, key=lambda x: x["Column"])
query_columns = list(map(lambda x: x["Column"], sorted_mid))
column_key = COMPOSITE_JOIN_TOKEN.join(query_columns)
composite_match = list(
map(lambda x: cast_to_type(x["Value"], x["Column"], table), sorted_mid)
)
if column_key not in columns_with_matches:
columns_with_matches[column_key] = {
"Columns": query_columns,
"Type": "Composite",
}
manifest += build_manifest_row(
query_columns, composite_match, item_id, item_createdat
)
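    # Write the accumulated manifest rows to the manifests bucket and point
    # the message at the resulting S3 object.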
s3.Bucket(manifests_bucket_name).put_object(Body=manifest, Key=manifest_key)
msg["Columns"] = list(columns_with_matches.values())
msg["Manifest"] = "s3://{}/{}".format(manifests_bucket_name, manifest_key)
if len(partition_keys) == 0:
return [msg]
    # For each combination of the configured partition keys, create a query
partitions = set()
for partition in get_partitions(db, table_name):
current = tuple(
(all_partition_keys[i], cast_to_type(v, all_partition_keys[i], table, True))
for i, v in enumerate(partition["Values"])
if all_partition_keys[i] in partition_keys
)
partitions.add(current)
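    # Emit one message per unique partition value combination.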
ret = []
for current in partitions:
current_dict = [{"Key": k, "Value": v} for k, v in current]
ret.append({**msg, "PartitionKeys": current_dict})
return ret
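
# Illustration only, not part of the original module: a hedged sketch of the
# shape of a single message returned by generate_athena_queries, assuming a
# data mapper with one simple column match and a "year" partition key. All
# values below are invented, including the manifest path.
_example_message = {
    "DataMapperId": "customers-mapper",
    "QueryExecutor": "athena",
    "Format": "parquet",
    "Database": "sales_db",
    "Table": "customers",
    "Columns": [{"Column": "customer_id", "Type": "Simple"}],
    "PartitionKeys": [{"Key": "year", "Value": "2021"}],
    "DeleteOldVersions": True,
    "IgnoreObjectNotFoundExceptions": False,
    "Manifest": "s3://example-manifests-bucket/manifests/job-123/customers-mapper.json",
}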