def save_log()

in src/ab/plugins/calculate/spark.py [0:0]


    def save_log(self):
        """Copy this process's stdout into the ``log`` column of its task row.

        The method runs in three phases:

        1. Wait until ``current_app_id`` is set and a ``_task`` row with that
           ``spark_app_id`` exists. If the process terminates during this
           wait, the remaining output is drained to the debug logger, the
           return code is logged and the method returns, rather than waiting
           forever for a row that may never appear.
        2. Read stdout line by line. Every non-blank line goes to the debug
           logger. Lines containing ``'[Stage'`` are not persisted; all other
           non-empty lines are appended to the task's ``log`` column.
        3. Return once stdout yields ``''`` and ``self.poll()`` reports a
           return code.

        NOTE(review): ``self`` is assumed to be ``subprocess.Popen``-like
        (text-mode ``stdout`` and ``poll()``), and ``current_app_id`` is
        assumed to be updated by other code while this runs -- confirm.
        """
        task_mapper = get_mapper('_task')

        # wait until app created & spark_app_id recorded
        # todo wait & notify
        while True:
            app_id = current_app_id
            if app_id and task_mapper.count(conditions={'spark_app_id': app_id}) != 0:
                break

            # Without this check a process that dies before registering an
            # app would leave this method polling forever.
            rc = self.poll()
            if rc is not None:
                # There is no task row to attach the output to, so at least
                # surface it in the debug log before giving up.
                for pending in self.stdout:
                    if pending.strip():
                        logger.debug(pending.strip())
                logger.error('spark gateway exited with return code: %s', rc)
                return
            time.sleep(0.1)

        while True:
            line = self.stdout.readline()
            stripped = line.strip()
            if stripped:
                # log as soon as possible, for debug
                logger.debug(stripped)

            # Lines containing '[Stage' are only debug-logged, never stored.
            if '[Stage' in line:
                continue

            # Re-read on every line so the output is attached to whichever
            # app id is current at that moment.
            app_id = current_app_id
            if app_id is not None and line:
                # presumably chunk_string yields pieces of at most 1000
                # characters, to bound each UPDATE -- verify against helper
                for t in chunk_string(line, 1000):
                    task_mapper.update(row={'log': concat(task_mapper.table.c.log, t)},
                                       conditions={'spark_app_id': app_id})

            # proc.poll() returns the retcode of subprocess
            # A None value indicates that the process hasn't terminated yet
            rc = self.poll()
            if line == '':
                if rc is not None:
                    logger.error('spark gateway exited with return code: %s', rc)
                    return
                # stdout is at EOF but the process has not been reaped yet:
                # back off instead of spinning on readline().
                time.sleep(0.1)