miscellaneous/distributed_tensorflow_mask_rcnn/container-optimized-script-mode/resources/train.py [13:164]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def setup():
    """
    Prepare this container for multi-host communication.
    Starts the local SSH daemon on every host. On the master host only, it then
    waits for the hosts listed in SM_HOSTS to accept SSH connections.
    Raises:
        - KeyError: if SM_CURRENT_HOST or SM_HOSTS is not set in the environment.
    """

    # Host identity and cluster membership are read from the environment
    this_host = os.environ["SM_CURRENT_HOST"]
    cluster_hosts = json.loads(os.environ["SM_HOSTS"])

    # Every container runs sshd so the hosts can reach each other
    _start_ssh_daemon()

    # Only the master has to wait for the other hosts to come up
    if _get_master_host_name(cluster_hosts) != this_host:
        return
    _wait_for_worker_nodes_to_start_sshd(cluster_hosts)


class TimeoutError(Exception):
    """
    Raised by the `timeout` context manager when its time limit expires.
    Note that this name shadows the built-in TimeoutError inside this module and,
    unlike the built-in, it derives directly from Exception.
    """


@contextmanager
def timeout(seconds=0, minutes=0, hours=0):
    """
    Add a signal-based timeout to any block of code.
    If multiple time units are specified, they will be added together to determine time limit.
    The SIGALRM handler that was active before the block is restored on exit, so the
    timeout does not leak into the rest of the process.
    Usage:
    with timeout(seconds=5):
        my_slow_function(...)
    Args:
        - seconds: The time limit, in seconds.
        - minutes: The time limit, in minutes.
        - hours: The time limit, in hours.
    Raises:
        - TimeoutError: inside the guarded block, once the time limit has elapsed.
    """

    limit = seconds + 60 * minutes + 3600 * hours

    def handler(signum, frame):  # pylint: disable=W0613
        raise TimeoutError("timed out after {} seconds".format(limit))

    # Remember the handler that was active so it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    try:
        # A limit of 0 clears the timer, i.e. no time limit is enforced.
        signal.setitimer(signal.ITIMER_REAL, limit)
        yield
    finally:
        # Disarm the timer first so it cannot fire while the handler is swapped back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None when the old handler was not installed from
        # Python; such a handler cannot be re-installed from here.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


def _get_master_host_name(hosts):
    """
    Return the master host name, i.e. the entry of `hosts` that sorts first.
    Args:
        - hosts: The host names to choose from.
    """
    ordered = sorted(hosts)
    return ordered[0]


def _start_ssh_daemon():
    """Launch /usr/sbin/sshd as a child process without waiting for it to exit."""
    sshd_command = ["/usr/sbin/sshd", "-D"]
    subprocess.Popen(sshd_command)


def _wait_for_worker_nodes_to_start_sshd(hosts, interval=1, timeout_in_seconds=180):
    """
    Block until every host accepts TCP connections on port 22.
    Args:
        - hosts: The host names to probe. The caller's list is left untouched.
        - interval: Seconds to sleep between two rounds of probing.
        - timeout_in_seconds: Overall time limit for the whole wait.
    Raises:
        - TimeoutError: if some host is still unreachable when the time limit expires.
    """
    # Work on a private copy: removing entries from the list being iterated skips
    # hosts, and emptying the caller's list would be a surprising side effect.
    pending = list(hosts)
    with timeout(seconds=timeout_in_seconds):
        while pending:
            print(f"hosts that aren't SSHable yet: {pending}")
            unreachable = []
            for host in pending:
                # The with-block guarantees the socket is released even when the
                # connection attempt fails.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ssh_socket:
                    if not _can_connect(host, 22, ssh_socket):
                        unreachable.append(host)
            pending = unreachable
            # Only pause when another round of probing is actually needed.
            if pending:
                time.sleep(interval)


