in python-api/src/main/python/livy/job_handle.py [0:0]
def _poll_result(self):
def do_poll_result():
try:
suffix_url = "/" + str(self._session_id) + "/jobs/" + \
str(self._job_id)
job_status = self._conn.send_request('GET', suffix_url,
headers={'Accept': 'application/json'}).json()
job_state = job_status['state']
job_result = None
has_finished = False
job_error = None
if job_state == 'SUCCEEDED':
job_result = job_status['result']
has_finished = True
elif job_state == 'FAILED':
job_error = job_status['error']
has_finished = True
elif job_state == 'CANCELLED':
repeated_timer.stop()
else:
pass
if has_finished:
if job_result is not None:
b64_decoded = base64.b64decode(job_result)
b64_decoded_decoded = base64.b64decode(b64_decoded)
deserialized_object = cloudpickle.loads(
b64_decoded_decoded)
super(JobHandle, self).set_result(deserialized_object)
if job_error is not None:
self.set_job_exception(Exception(job_error))
repeated_timer.stop()
else:
self._update_state(job_state)
except Exception as err:
repeated_timer.stop()
traceback.print_exc()
self.set_job_exception(err, traceback.format_exc())
repeated_timer = self._RepeatedTimer(self._JOB_INITIAL_POLL_INTERVAL,
do_poll_result, self._executor)
repeated_timer.start()