cosmos-db-migration-utility/src/lambda/batch-request-reader/lambda_function.py
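# --- Module-level setup (assumed): process_request below references `logger` and
# --- MESSAGE_DELAY_SECONDS, whose real definitions live elsewhere in this file.
# --- The values here are illustrative defaults only, not the utility's actual configuration.
import json
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Delay (in seconds) before a namespace is re-polled; assumed to come from the environment.
MESSAGE_DELAY_SECONDS = int(os.environ.get("MESSAGE_DELAY_SECONDS", "60"))
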
def process_request(request):
    namespace = request["namespace"]
    cluster_name = request["cluster_name"]
    payload = {"cluster_name": cluster_name, "namespace": namespace}

    # Check the status of the tracker.event_writer; when it is "stop", ignore the request.
    status = get_tracker_value(cluster_name, "event_writer")
    if status == "stop":
        return {
            "cluster_name": cluster_name,
            "namespace": namespace,
            "message": "Detected the event writer status as stop. Ignoring request for namespace"
        }

    # Find the next unprocessed batch for this namespace.
    item = get_unprocessed_batch_id(cluster_name, namespace)
    if item is None:
        logger.info("All batches for cluster_name: %s, namespace: %s are processed. Waiting for other namespaces to catch up", cluster_name, namespace)
        # All batches are processed. Re-queue a delayed SQS request so this namespace is checked again later.
        send_sqs_message("read-batch-request-queue", payload, MESSAGE_DELAY_SECONDS)
        return {
            "cluster_name": cluster_name,
            "namespace": namespace,
            "message": "Didn't find an unprocessed batch. Retry after {} seconds".format(MESSAGE_DELAY_SECONDS)
        }

    data = []
    if item["document_count"] == 0:
        logger.info("Zero documents found in batch: %s for cluster_name: %s, namespace: %s. Marking the batch as processed", item["batch_id"], cluster_name, namespace)
        mark_processed_send_sqs(cluster_name, namespace, item, payload)
    else:
        # Fetch the batch contents from S3 and bulk write them to Amazon DocumentDB.
        data = get_data_from_s3(item["s3_link"])
        write_result = bulk_write_data_to_document_db(cluster_name, namespace, data)
        if write_result:
            mark_processed_send_sqs(cluster_name, namespace, item, payload)
        else:
            logger.fatal("Bulk write operation failed for batch: %s. Needs more analysis", item["batch_id"])
            return {
                "cluster_name": cluster_name,
                "namespace": namespace,
                "message": "Failed to import batch {} into namespace: {}.".format(item["batch_id"], namespace)
            }

    return {
        "cluster_name": cluster_name,
        "namespace": namespace,
        "message": "Successfully imported batch {} containing {} items into namespace: {}.".format(item["batch_id"], len(data), namespace)
    }
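

# --- Illustrative sketches of two helpers referenced above. These are assumptions for
# --- readability only, not the utility's actual implementations (which are defined
# --- elsewhere in this project); the DOCUMENTDB_URI environment variable and the
# --- upsert-by-_id strategy are hypothetical choices made for this sketch.
import boto3
from pymongo import MongoClient, ReplaceOne

sqs = boto3.client("sqs")
mongo_client = MongoClient(os.environ.get("DOCUMENTDB_URI", "mongodb://localhost:27017"))


def send_sqs_message(queue_name, payload, delay_seconds=0):
    # Resolve the queue URL by name and enqueue the payload with the requested delivery delay.
    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(payload),
        DelaySeconds=int(delay_seconds),
    )


def bulk_write_data_to_document_db(cluster_name, namespace, data):
    # Upsert each document by _id into the <database>.<collection> named by `namespace`.
    # Returns True on success and False on failure, matching how process_request uses the result.
    db_name, collection_name = namespace.split(".", 1)
    collection = mongo_client[db_name][collection_name]
    requests = [ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in data]
    if not requests:
        return True
    try:
        collection.bulk_write(requests, ordered=True)
        return True
    except Exception:
        logger.exception("bulk_write failed for cluster_name: %s, namespace: %s", cluster_name, namespace)
        return False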