in sdap/ningesterpy.py [0:0]
def run_processor_chain():
    """Run one tile through a processor chain described by the JSON request body.

    The request body is read with ``request.get_json()`` and is expected to be
    an object with two keys:

    * ``processor_list`` -- passed unchanged to ``ProcessorChain``.
    * ``input_data`` -- a string that ``json_format.Parse`` can load into a
      ``nexusproto.NexusTile``.

    Returns:
        A ``Response`` with mimetype ``application/octet-stream``. Its body is
        the first item produced by ``chain.process(input_data)``, serialized
        with ``SerializeToString()`` when that item is a ``NexusTile``; when
        the chain produces nothing the body is ``None``.

    Raises:
        BadRequest: the body cannot be read as JSON, ``processor_list`` or
            ``input_data`` is missing, the chain cannot be built from
            ``processor_list`` (unknown processor or missing processor
            arguments), or ``input_data`` is not a string that parses into
            a ``NexusTile``.
    """
    # NOTE(review): `request`, `Response` and `BadRequest` look like the
    # Flask/Werkzeug objects -- confirm against the module imports.
    try:
        parameters = request.get_json()
    except Exception as e:
        raise BadRequest("Invalid JSON data") from e

    # TypeError covers a body that is not subscriptable by key (e.g. None or
    # a JSON array) as well as a plain missing key.
    try:
        processor_list = parameters['processor_list']
    except (KeyError, TypeError) as e:
        raise BadRequest(description="processor_list is required.") from e

    try:
        chain = ProcessorChain(processor_list)
    except ProcessorNotFound as e:
        raise BadRequest("Unknown processor requested: %s" % e.missing_processor) from e
    except MissingProcessorArguments as e:
        raise BadRequest(
            "%s missing required configuration options: %s" % (e.processor, e.missing_processor_args)) from e

    # A missing key is a client error, not a server error: report it the same
    # way as a missing processor_list instead of letting KeyError escape.
    try:
        serialized_tile = parameters['input_data']
    except (KeyError, TypeError) as e:
        raise BadRequest(description="input_data is required.") from e

    # Reject non-string JSON values (objects, numbers, null, ...) up front;
    # handing them to json_format.Parse would fail with an exception other
    # than ParseError and bypass the handler below.
    if not isinstance(serialized_tile, str):
        raise BadRequest("input_data must be a NexusTile protobuf serialized as a string")

    try:
        input_data = json_format.Parse(serialized_tile, nexusproto.NexusTile())
    except ParseError as e:
        raise BadRequest("input_data must be a NexusTile protobuf serialized as a string") from e

    # Only the first result of the chain is returned; None when it yields nothing.
    result = next(chain.process(input_data), None)
    if isinstance(result, nexusproto.NexusTile):
        result = result.SerializeToString()

    return Response(result, mimetype='application/octet-stream')