in smallpond/execution/task.py [0:0]
def main():
import argparse
from smallpond.execution.task import Task
from smallpond.io.filesystem import load
parser = argparse.ArgumentParser(prog="task.py", description="Task Local Runner")
parser.add_argument("pickle_path", help="Path of pickled task(s)")
parser.add_argument("-t", "--task_id", default=None, help="Task id")
parser.add_argument("-r", "--retry_count", default=0, help="Task retry count")
parser.add_argument("-o", "--output_path", default=None, help="Task output path")
parser.add_argument("-l", "--log_level", default="DEBUG", help="Logging message level")
args = parser.parse_args()
logger.remove()
logger.add(
sys.stdout,
format=r"[{time:%Y-%m-%d %H:%M:%S.%f}] {level} {message}",
level=args.log_level,
)
def exec_task(task: Task, output_path: Optional[str]):
for retry_count in range(1000):
task.retry_count = retry_count
if output_path is None:
task.output_root = task.ctx.staging_root
else:
task.output_root = os.path.join(output_path, "output")
task.ctx.temp_root = os.path.join(output_path, "temp")
if any(
os.path.exists(path)
for path in (
task.temp_abspath,
task.final_output_abspath,
task.runtime_output_abspath,
)
):
continue
task.status = WorkStatus.INCOMPLETE
task.start_time = time.time()
task.finish_time = None
logger.info(f"start to debug: {task}")
task.exec()
break
obj = load(args.pickle_path)
logger.info(f"loaded an object of {type(obj)} from pickle file {args.pickle_path}")
if isinstance(obj, Task):
task: Task = obj
exec_task(task, args.output_path)
elif isinstance(obj, Dict):
assert args.task_id is not None, f"error: no task id specified"
tasks: List[Task] = obj.values()
for task in tasks:
if task.id == TaskId(args.task_id) and task.retry_count == args.retry_count:
exec_task(task, args.output_path)
break
else:
logger.error(f"cannot find task with id {args.task_id}")
else:
logger.error(f"unsupported type of object: {type(obj)}")