in src/ab/plugins/calculate/spark.py [0:0]
def save_log(self):
    """Copy the gateway's stdout into the ``log`` column of the task row.

    Blocks until ``current_app_id`` is set and the ``_task`` mapper reports
    at least one row with that ``spark_app_id``. It then reads
    ``self.stdout`` line by line:

    * every non-blank line is written to the debug log immediately;
    * lines containing ``'[Stage'`` are not persisted;
    * every other non-empty line is split with ``chunk_string(line, 1000)``
      and each piece is written to the ``log`` column through
      ``concat(task_mapper.table.c.log, piece)`` (presumably an append of
      at most 1000 characters per update -- confirm against the helpers).

    Returns once ``readline()`` yields ``''`` and ``self.poll()`` reports a
    return code; the return code is logged at error level.
    """
    task_mapper = get_mapper('_task')

    # Wait until the app is created and its spark_app_id has been recorded.
    # TODO: replace this polling with wait & notify.
    while not current_app_id:
        time.sleep(0.1)
    while task_mapper.count(conditions={'spark_app_id': current_app_id}) == 0:
        time.sleep(0.1)

    while True:
        line = self.stdout.readline()
        if line.strip():
            # Log as soon as possible, for debugging.
            logger.debug(line.strip())
        if '[Stage' in line:
            # Such lines are only debug-logged, never stored on the task row.
            continue

        # Snapshot the id once so every chunk of this line goes to the same
        # row even if current_app_id is changed while we are writing.
        app_id = current_app_id
        if app_id is not None and line:
            for t in chunk_string(line, 1000):
                task_mapper.update(row={'log': concat(task_mapper.table.c.log, t)},
                                   conditions={'spark_app_id': app_id})

        # poll() returns the return code of the subprocess; None means the
        # process has not terminated yet.
        rc = self.poll()
        if line == '':
            if rc is not None:
                logger.error('spark gateway exited with return code: {rc}'.format(rc=rc))
                return
            # stdout is exhausted but the process is still running: back off
            # instead of spinning on readline(), which now returns at once.
            time.sleep(0.1)