in aws/petctl.py [0:0]
def run_job(session, specs_json, args):
    job_name = args.name
    script_args = args.script_args

    rdzv_specs = specs_json["rdzv"]
    worker_specs = specs_json["worker"]

    # resolve the training script based on its URL scheme
    script_url = urlparse(args.script_path)
    scheme = script_url.scheme
    if scheme == "docker":
        # docker://tmp/script.py -> tmp/script.py (relative to working dir in docker)
        # docker:///tmp/script.py -> /tmp/script.py (absolute path in docker)
        script = script_url.netloc + script_url.path
    elif scheme == "s3":
        # fetch_and_run supports s3:// so just pass through
        script = args.script_path
    else:
        # assume a local script; upload it to s3 so the workers can fetch it
        s3_bucket = worker_specs["s3_bucket"]
        s3_prefix = worker_specs["s3_prefix"]
        script = S3(session).cp(args.script_path, s3_bucket, f"{s3_prefix}/{job_name}")

    asg = AutoScalingGroup(session)
    rdzv_asg_name = f"{job_name}_rdzv"
    worker_asg_name = f"{job_name}_worker"

    # create a single-node asg to host the etcd server for rendezvous
    etcd_server_hostname = asg.create_asg_sync(rdzv_asg_name, size=1, **rdzv_specs)[0]
    rdzv_endpoint = f"{etcd_server_hostname}:2379"

    # allow overriding the instance type from the cli
    if args.instance_type:
        worker_specs["instance_type"] = args.instance_type

    # fill in the job-specific fields of the worker specs
    worker_specs["rdzv_endpoint"] = rdzv_endpoint
    worker_specs["job_name"] = job_name
    worker_specs["script"] = script
    worker_specs["args"] = " ".join(script_args)
    worker_specs["user"] = getpass.getuser()

    instance_type = worker_specs["instance_type"]
    script_args_str = worker_specs["args"]
    log.info(
        f"\n------------------------------------------------------------------\n"
        f"Starting job...\n"
        f"  job name     : {job_name}\n"
        f"  instance type: {instance_type}\n"
        f"  size         : {args.size} (min={args.min_size}, max={args.max_size})\n"
        f"  rdzv endpoint: {rdzv_endpoint}\n"
        f"  cmd          : {script}\n"
        f"  cmd args     : {script_args_str}\n"
        f"------------------------------------------------------------------\n"
    )

    # create the worker asg that actually runs the job
    asg.create_asg(
        worker_asg_name, args.size, args.min_size, args.max_size, **worker_specs
    )
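
A quick way to sanity-check the docker:// handling above is to run urlparse on both URL forms; this is a minimal standalone sketch (not part of petctl) showing that netloc + path reconstructs the in-container script path exactly as the inline comments describe:

    from urllib.parse import urlparse

    # docker://tmp/script.py -> netloc="tmp", path="/script.py" -> "tmp/script.py" (relative)
    u = urlparse("docker://tmp/script.py")
    print(u.scheme, u.netloc + u.path)

    # docker:///tmp/script.py -> netloc="", path="/tmp/script.py" -> "/tmp/script.py" (absolute)
    u = urlparse("docker:///tmp/script.py")
    print(u.scheme, u.netloc + u.path)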