in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/main.py [0:0]
def copy_tags():
    """Handle a request to copy tags from a source table to a target table.

    The JSON request body must contain all of the following keys:
    ``source_project``, ``source_dataset``, ``source_table``,
    ``target_project``, ``target_dataset`` and ``target_table``.

    Processing steps:
      1. Parse the JSON body and authenticate the caller through
         ``do_authentication``.
      2. Validate that every required key is present.
      3. Acquire credentials for the tag creator service account returned
         by the authentication step.
      4. Delegate the copy to ``DataCatalogController.copy_tags``.

    Returns:
        ``jsonify({"status": "success"})`` or
        ``jsonify({"status": "failure"})`` depending on the result reported
        by the controller, or a ``(json_response, 400)`` tuple when
        authentication fails, a required key is missing, or credentials
        cannot be acquired.
    """
    json_request = request.get_json(force=True)
    print('json request: ', json_request)

    status, response, tag_creator_sa = do_authentication(request.headers, json_request, ENABLE_AUTH)

    # Fail closed: any falsy status is treated as an authentication failure.
    if not status:
        return jsonify(response), 400

    # Order matters twice: it decides which missing key is reported first,
    # and it matches the positional argument order of dcc.copy_tags below.
    required_params = (
        'source_project',
        'source_dataset',
        'source_table',
        'target_project',
        'target_dataset',
        'target_table',
    )

    for param in required_params:
        if param not in json_request:
            response = {
                "status": "error",
                "message": "Request JSON is missing a " + param + " parameter",
            }
            return jsonify(response), 400

    credentials, success = get_target_credentials(tag_creator_sa)

    if not success:
        # Do not continue with credentials that were not acquired; the
        # original code only logged here and went on to attempt the copy.
        print('Error acquiring credentials from', tag_creator_sa)
        response = {
            "status": "error",
            "message": "Error acquiring credentials from " + str(tag_creator_sa),
        }
        return jsonify(response), 400

    dcc = controller.DataCatalogController(credentials)
    success = dcc.copy_tags(*[json_request[param] for param in required_params])

    if success:
        response = {"status": "success"}
    else:
        response = {"status": "failure"}

    return jsonify(response)