in aliyun/log/consumer/shard_worker.py [0:0]
def check_and_generate_next_task(self):
"""
check if the previous task is done and proceed to fire another task
:return:
"""
# if self.task_future is None:
# # there's no any ongoing task
# self._update_status(False)
# self.generate_next_task()
# return
if self.task_future is None or self.task_future.done():
task_success = False
task_result = self.get_task_result(self.task_future)
self.task_future = None
if task_result is not None and task_result.get_exception() is None:
task_success = True
if isinstance(task_result, InitTaskResult):
# maintain check points
assert self.consumer_status == ConsumerStatus.INITIALIZING, \
ClientWorkerException("get init task result, but status is: " + str(self.consumer_status))
init_result = task_result
self.next_fetch_cursor = init_result.get_cursor()
self.fetch_end_cursor = init_result.end_cursor
self.checkpoint_tracker.set_memory_check_point(self.next_fetch_cursor)
if init_result.is_cursor_persistent():
self.checkpoint_tracker.set_persistent_check_point(self.next_fetch_cursor)
elif isinstance(task_result, ProcessTaskResult):
# maintain check points
process_task_result = task_result
roll_back_checkpoint = process_task_result.get_rollback_check_point()
if roll_back_checkpoint:
self.last_fetch_log_group = None
self.logger.info("user defined to roll-back check-point, cancel current fetching task")
self.cancel_current_fetch()
self.next_fetch_cursor = roll_back_checkpoint
# log task status
self._sample_log_error(task_result)
# update status basing on task results
self._update_status(task_success)
#
self._generate_next_task()