def _can_connect(host, port, s):
    """
    Report whether a connection to (host, port) can be established.
    The socket is always closed before returning, whether or not the attempt succeeded.
    Args:
        - host: The host name to connect to.
        - port: The port to connect to.
        - s: An unconnected socket used for the attempt; it is consumed by this call.
    Returns:
        True if the connection was established, False if it failed with a socket error.
    """
    try:
        print(f"testing connection to host {host}")
        s.connect((host, port))
    except OSError:  # socket.error is an alias of OSError
        print(f"can't connect to host {host}")
        return False
    finally:
        # Release the descriptor on the failure path too, not only on success.
        s.close()
    print(f"can connect to host {host}")
    return True


def wait_for_training_processes_to_appear_and_finish(
    proccess_id_string, worker, poll_interval_seconds=300
):
    """
    Poll the process table until the training processes have appeared and gone again.
    This function never returns normally: it ends the interpreter with exit status 0
    once previously seen training processes have disappeared, or with exit status 1
    when no training process is found at the first poll.
    Args:
        - proccess_id_string: Pattern handed to grep to pick training processes out
          of the `ps -elf` listing.
        - worker: Name of this worker, used only in the log messages.
        - poll_interval_seconds: Seconds to sleep before each poll.
    Raises:
        - SystemExit: always, see above.
        - subprocess.CalledProcessError: if the ps/grep pipeline exits with a non-zero status.
    """

    training_process_started = False
    while True:
        time.sleep(poll_interval_seconds)
        # A single snapshot serves both the log output and the count, so the number
        # reported always matches the listing that was printed.
        training_process_ps = subprocess.check_output(
            f'ps -elf | grep "{proccess_id_string}"', encoding="utf-8", shell=True
        )
        print(training_process_ps)
        # Two matched lines are discounted - presumably the shell running the pipeline
        # and grep itself, whose command lines both contain the pattern.
        training_process_count = training_process_ps.count("\n") - 2
        training_process_running = training_process_count > 0

        if training_process_started:
            print(f"training processes running: {training_process_count}")
            if not training_process_running:
                print(f"Worker {worker} training completed.")
                time.sleep(5)
                sys.exit(0)
        elif training_process_running:
            training_process_started = True
        else:
            print(
                f"Worker {worker} exiting: "
                f"training not started in {poll_interval_seconds} seconds."
            )
            sys.exit(1)


def build_host_arg(host_list, gpu_per_host):
    """
    Build a comma-separated list of "host:gpu_per_host" entries.
    Example: build_host_arg(["algo-1", "algo-2"], 8) gives "algo-1:8,algo-2:8".
    Args:
        - host_list: The host names, in the order they should appear.
        - gpu_per_host: The value appended after the colon for every host.
    Returns:
        The joined string; empty when host_list is empty.
    """
    return ",".join(f"{host}:{gpu_per_host}" for host in host_list)


def copy_files(src, dest):
    """
    Copy the regular files found directly inside `src` to `dest`.
    Sub-directories of `src` are skipped; the copy is not recursive.
    Args:
        - src: Directory whose files are copied.
        - dest: Destination passed to shutil.copy for every file.
    """
    candidates = (os.path.join(src, name) for name in os.listdir(src))
    # The generator keeps the file check lazy: each path is tested right before its copy.
    for candidate in filter(os.path.isfile, candidates):
        shutil.copy(candidate, dest)


def train():

    import pprint

    pprint.pprint(dict(os.environ), width=1)

    model_dir = os.environ["SM_MODEL_DIR"]
    log_dir = None

    copy_logs_to_model_dir = False

    try:
        log_dir = os.environ["SM_CHANNEL_LOG"]
        copy_logs_to_model_dir = True
    except KeyError:
        log_dir = model_dir

    train_data_dir = os.environ["SM_CHANNEL_TRAIN"]

    print("pre-setup check")
    setup()

    current_host = os.environ["SM_CURRENT_HOST"]
    all_hosts = json.loads(os.environ["SM_HOSTS"])
    if_name = os.environ["SM_NETWORK_INTERFACE_NAME"]

    is_master = current_host == sorted(all_hosts)[0]

    if not is_master:
        print(f"Worker: {current_host}")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



