def run_job()

in aws/petctl.py [0:0]


def run_job(session, specs_json, args):
    job_name = args.name
    script_args = args.script_args

    rdzv_specs = specs_json["rdzv"]
    worker_specs = specs_json["worker"]

    script_url = urlparse(args.script_path)
    scheme = script_url.scheme
    if scheme == "docker":
        # docker://tmp/script.py -> tmp/script.py (relative to working dir in docker)
        # docker:///tmp/script.py -> /tmp/script.py (absolute path in docker)
        script = script_url.netloc + script_url.path
    elif scheme == "s3":
        # fetch_and_run supports s3:// so just pass through
        script = args.script_path
    else:
        s3_bucket = worker_specs["s3_bucket"]
        s3_prefix = worker_specs["s3_prefix"]
        script = S3(session).cp(args.script_path, s3_bucket, f"{s3_prefix}/{job_name}")

    asg = AutoScalingGroup(session)
    rdzv_asg_name = f"{job_name}_rdzv"
    worker_asg_name = f"{job_name}_worker"

    # create a single node asg to host the etcd server for rendezvous
    etcd_server_hostname = asg.create_asg_sync(rdzv_asg_name, size=1, **rdzv_specs)[0]
    rdzv_endpoint = f"{etcd_server_hostname}:2379"

    # allow overriding instance types from cli
    if args.instance_type:
        worker_specs["instance_type"] = args.instance_type
    worker_specs["rdzv_endpoint"] = rdzv_endpoint
    worker_specs["job_name"] = job_name
    worker_specs["script"] = script
    worker_specs["args"] = " ".join(script_args)
    worker_specs["user"] = getpass.getuser()

    instance_type = worker_specs["instance_type"]
    script_args_str = worker_specs["args"]

    log.info(
        f"\n------------------------------------------------------------------\n"
        f"Starting job...\n"
        f"  job name     : {job_name}\n"
        f"  instance type: {instance_type}\n"
        f"  size         : {args.size} (min={args.min_size}, max={args.max_size})\n"
        f"  rdzv endpoint: {rdzv_endpoint}\n"
        f"  cmd          : {script}\n"
        f"  cmd args     : {script_args_str}\n"
        f"------------------------------------------------------------------\n"
    )

    asg.create_asg(
        worker_asg_name, args.size, args.min_size, args.max_size, **worker_specs
    )