# miscellaneous/distributed_tensorflow_mask_rcnn/container-optimized-script-mode/resources/train.py
import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import time
from contextlib import contextmanager

def setup():
    # Read host info that SageMaker provides via environment variables.
    current_host = os.environ["SM_CURRENT_HOST"]
    hosts = json.loads(os.environ["SM_HOSTS"])

    # Enable SSH connections between containers.
    _start_ssh_daemon()

    if current_host == _get_master_host_name(hosts):
        _wait_for_worker_nodes_to_start_sshd(hosts)

class TimeoutError(Exception):
    pass

@contextmanager
def timeout(seconds=0, minutes=0, hours=0):
    """
    Add a signal-based timeout to any block of code.
    If multiple time units are specified, they will be added together to determine the time limit.
    Usage:
    with timeout(seconds=5):
        my_slow_function(...)
    Args:
        - seconds: The time limit, in seconds.
        - minutes: The time limit, in minutes.
        - hours: The time limit, in hours.
    """
    limit = seconds + 60 * minutes + 3600 * hours

    def handler(signum, frame):  # pylint: disable=W0613
        raise TimeoutError("timed out after {} seconds".format(limit))

    try:
        signal.signal(signal.SIGALRM, handler)
        signal.setitimer(signal.ITIMER_REAL, limit)
        yield
    finally:
        signal.alarm(0)

def _get_master_host_name(hosts):
    return sorted(hosts)[0]
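
# Illustrative example for _get_master_host_name (hypothetical SageMaker host
# names): with hosts = ["algo-2", "algo-1"], every node sorts the same list and
# agrees that "algo-1" is the master.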

def _start_ssh_daemon():
    subprocess.Popen(["/usr/sbin/sshd", "-D"])

def _wait_for_worker_nodes_to_start_sshd(hosts, interval=1, timeout_in_seconds=180):
    with timeout(seconds=timeout_in_seconds):
        while hosts:
            print(f"hosts that aren't SSHable yet: {hosts}")
            # Iterate over a copy so removing reachable hosts doesn't skip
            # elements of the list being mutated.
            for host in list(hosts):
                ssh_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                if _can_connect(host, 22, ssh_socket):
                    hosts.remove(host)
            time.sleep(interval)

def _can_connect(host, port, s):
    try:
        print(f"testing connection to host {host}")
        s.connect((host, port))
        s.close()
        print(f"can connect to host {host}")
        return True
    except socket.error:
        print(f"can't connect to host {host}")
        return False

def wait_for_training_processes_to_appear_and_finish(process_id_string, worker):
    training_process_started = False
    while True:
        time.sleep(300)
        training_process_ps = subprocess.check_output(
            f'ps -elf | grep "{process_id_string}"', encoding="utf-8", shell=True
        )
        print(training_process_ps)
        training_process_count = subprocess.check_output(
            f'ps -elf | grep "{process_id_string}" | wc -l', encoding="utf-8", shell=True
        )
        training_process_count_str = training_process_count.replace("\n", "").strip()
        # Subtract 2 to discount the grep process and the shell that ran it.
        training_process_count = int(training_process_count_str) - 2
        training_process_running = training_process_count > 0
        if training_process_started:
            print(f"training processes running: {training_process_count}")
            if not training_process_running:
                print(f"Worker {worker} training completed.")
                time.sleep(5)
                sys.exit(0)
        if not training_process_started:
            if training_process_running:
                training_process_started = True
            else:
                print(f"Worker {worker} exiting: training did not start within 300 seconds.")
                sys.exit(1)
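
# Illustrative arithmetic for the "- 2" above: with one trainer running,
# `ps -elf | grep "<pattern>" | wc -l` typically counts the trainer plus the
# grep itself and the `sh -c` wrapper spawned by check_output, so 3 - 2 = 1
# real training process.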

def build_host_arg(host_list, gpu_per_host):
    # Build the mpirun --host argument: "host1:slots,host2:slots,...".
    return ",".join(f"{host}:{gpu_per_host}" for host in host_list)
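
# Illustrative example (hypothetical hosts): build_host_arg(["algo-1", "algo-2"], 8)
# returns "algo-1:8,algo-2:8", i.e. 8 MPI slots (one per GPU) on each host.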

def copy_files(src, dest):
    # Copy regular files (not subdirectories) from src into dest.
    for file in os.listdir(src):
        path = os.path.join(src, file)
        if os.path.isfile(path):
            shutil.copy(path, dest)

def train():
    import pprint

    pprint.pprint(dict(os.environ), width=1)

    model_dir = os.environ["SM_MODEL_DIR"]
    copy_logs_to_model_dir = False
    try:
        log_dir = os.environ["SM_CHANNEL_LOG"]
        copy_logs_to_model_dir = True
    except KeyError:
        log_dir = model_dir
    train_data_dir = os.environ["SM_CHANNEL_TRAIN"]

    print("pre-setup check")
    setup()

    current_host = os.environ["SM_CURRENT_HOST"]
    all_hosts = json.loads(os.environ["SM_HOSTS"])
    if_name = os.environ["SM_NETWORK_INTERFACE_NAME"]

    is_master = current_host == sorted(all_hosts)[0]

    if not is_master:
        # Workers poll for the training process and exit when it finishes.
        print(f"Worker: {current_host}")
        process_search_term = "python3 /mask-rcnn-tensorflow/MaskRCNN/train.py"
        wait_for_training_processes_to_appear_and_finish(process_search_term, current_host)
        print(f"Worker {current_host} has completed")
    else:
        print(f"Master: {current_host}")
        hyperparameters = json.loads(os.environ["SM_HPS"])

        # Read hyperparameters, falling back to defaults for any missing key.
        batch_norm = hyperparameters.get("batch_norm", "FreezeBN")
        mode_fpn = hyperparameters.get("mode_fpn", "True")
        mode_mask = hyperparameters.get("mode_mask", "True")
        eval_period = hyperparameters.get("eval_period", 1)
        lr_epoch_schedule = hyperparameters.get("lr_epoch_schedule", "[(16, 0.1), (20, 0.01), (24, None)]")
        horovod_cycle_time = hyperparameters.get("horovod_cycle_time", 0.5)
        horovod_fusion_threshold = hyperparameters.get("horovod_fusion_threshold", 67108864)
        data_train = hyperparameters.get("data_train", "train2017")
        data_val = hyperparameters.get("data_val", "val2017")
        nccl_min_rings = hyperparameters.get("nccl_min_rings", 8)
        batch_size_per_gpu = hyperparameters.get("batch_size_per_gpu", 4)
        images_per_epoch = hyperparameters.get("images_per_epoch", 120000)
        backbone_weights = hyperparameters.get("backbone_weights", "ImageNet-R50-AlignPadding.npz")
        resnet_arch = hyperparameters.get("resnet_arch", "resnet50")
        load_model = hyperparameters.get("load_model")
        # Standard residual-block counts per stage: ResNet-50 vs. ResNet-101.
        resnet_num_blocks = "[3, 4, 6, 3]"
        if resnet_arch == "resnet101":
            resnet_num_blocks = "[3, 4, 23, 3]"

        gpus_per_host = int(os.environ["SM_NUM_GPUS"])
        numprocesses = len(all_hosts) * gpus_per_host
        mpirun_cmd = f"""HOROVOD_CYCLE_TIME={horovod_cycle_time} \\
HOROVOD_FUSION_THRESHOLD={horovod_fusion_threshold} \\
mpirun -np {numprocesses} \\
--host {build_host_arg(all_hosts, gpus_per_host)} \\
--allow-run-as-root \\
--display-map \\
--tag-output \\
-mca btl_tcp_if_include {if_name} \\
-mca oob_tcp_if_include {if_name} \\
-x NCCL_SOCKET_IFNAME={if_name} \\
--mca plm_rsh_no_tree_spawn 1 \\
-bind-to none -map-by slot \\
-mca pml ob1 -mca btl ^openib \\
-mca orte_abort_on_non_zero_status 1 \\
-x TENSORPACK_FP16=1 \\
-x NCCL_MIN_NRINGS={nccl_min_rings} -x NCCL_DEBUG=INFO \\
-x HOROVOD_CYCLE_TIME -x HOROVOD_FUSION_THRESHOLD \\
-x LD_LIBRARY_PATH -x PATH \\
--output-filename {model_dir} \\
python3 /mask-rcnn-tensorflow/MaskRCNN/train.py \
--logdir {log_dir} \
--fp16 \
--throughput_log_freq=2000 \
--images_per_epoch {images_per_epoch} \
--config \
MODE_FPN={mode_fpn} \
MODE_MASK={mode_mask} \
DATA.BASEDIR={train_data_dir} \
BACKBONE.RESNET_NUM_BLOCKS='{resnet_num_blocks}' \
BACKBONE.WEIGHTS={train_data_dir}/pretrained-models/{backbone_weights} \
BACKBONE.NORM={batch_norm} \
DATA.TRAIN='["{data_train}"]' \
DATA.VAL='("{data_val}",)' \
TRAIN.BATCH_SIZE_PER_GPU={batch_size_per_gpu} \
TRAIN.EVAL_PERIOD={eval_period} \
TRAIN.LR_EPOCH_SCHEDULE='{lr_epoch_schedule}' \
RPN.TOPK_PER_IMAGE=True \
PREPROC.PREDEFINED_PADDING=True \
TRAIN.GRADIENT_CLIP=0 \
TRAINER=horovod"""
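
        # Notes on the flags above (standard Open MPI / NCCL semantics): `-x VAR`
        # forwards an environment variable to every rank, `-mca btl ^openib`
        # excludes the openib transport so traffic stays on TCP, and
        # NCCL_SOCKET_IFNAME pins NCCL's sockets to the SageMaker network interface.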
        # Append any hyperparameter prefixed with "config:" as an extra
        # KEY=VALUE config override for train.py.
        for key, item in hyperparameters.items():
            if key.startswith("config:"):
                mpirun_cmd += f" {key[7:]}={item}"
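        # For example, a hypothetical hyperparameter {"config:TRAIN.LR_SCHEDULE": "[240000]"}
        # would append " TRAIN.LR_SCHEDULE=[240000]" to mpirun_cmd.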

        if load_model:
            mpirun_cmd += f" --load {train_data_dir}/pretrained-models/{load_model}"

        print("--------Begin MPI Run Command----------")
        print(mpirun_cmd)
        print("--------End MPI Run Command------------")
        exitcode = 0
        try:
            process = subprocess.Popen(
                mpirun_cmd,
                encoding="utf-8",
                cwd="/mask-rcnn-tensorflow",
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )

            # Stream training output line by line until mpirun exits.
            while True:
                if process.poll() is not None:
                    break
                output = process.stdout.readline()
                if output:
                    print(output.strip())

            exitcode = process.poll()
            print(f"mpirun exit code: {exitcode}")
            # Note: the exit code is reset to 0 here, so mpirun's status is
            # logged but not propagated to SageMaker.
            exitcode = 0
        except Exception as e:
            print("train exception occurred", file=sys.stderr)
            exitcode = 1
            print(str(e), file=sys.stderr)
        finally:
            if copy_logs_to_model_dir:
                copy_files(log_dir, model_dir)
            sys.stdout.flush()
            sys.stderr.flush()
            sys.exit(exitcode)

if __name__ == "__main__":
    train()