miscellaneous/distributed_tensorflow_mask_rcnn/container-script-mode/resources/train.py [13:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def setup():
    """
    Prepare this container for multi-host communication.
    Starts the local SSH daemon on every host. On the master host only, it then
    waits for the hosts listed in SM_HOSTS to accept SSH connections.
    Raises:
        - KeyError: if SM_CURRENT_HOST or SM_HOSTS is not set in the environment.
    """

    # Host identity and cluster membership are read from the environment
    this_host = os.environ["SM_CURRENT_HOST"]
    cluster_hosts = json.loads(os.environ["SM_HOSTS"])

    # Every container runs sshd so the hosts can reach each other
    _start_ssh_daemon()

    # Only the master has to wait for the other hosts to come up
    if _get_master_host_name(cluster_hosts) != this_host:
        return
    _wait_for_worker_nodes_to_start_sshd(cluster_hosts)


class TimeoutError(Exception):
    """
    Raised by the `timeout` context manager when its time limit expires.
    Note that this name shadows the built-in TimeoutError inside this module and,
    unlike the built-in, it derives directly from Exception.
    """


@contextmanager
def timeout(seconds=0, minutes=0, hours=0):
    """
    Add a signal-based timeout to any block of code.
    If multiple time units are specified, they will be added together to determine time limit.
    The SIGALRM handler that was active before the block is restored on exit, so the
    timeout does not leak into the rest of the process.
    Usage:
    with timeout(seconds=5):
        my_slow_function(...)
    Args:
        - seconds: The time limit, in seconds.
        - minutes: The time limit, in minutes.
        - hours: The time limit, in hours.
    Raises:
        - TimeoutError: inside the guarded block, once the time limit has elapsed.
    """

    limit = seconds + 60 * minutes + 3600 * hours

    def handler(signum, frame):  # pylint: disable=W0613
        raise TimeoutError("timed out after {} seconds".format(limit))

    # Remember the handler that was active so it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    try:
        # A limit of 0 clears the timer, i.e. no time limit is enforced.
        signal.setitimer(signal.ITIMER_REAL, limit)
        yield
    finally:
        # Disarm the timer first so it cannot fire while the handler is swapped back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None when the old handler was not installed from
        # Python; such a handler cannot be re-installed from here.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


def _get_master_host_name(hosts):
    """
    Return the master host name, i.e. the entry of `hosts` that sorts first.
    Args:
        - hosts: The host names to choose from.
    """
    ordered = sorted(hosts)
    return ordered[0]


def _start_ssh_daemon():
    """Launch /usr/sbin/sshd as a child process without waiting for it to exit."""
    sshd_command = ["/usr/sbin/sshd", "-D"]
    subprocess.Popen(sshd_command)


def _wait_for_worker_nodes_to_start_sshd(hosts, interval=1, timeout_in_seconds=180):
    """
    Block until every host accepts TCP connections on port 22.
    Args:
        - hosts: The host names to probe. The caller's list is left untouched.
        - interval: Seconds to sleep between two rounds of probing.
        - timeout_in_seconds: Overall time limit for the whole wait.
    Raises:
        - TimeoutError: if some host is still unreachable when the time limit expires.
    """
    # Work on a private copy: removing entries from the list being iterated skips
    # hosts, and emptying the caller's list would be a surprising side effect.
    pending = list(hosts)
    with timeout(seconds=timeout_in_seconds):
        while pending:
            print(f"hosts that aren't SSHable yet: {pending}")
            unreachable = []
            for host in pending:
                # The with-block guarantees the socket is released even when the
                # connection attempt fails.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ssh_socket:
                    if not _can_connect(host, 22, ssh_socket):
                        unreachable.append(host)
            pending = unreachable
            # Only pause when another round of probing is actually needed.
            if pending:
                time.sleep(interval)


def _can_connect(host, port, s):
    """
    Report whether a connection to (host, port) can be established.
    The socket is always closed before returning, whether or not the attempt succeeded.
    Args:
        - host: The host name to connect to.
        - port: The port to connect to.
        - s: An unconnected socket used for the attempt; it is consumed by this call.
    Returns:
        True if the connection was established, False if it failed with a socket error.
    """
    try:
        print(f"testing connection to host {host}")
        s.connect((host, port))
    except OSError:  # socket.error is an alias of OSError
        print(f"can't connect to host {host}")
        return False
    finally:
        # Release the descriptor on the failure path too, not only on success.
        s.close()
    print(f"can connect to host {host}")
    return True


def wait_for_training_processes_to_appear_and_finish(
    proccess_id_string, worker, poll_interval_seconds=300
):
    """
    Poll the process table until the training processes have appeared and gone again.
    This function never returns normally: it ends the interpreter with exit status 0
    once previously seen training processes have disappeared, or with exit status 1
    when no training process is found at the first poll.
    Args:
        - proccess_id_string: Pattern handed to grep to pick training processes out
          of the `ps -elf` listing.
        - worker: Name of this worker, used only in the log messages.
        - poll_interval_seconds: Seconds to sleep before each poll.
    Raises:
        - SystemExit: always, see above.
        - subprocess.CalledProcessError: if the ps/grep pipeline exits with a non-zero status.
    """

    training_process_started = False
    while True:
        time.sleep(poll_interval_seconds)
        # A single snapshot serves both the log output and the count, so the number
        # reported always matches the listing that was printed.
        training_process_ps = subprocess.check_output(
            f'ps -elf | grep "{proccess_id_string}"', encoding="utf-8", shell=True
        )
        print(training_process_ps)
        # Two matched lines are discounted - presumably the shell running the pipeline
        # and grep itself, whose command lines both contain the pattern.
        training_process_count = training_process_ps.count("\n") - 2
        training_process_running = training_process_count > 0

        if training_process_started:
            print(f"training processes running: {training_process_count}")
            if not training_process_running:
                print(f"Worker {worker} training completed.")
                time.sleep(5)
                sys.exit(0)
        elif training_process_running:
            training_process_started = True
        else:
            print(
                f"Worker {worker} exiting: "
                f"training not started in {poll_interval_seconds} seconds."
            )
            sys.exit(1)


def build_host_arg(host_list, gpu_per_host):
    """
    Build a comma-separated list of "host:gpu_per_host" entries.
    Example: build_host_arg(["algo-1", "algo-2"], 8) gives "algo-1:8,algo-2:8".
    Args:
        - host_list: The host names, in the order they should appear.
        - gpu_per_host: The value appended after the colon for every host.
    Returns:
        The joined string; empty when host_list is empty.
    """
    return ",".join(f"{host}:{gpu_per_host}" for host in host_list)


def copy_files(src, dest):
    """
    Copy the regular files found directly inside `src` to `dest`.
    Sub-directories of `src` are skipped; the copy is not recursive.
    Args:
        - src: Directory whose files are copied.
        - dest: Destination passed to shutil.copy for every file.
    """
    candidates = (os.path.join(src, name) for name in os.listdir(src))
    # The generator keeps the file check lazy: each path is tested right before its copy.
    for candidate in filter(os.path.isfile, candidates):
        shutil.copy(candidate, dest)


def train():
    import pprint

    pprint.pprint(dict(os.environ), width=1)

    model_dir = os.environ["SM_MODEL_DIR"]
    log_dir = None

    copy_logs_to_model_dir = False

    try:
        log_dir = os.environ["SM_CHANNEL_LOG"]
        copy_logs_to_model_dir = True
    except KeyError:
        log_dir = model_dir

    train_data_dir = os.environ["SM_CHANNEL_TRAIN"]

    print("pre-setup check")
    setup()

    current_host = os.environ["SM_CURRENT_HOST"]
    all_hosts = json.loads(os.environ["SM_HOSTS"])
    if_name = os.environ["SM_NETWORK_INTERFACE_NAME"]

    is_master = current_host == sorted(all_hosts)[0]

    if not is_master:
        print(f"Worker: {current_host}")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



