def main()

in smallpond/execution/task.py [0:0]


def main():
    """Command-line entry point: load pickled task(s) and execute one locally.

    Parses the command line, redirects logging to stdout at the requested
    level, loads the object stored at ``pickle_path`` and then:

    * if it is a single ``Task``, executes it;
    * if it is a dict, executes the first value whose ``id`` matches
      ``--task_id`` and whose ``retry_count`` matches ``--retry_count``;
    * otherwise logs an error.

    Exits with status 2 (via ``parser.error``) when a dict was loaded but no
    ``--task_id`` was given.
    """
    import argparse

    # NOTE(review): Task is imported under its package path here, presumably so
    # that isinstance() matches unpickled objects when this file is run as a
    # script -- confirm.
    from smallpond.execution.task import Task
    from smallpond.io.filesystem import load

    parser = argparse.ArgumentParser(prog="task.py", description="Task Local Runner")
    parser.add_argument("pickle_path", help="Path of pickled task(s)")
    parser.add_argument("-t", "--task_id", default=None, help="Task id")
    # type=int is required: without it a value given on the command line stays
    # a string and never equals the integer retry count compared below.
    parser.add_argument("-r", "--retry_count", type=int, default=0, help="Task retry count")
    parser.add_argument("-o", "--output_path", default=None, help="Task output path")
    parser.add_argument("-l", "--log_level", default="DEBUG", help="Logging message level")
    args = parser.parse_args()

    # Replace the existing log sinks with a single stdout sink.
    logger.remove()
    logger.add(
        sys.stdout,
        format=r"[{time:%Y-%m-%d %H:%M:%S.%f}] {level} {message}",
        level=args.log_level,
    )

    def exec_task(task: Task, output_path: Optional[str]):
        """Execute ``task`` under the first retry count that has no output on disk.

        Retry counts 0..999 are tried in order. A retry count is skipped when
        any of the task's temp, final output or runtime output paths already
        exists. With ``output_path`` set, outputs go to ``<output_path>/output``
        and temp files to ``<output_path>/temp``; otherwise the task writes to
        the staging root of its context.
        """
        max_retry_count = 1000
        for retry_count in range(max_retry_count):
            task.retry_count = retry_count
            if output_path is None:
                task.output_root = task.ctx.staging_root
            else:
                task.output_root = os.path.join(output_path, "output")
                task.ctx.temp_root = os.path.join(output_path, "temp")
            # Skip this retry count if a previous run already left files behind
            # (presumably the paths are derived from the retry count -- confirm).
            if any(
                os.path.exists(path)
                for path in (
                    task.temp_abspath,
                    task.final_output_abspath,
                    task.runtime_output_abspath,
                )
            ):
                continue
            task.status = WorkStatus.INCOMPLETE
            task.start_time = time.time()
            task.finish_time = None
            logger.info(f"start to debug: {task}")
            task.exec()
            break
        else:
            # Every candidate retry count already has output on disk; report it
            # instead of returning silently without running anything.
            logger.error(f"cannot find an unused retry count below {max_retry_count} for {task}")

    obj = load(args.pickle_path)
    logger.info(f"loaded an object of {type(obj)} from pickle file {args.pickle_path}")

    if isinstance(obj, Task):
        exec_task(obj, args.output_path)
    elif isinstance(obj, Dict):
        # Validate explicitly rather than with assert, which is stripped
        # when Python runs with -O.
        if args.task_id is None:
            parser.error("no task id specified")
        for task in obj.values():
            if task.id == TaskId(args.task_id) and task.retry_count == args.retry_count:
                exec_task(task, args.output_path)
                break
        else:
            logger.error(f"cannot find task with id {args.task_id}")
    else:
        logger.error(f"unsupported type of object: {type(obj)}")