analysis/webservice/NexusLivyHandler.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import textwrap
import time
from importlib import import_module
from os import environ
from os.path import abspath, basename, splitext

import requests
from livy.client import HttpClient


class LivyHandler:
    """Thin wrapper around the Livy REST API and its Python job client."""

    def __init__(self, host='http://localhost:8998'):
        self._headers = {'Content-Type': 'application/json'}
        if host is not None:
            self.create_pyspark_session(host)

    def _wait_for_state(self, url, desired_state):
        # Poll the Livy resource once per second until it reaches the
        # desired state (e.g. a session becoming 'idle').
        while True:
            r = requests.get(url, headers=self._headers)
            if r.json()['state'] == desired_state:
                break
            time.sleep(1)
        return r

    def create_pyspark_session(self, host):
        self._host = host
        data = {'kind': 'pyspark'}

        # Create a Spark session.
        print('Creating Spark session...')
        r = requests.post(host + '/sessions',
                          data=json.dumps(data),
                          headers=self._headers)

        # Wait until the new Spark session is ready to use.
        self._session_url = host + r.headers['location']
        self._wait_for_state(self._session_url, 'idle')

        # Create a client for Livy batch jobs.
        self._lc = HttpClient(self._session_url)

    def exec_str(self, code):
        print('Submitting code...')
        statements_url = self._session_url + '/statements'
        data = {'code': code}
        r = requests.post(statements_url,
                          data=json.dumps(data),
                          headers=self._headers)

        # Wait until the code completes.
        print('Running code...')
        status_url = self._host + r.headers['location']
        r = self._wait_for_state(status_url, 'available')
        output = r.json()['output']
        print('output =', output)
        if output['status'] == 'error':
            ans = {'text/plain': output['traceback']}
        else:
            ans = {'text/plain': [output['data']['text/plain']]}
        return ans

    def exec_file(self, py_uri):
        # Ship the file to the cluster, import it locally so the job
        # closure can reference it, then run its main() against the
        # remote SparkContext.
        py_uri_abs = abspath(py_uri)
        self._lc.upload_pyfile(py_uri_abs)
        m = splitext(basename(py_uri_abs))[0]
        m_imp = import_module(m)

        def upload_pyfile_job(jc):
            return m_imp.main(jc.sc)

        return self._lc.submit(upload_pyfile_job).result()

    def close(self):
        print('Closing Spark session...')
        requests.delete(self._session_url, headers=self._headers)


def main():
    try:
        livy_host = environ['LIVY_HOST']
    except KeyError:
        livy_host = 'http://localhost:8998'
    print('Using Livy at {}'.format(livy_host))
    lh = LivyHandler(host=livy_host)

    # Run some pyspark code.
    code = textwrap.dedent("""
        1 + 1
        """)
    ans = lh.exec_str(code)
    print('The answer is {}'.format(ans))

    # Run some more pyspark code.
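    # The snippet below estimates pi by Monte Carlo sampling: points drawn
    # uniformly from the unit square fall inside the quarter circle
    # (x*x + y*y < 1) with probability pi/4. `sc` is the SparkContext that
    # Livy predefines inside a pyspark session.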
    code = textwrap.dedent("""
        import random
        NUM_SAMPLES = 100000

        def sample(p):
            x, y = random.random(), random.random()
            return 1 if x*x + y*y < 1 else 0

        count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
        print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
        """)
    ans = lh.exec_str(code)
    print('The answer is {}'.format(ans))

    # Run a batch job.
    py_uri = 'test_code_nexus_laptop.py'
    print('Submitting batch job from {}'.format(py_uri))
    ans = lh.exec_file(py_uri)
    print('The answer is {}'.format(ans))

    # Close the Spark session.
    lh.close()


if __name__ == "__main__":
    main()
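
# A module passed to exec_file() is expected to be importable from the current
# directory and to define main(sc) returning a picklable result. A minimal,
# hypothetical sketch of such a module (test_code_nexus_laptop.py itself is
# not included here):
#
#     def main(sc):
#         # Sum 0..9 on the cluster and return the result through Livy.
#         return sc.parallelize(range(10)).sum()
#
# Usage sketch, assuming a reachable Livy server (defaults to localhost:8998):
#
#     LIVY_HOST=http://livy.example.com:8998 python NexusLivyHandler.py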