in backend/lambdas/tasks/generate_queries.py [0:0]
def handler(event, context):
    """Generate deletion queries for every data mapper and enqueue them.

    For each data mapper returned by ``get_data_mappers()``, queries are
    generated from the current deletion queue and sent to ``queue`` with
    ``batch_sqs_msgs``. Every mapper that produced at least one query has its
    ``[job_id, DataMapperId]`` pair recorded; the collected pairs are passed
    to ``write_partitions`` and turned into manifest URIs in the result.

    Args:
        event: Invocation payload; ``event["ExecutionName"]`` is used as the
            job id.
        context: Invocation context (not used here).

    Returns:
        A dict with:
          * ``GeneratedQueries`` - total number of queries enqueued.
          * ``DeletionQueueSize`` - number of items in the deletion queue.
          * ``Manifests`` - one ``s3://`` URI per data mapper that produced
            queries, in data-mapper order.

    Raises:
        NotImplementedError: If any data mapper uses a query executor other
            than ``"athena"``. All mappers are checked before any queries are
            generated or sent, so nothing is enqueued in that case.
    """
    job_id = event["ExecutionName"]
    deletion_items = get_deletion_queue()
    # Materialise the mappers: they are iterated twice below (validation, then
    # generation), and get_data_mappers() may return a one-shot iterator.
    data_mappers = list(get_data_mappers())

    # Query generator to use for each supported executor.
    query_generators = {"athena": generate_athena_queries}

    # Fail fast, before anything is sent to SQS, so an unsupported executor
    # cannot leave queries for earlier mappers orphaned in the queue.
    for data_mapper in data_mappers:
        query_executor = data_mapper["QueryExecutor"]
        if query_executor not in query_generators:
            raise NotImplementedError(
                "Unsupported data mapper query executor: '{}'".format(query_executor)
            )

    manifests_partitions = []
    total_queries = 0
    for data_mapper in data_mappers:
        generate_queries = query_generators[data_mapper["QueryExecutor"]]
        queries = generate_queries(data_mapper, deletion_items, job_id)
        # Only mappers that actually produced work get a manifest partition.
        if queries:
            manifests_partitions.append([job_id, data_mapper["DataMapperId"]])
        batch_sqs_msgs(queue, queries)
        total_queries += len(queries)

    write_partitions(manifests_partitions)
    return {
        "GeneratedQueries": total_queries,
        "DeletionQueueSize": len(deletion_items),
        "Manifests": [
            "s3://{}/{}".format(
                manifests_bucket_name,
                MANIFEST_KEY.format(
                    job_id=partition_job_id, data_mapper_id=data_mapper_id
                ),
            )
            for partition_job_id, data_mapper_id in manifests_partitions
        ],
    }