def check_and_generate_next_task()

in aliyun/log/consumer/shard_worker.py [0:0]


    def check_and_generate_next_task(self):
        """
        Settle the previous task, if any, and fire the next one.

        Nothing happens while a previously submitted task is still running.
        Otherwise (no task was submitted, or the submitted one is done) the
        steps below run in this order:

        1. the task result is collected via ``get_task_result`` and
           ``self.task_future`` is reset to ``None``;
        2. for a successful result (not ``None`` and ``get_exception()``
           returns ``None``) cursor / check-point state is updated:
           an ``InitTaskResult`` supplies the fetch cursors and check points,
           a ``ProcessTaskResult`` may request a roll-back of the fetch cursor;
        3. the task status is logged, the consumer status is updated with the
           success flag, and the next task is generated.

        :return: None
        :raises ClientWorkerException: if an ``InitTaskResult`` is received
            while ``self.consumer_status`` is not ``ConsumerStatus.INITIALIZING``
        """
        previous_task = self.task_future
        if previous_task is not None and not previous_task.done():
            # the previous task is still in flight; nothing to settle yet
            return

        task_result = self.get_task_result(previous_task)
        self.task_future = None

        task_success = task_result is not None and task_result.get_exception() is None

        if task_success:
            if isinstance(task_result, InitTaskResult):
                # Raise explicitly instead of using ``assert``: assertions are
                # stripped when Python runs with -O, which would silently skip
                # this state check.
                if self.consumer_status != ConsumerStatus.INITIALIZING:
                    raise ClientWorkerException(
                        "get init task result, but status is: " + str(self.consumer_status))

                # maintain check points: take the cursors carried by the init result
                self.next_fetch_cursor = task_result.get_cursor()
                self.fetch_end_cursor = task_result.end_cursor
                self.checkpoint_tracker.set_memory_check_point(self.next_fetch_cursor)
                if task_result.is_cursor_persistent():
                    self.checkpoint_tracker.set_persistent_check_point(self.next_fetch_cursor)

            elif isinstance(task_result, ProcessTaskResult):
                # maintain check points: honour a roll-back requested by the processor
                roll_back_checkpoint = task_result.get_rollback_check_point()
                if roll_back_checkpoint:
                    self.last_fetch_log_group = None
                    self.logger.info("user defined to roll-back check-point, cancel current fetching task")
                    self.cancel_current_fetch()
                    self.next_fetch_cursor = roll_back_checkpoint

        # log task status
        self._sample_log_error(task_result)

        # update status basing on task results
        self._update_status(task_success)

        # fire the follow-up task
        self._generate_next_task()