in api/batch_processing/api_core/server.py [0:0]
def request_detections():
    """
    Checks that the input parameters to this endpoint are valid, starts a thread
    to launch the batch processing job, and returns the job_id/request_id to the user.

    The POST body must be a JSON object. Fields read here:
        caller (required): must be on the allowlist from app_config.
        use_url (optional, default False): truthy when the images are given as
            URLs; the strings 'false', 'f', 'no', 'n' (any case) count as False.
        input_container_sas: required unless use_url is set; validated with
            sas_blob_utils.is_container_uri and check_data_container_sas.
        images_requested_json_sas: required when use_url is set; must be an
            http(s) URL string.
        model_version (optional): must be a key of api_config.MD_VERSIONS_TO_REL_PATH.
        request_name (optional): at most 92 characters drawn from letters,
            digits, '-' and '_'.

    Returns:
        {'request_id': job_id} once the job status entry has been created and
        the background thread started; otherwise the value of
        make_error(status_code, message), with
            415 - body is not JSON or could not be read,
            400 - a parameter is missing or malformed,
            401 - caller missing or not on the allowlist,
            503 - the active-job quota has been reached,
            500 - failure checking the quota, creating the status entry, or
                  starting the thread,
        or whatever (code, message) pair check_data_container_sas reports.
    """
    if not request.is_json:
        msg = 'Body needs to have a JSON mimetype (e.g., application/json).'
        return make_error(415, msg)

    try:
        post_body = request.get_json()
    except Exception as e:
        return make_error(415, f'Error occurred reading POST request body: {e}.')

    # NOTE(review): this logs the entire body, which may include
    # input_container_sas / images_requested_json_sas - confirm that writing
    # those values to the log is acceptable.
    app.logger.info(f'server, request_detections, post_body: {post_body}')

    # Valid JSON is not necessarily an object (e.g. `null`, a list, a string);
    # everything below relies on dict access, so reject anything else up front.
    if not isinstance(post_body, dict):
        return make_error(400, 'POST request body needs to be a JSON object.')

    # required params
    caller_id = post_body.get('caller', None)
    if caller_id is None or caller_id not in app_config.get_allowlist():
        msg = ('Parameter caller is not supplied or is not on our allowlist. '
               'Please email cameratraps@microsoft.com to request access.')
        return make_error(401, msg)

    use_url = post_body.get('use_url', False)
    # in case it is included as a string but is intended to be False
    if use_url and isinstance(use_url, str):
        if use_url.lower() in ('false', 'f', 'no', 'n'):
            use_url = False

    input_container_sas = post_body.get('input_container_sas', None)
    if not input_container_sas and not use_url:
        msg = ('input_container_sas with read and list access is a required '
               'field when not using image URLs.')
        return make_error(400, msg)

    if input_container_sas is not None:
        # A non-string value cannot be a SAS URL; reject it here instead of
        # passing it on to the SAS helpers.
        if not isinstance(input_container_sas, str):
            return make_error(400, 'input_container_sas needs to be a string.')
        if not sas_blob_utils.is_container_uri(input_container_sas):
            return make_error(400, 'input_container_sas provided is not for a container.')

        result = check_data_container_sas(input_container_sas)
        if result is not None:
            return make_error(result[0], result[1])

    # can be an URL to a file not hosted in an Azure blob storage container
    images_requested_json_sas = post_body.get('images_requested_json_sas', None)
    if images_requested_json_sas is not None:
        # the isinstance guard keeps a non-string value (number, list, ...)
        # from raising on .startswith
        if (not isinstance(images_requested_json_sas, str)
                or not images_requested_json_sas.startswith(('http://', 'https://'))):
            return make_error(400, 'images_requested_json_sas needs to be an URL.')

    # if use_url, then images_requested_json_sas is required
    if use_url and images_requested_json_sas is None:
        return make_error(400, 'images_requested_json_sas is required since use_url is true.')

    # optional params

    # check model_version is among the available model versions
    model_version = post_body.get('model_version', '')
    if model_version != '':
        model_version = str(model_version)  # in case user used an int
        # TODO use AppConfig to store model version info
        if model_version not in api_config.MD_VERSIONS_TO_REL_PATH:
            return make_error(400, f'model_version {model_version} is not supported.')

    # check request_name has only allowed characters
    request_name = post_body.get('request_name', '')
    if request_name != '':
        # len() / set() below would raise on null or numeric values
        if not isinstance(request_name, str):
            return make_error(400, 'request_name needs to be a string.')
        if len(request_name) > 92:
            return make_error(400, 'request_name is longer than 92 characters.')
        allowed = set(string.ascii_letters + string.digits + '_' + '-')
        if not set(request_name) <= allowed:
            msg = ('request_name contains invalid characters (only letters, '
                   'digits, - and _ are allowed).')
            return make_error(400, msg)

    # optional params for telemetry collection ('country', 'organization_name')
    # are not read here - they are logged to the status table for now as part
    # of call_params, since the whole post_body is stored below

    # All API instances / node pools share a quota on total number of active Jobs;
    # we cannot accept new Job submissions if we are at the quota
    try:
        num_active_jobs = batch_job_manager.get_num_active_jobs()
        if num_active_jobs >= api_config.MAX_BATCH_ACCOUNT_ACTIVE_JOBS:
            return make_error(503, 'Too many active jobs, please try again later')
    except Exception as e:
        return make_error(500, f'Error checking number of active jobs: {e}')

    try:
        job_id = uuid.uuid4().hex
        job_status_table.create_job_status(
            job_id=job_id,
            status=get_job_status('created', 'Request received. Listing images next...'),
            call_params=post_body
        )
    except Exception as e:
        return make_error(500, f'Error creating a job status entry: {e}')

    # NOTE(review): if starting the thread fails, the status entry created
    # above is left in place - confirm whether it should be marked as failed.
    try:
        thread = threading.Thread(
            target=create_batch_job,
            name=f'job_{job_id}',
            kwargs={'job_id': job_id, 'body': post_body}
        )
        thread.start()
    except Exception as e:
        return make_error(500, f'Error creating or starting the batch processing thread: {e}')

    return {'request_id': job_id}