analysis/webservice/webapp_livy.py (65 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import sys
import os
import pkg_resources
from webservice import nexus_tornado
from .nexus_tornado.options import define, options, parse_command_line
from webservice.NexusLivyHandler import LivyHandler
class RunFileHandler(nexus_tornado.web.RequestHandler):
_id = 0
def __init__(self, *args, **kwargs):
self._lh = kwargs.pop('lh', None)
super(RunFileHandler, self).__init__(*args, **kwargs)
def post(self):
self._upload_file = self.request.files['file'][0]
upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
while os.path.exists(upload_fname):
RunFileHandler._id += 1
upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
RunFileHandler._id += 1
with open(upload_fname, 'w') as f:
f.write(self._upload_file['body'])
try:
ans = self._lh.exec_file(upload_fname)
except Exception as e:
ans = str(e)
self.write(str(ans))
class RunStrHandler(nexus_tornado.web.RequestHandler):
def __init__(self, *args, **kwargs):
self._lh = kwargs.pop('lh', None)
super(RunStrHandler, self).__init__(*args, **kwargs)
def post(self):
self._upload_str = self.request.body
ans = self._lh.exec_str(self._upload_str)
self.write(str(ans))
if __name__ == "__main__":
# Configure logger.
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
log = logging.getLogger(__name__)
# Configure tornado.
webconfig = configparser.RawConfigParser()
webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini')
define("debug", default=False, help="run in debug mode")
define("port", default=webconfig.get("livy", "server.socket_port"), help="run on the given port", type=int)
define("address", default=webconfig.get("livy", "server.socket_host"), help="Bind to the given address")
parse_command_line()
log.info("Initializing on host address '%s'" % options.address)
log.info("Initializing on port '%s'" % options.port)
log.info("Starting web server in debug mode: %s" % options.debug)
# Start up Livy Spark session.
livy_host = webconfig.get("livy", "livy_host")
livy_port = webconfig.get("livy", "livy_port")
livy_url = 'http://' + livy_host + ':' + livy_port
lh = LivyHandler(host=livy_url)
# Define tornado job handlers
handlers = []
handlers.append((r"/run_file", RunFileHandler, dict(lh=lh)))
handlers.append((r"/run_str", RunStrHandler, dict(lh=lh)))
# Start listening for job requests.
app = nexus_tornado.web.Application(
handlers,
default_host=options.address,
debug=options.debug
)
app.listen(options.port)
log.info("Started HTTP listener...")
nexus_tornado.ioloop.IOLoop.current().start()