analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py (67 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import tornado.gen
import tornado.ioloop
from webservice.nexus_tornado.request.renderers import NexusRendererFactory
from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException
from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
class NexusRequestHandler(tornado.web.RequestHandler):
def initialize(self, thread_pool, clazz=None, **kargs):
self.logger = logging.getLogger('nexus')
self.executor = thread_pool
self.__clazz = clazz
self._clazz_init_args = kargs # 'algorithm_config', 'sc' for spark handler
@tornado.gen.coroutine
def get(self):
self.logger.info("Received request %s" % self._request_summary())
# temporary hack to use a NexusRequestObject without tornado request references
# this object only supports timeAvgMapSpark yet.
# Will be extended to replace the historical object in the next pull request related to ticket SDAP-252
# if self.request.path == '/timeAvgMapSpark':
# request = NexusRequestObjectTornadoFree(self)
# else:
request = NexusRequestObject(self)
# create NexusCalcHandler which will process the request
instance = self.__clazz(**self._clazz_init_args)
io_loop = tornado.ioloop.IOLoop.current()
try:
if isinstance(instance, NexusCalcSparkTornadoHandler):
results = instance.calc(request, io_loop)
else:
results = yield io_loop.run_in_executor(
self.executor,
instance.calc,
request
)
try:
self.set_status(results.status_code)
except AttributeError:
pass
# Only render results if there are results to render.
# "NexusCalcSparkTornadoHandler" endpoints redirectm so no
# need to render.
if not isinstance(instance, NexusCalcSparkTornadoHandler):
renderer = NexusRendererFactory.get_renderer(request)
renderer.render(self, results)
except NexusProcessingException as e:
self.async_onerror_callback(e.reason, e.code)
except Exception as e:
self.async_onerror_callback(str(e), 500)
@tornado.gen.coroutine
def post(self):
self.logger.info("Received %s" % self._request_summary())
request = NexusRequestObject(self)
# create NexusCalcHandler which will process the request
instance = self.__clazz(**self._clazz_init_args)
try:
# process the request asynchronously on a different thread,
# the current tornado handler is still available to get other user requests
results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request)
if results:
try:
self.set_status(results.status_code)
except AttributeError:
pass
renderer = NexusRendererFactory.get_renderer(request)
renderer.render(self, results)
except NexusProcessingException as e:
self.async_onerror_callback(e.reason, e.code)
except Exception as e:
self.async_onerror_callback(str(e), 500)
def async_onerror_callback(self, reason, code=500):
self.logger.error("Error processing request", exc_info=True)
self.set_header("Content-Type", "application/json")
self.set_status(code)
response = {
"error": reason,
"code": code
}
self.write(json.dumps(response, indent=5))
self.finish()