in smallpond/execution/driver.py [0:0]
def _create_driver_args_parser(self) -> argparse.ArgumentParser:
    """Build the argument parser for the smallpond driver CLI.

    Defines the run mode (positional) plus all job, scheduling, retry,
    memory, logging, and output options. ``add_help=False`` is used so
    this parser can be composed with others without a ``-h`` conflict.

    Returns:
        argparse.ArgumentParser: the fully configured parser.
    """
    parser = argparse.ArgumentParser(prog="driver.py", description="Smallpond Driver", add_help=False)
    # NOTE(review): `default` on a required positional is ignored by argparse;
    # add nargs="?" if an optional mode defaulting to "executor" is intended — confirm.
    parser.add_argument("mode", choices=["executor", "scheduler", "ray"], default="executor")
    parser.add_argument("--exec_id", default=socket.gethostname(), help="Unique executor id")
    parser.add_argument("--job_id", type=str, help="Unique job id")
    parser.add_argument("--job_time", type=float, help="Job create time (seconds since epoch)")
    parser.add_argument("--job_name", default="smallpond", help="Display name of the job")
    parser.add_argument(
        "--job_priority",
        type=int,
        help="Job priority",
    )
    parser.add_argument("--resource_group", type=str, help="Resource group")
    parser.add_argument("--env_variables", nargs="*", default=[], help="Env variables for the job")
    parser.add_argument("--sidecars", nargs="*", default=[], help="Sidecars for the job")
    parser.add_argument("--tags", nargs="*", default=[], help="Tags for submitted platform task")
    parser.add_argument("--task_image", default="default", help="Container image of platform task")
    parser.add_argument("--python_venv", type=str, help="Python virtual env for the job")
    parser.add_argument(
        "--data_root",
        type=str,
        help="The root folder for all files generated at runtime",
    )
    parser.add_argument(
        "--runtime_ctx_path",
        default=None,
        help="The path of pickled runtime context passed from scheduler to executor",
    )
    parser.add_argument(
        "--num_executors",
        default=0,
        type=int,
        help="The number of nodes/executors (run all tasks on scheduler if set to zero)",
    )
    parser.add_argument(
        "--num_executors_per_task",
        default=5,
        type=int,
        help="The number of nodes/executors in each platform task.",
    )
    parser.add_argument(
        "--random_seed",
        type=int,
        default=None,
        # fixed unbalanced parentheses in the help text
        help="Random seed for the job, default: int.from_bytes(os.urandom(128))",
    )
    # Several options accept a short and a long spelling; `dest` pins the
    # canonical attribute name regardless of which alias is used.
    parser.add_argument(
        "--max_retry",
        "--max_retry_count",
        dest="max_retry_count",
        default=DEFAULT_MAX_RETRY_COUNT,
        type=int,
        help="The max number of times a task is retried by speculative execution",
    )
    parser.add_argument(
        "--max_fail",
        "--max_fail_count",
        dest="max_fail_count",
        default=DEFAULT_MAX_FAIL_COUNT,
        type=int,
        help="The number of times a task is allowed to fail or crash before giving up",
    )
    parser.add_argument(
        "--prioritize_retry",
        action="store_true",
        help="Prioritize retry tasks in scheduling",
    )
    # Speculative execution: one tri-state option plus two store_const
    # shortcuts that all write into the same `speculative_exec` dest.
    parser.add_argument(
        "--speculative_exec",
        dest="speculative_exec",
        choices=["disable", "enable", "aggressive"],
        help="Level of speculative execution",
    )
    parser.add_argument(
        "--enable_speculative_exec",
        dest="speculative_exec",
        action="store_const",
        const="enable",
        help="Enable speculative execution",
    )
    parser.add_argument(
        "--disable_speculative_exec",
        dest="speculative_exec",
        action="store_const",
        const="disable",
        help="Disable speculative execution",
    )
    parser.set_defaults(speculative_exec="enable")
    parser.add_argument(
        "--stop_executor_on_failure",
        action="store_true",
        help="Stop an executor if any task fails on it",
    )
    parser.add_argument(
        "--fail_fast",
        "--fail_fast_on_failure",
        dest="fail_fast_on_failure",
        action="store_true",
        help="Stop all executors if any task fails",
    )
    parser.add_argument(
        "--nonzero_exitcode_as_oom",
        action="store_true",
        help="Treat task crash as oom error",
    )
    parser.add_argument(
        "--fault_inject",
        "--fault_inject_prob",
        dest="fault_inject_prob",
        type=float,
        default=0.0,
        help="Inject random errors at runtime (for test)",
    )
    parser.add_argument(
        "--enable_profiling",
        action="store_true",
        help="Enable Python profiling for each task",
    )
    parser.add_argument(
        "--enable_diagnostic_metrics",
        action="store_true",
        # fixed typo: "metrcis" -> "metrics"
        help="Enable diagnostic metrics which may have performance impact",
    )
    # Paired enable/disable flags sharing one dest, defaulting to disabled.
    parser.add_argument(
        "--disable_sched_state_dump",
        dest="enable_sched_state_dump",
        action="store_false",
        help="Disable periodic dump of scheduler state",
    )
    parser.add_argument(
        "--enable_sched_state_dump",
        dest="enable_sched_state_dump",
        action="store_true",
        help="Enable periodic dump of scheduler state so that scheduler can resume execution after restart",
    )
    parser.set_defaults(enable_sched_state_dump=False)
    parser.add_argument(
        "--remove_empty_parquet",
        action="store_true",
        help="Remove empty parquet files from hash partition output",
    )
    parser.add_argument(
        "--remove_output_root",
        action="store_true",
        help="Remove all output files after job completed (for test)",
    )
    parser.add_argument(
        "--skip_task_with_empty_input",
        action="store_true",
        help="Skip running a task if any of its input datasets is empty",
    )
    parser.add_argument(
        "--self_contained_final_results",
        action="store_true",
        help="Build self-contained final results, i.e., create hard/symbolic links in output folder of final results",
    )
    parser.add_argument(
        "--malloc",
        "--memory_allocator",
        dest="memory_allocator",
        default="mimalloc",
        choices=["system", "jemalloc", "mimalloc"],
        help="Override memory allocator used by worker processes",
    )
    parser.add_argument(
        "--memory_purge_delay",
        default=10000,
        # bug fix: without type=int a CLI-supplied value would be parsed as str,
        # inconsistent with the int default and other numeric options
        type=int,
        help="The delay in milliseconds after which jemalloc/mimalloc will purge memory pages that are not in use.",
    )
    parser.add_argument(
        "--bind_numa_node",
        action="store_true",
        help="Bind executor processes to numa nodes.",
    )
    parser.add_argument(
        "--enforce_memory_limit",
        action="store_true",
        help="Set soft/hard memory limit for each task process",
    )
    parser.add_argument(
        "--enable_log_analytics",
        dest="share_log_analytics",
        action="store_true",
        help="Share log analytics with smallpond team",
    )
    parser.add_argument(
        "--disable_log_analytics",
        dest="share_log_analytics",
        action="store_false",
        help="Do not share log analytics with smallpond team",
    )
    # Loguru-style level names (SUCCESS/TRACE) in addition to stdlib levels.
    log_level_choices = [
        "CRITICAL",
        "ERROR",
        "WARNING",
        "SUCCESS",
        "INFO",
        "DEBUG",
        "TRACE",
    ]
    parser.add_argument(
        "--console_log_level",
        default="INFO",
        choices=log_level_choices,
    )
    parser.add_argument(
        "--file_log_level",
        default="DEBUG",
        choices=log_level_choices,
    )
    parser.add_argument("--disable_log_rotation", action="store_true", help="Disable log rotation")
    parser.add_argument(
        "--output_path",
        help="Set the output directory of final results and all nodes that have output_name but no output_path specified",
    )
    parser.add_argument(
        "--platform",
        type=str,
        help="The platform to use for the job. available: mpi",
    )
    return parser