in cli/jobs/pipelines/tensorflow-image-segmentation/src/tf_helper/training.py
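# Relies on module-level imports at the top of training.py (not shown in this
# excerpt): os, json, tensorflow as tf, mlflow, plus a get_nvml_params() helper
# and the self.logger / self.cpu_count attributes set up elsewhere in the class.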
def setup_config(self, args):
    """Sets internal variables using provided CLI arguments (see build_arguments_parser()).
    In particular, sets device (CUDA) and multi-node parameters."""
    self.dataloading_config = args
    self.training_config = args

    # verify parameter default values
    if self.dataloading_config.num_workers is None:
        self.dataloading_config.num_workers = tf.data.AUTOTUNE
    if self.dataloading_config.num_workers < 0:
        self.dataloading_config.num_workers = tf.data.AUTOTUNE
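    # (tf.data.AUTOTUNE used above is a negative sentinel value that lets tf.data
    # pick the level of parallelism dynamically at runtime)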
    if self.dataloading_config.num_workers == 0:
        self.logger.warning(
            "You specified num_workers=0, forcing prefetch_factor to be discarded."
        )
        self.dataloading_config.prefetch_factor = 0
    # Get distribution config
    if "TF_CONFIG" not in os.environ:
        self.logger.critical(
            "TF_CONFIG cannot be found in os.environ, defaulting back to non-distributed training"
        )
        self.nodes = 1
        # self.devices = [ device.name for device in tf.config.list_physical_devices('GPU') ]
        self.worker_id = 0
    else:
        tf_config = json.loads(os.environ["TF_CONFIG"])
        self.logger.info(f"Found TF_CONFIG = {tf_config}")
        self.nodes = len(tf_config["cluster"]["worker"])
        # self.devices = [ device.name for device in tf.config.list_physical_devices('GPU') ]
        self.worker_id = tf_config["task"]["index"]
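    # For reference, the TF_CONFIG parsed above follows TensorFlow's multi-worker
    # convention (hostnames/ports below are illustrative; only cluster.worker and
    # task.index are read here):
    #   {
    #     "cluster": {"worker": ["host0:2222", "host1:2222"]},
    #     "task": {"type": "worker", "index": 0}
    #   }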
    # Reduce number of GPUs artificially if requested
    if args.disable_cuda:
        self.logger.warning("CUDA disabled because --disable_cuda True")
        self.gpus = 0
    elif args.num_gpus == 0:
        self.logger.warning("CUDA disabled because --num_gpus=0")
        self.gpus = 0
    elif args.num_gpus and args.num_gpus > 0:
        self.gpus = args.num_gpus
        self.logger.warning(
            f"Because you set --num_gpus={args.num_gpus}, restricting to first {self.gpus} physical devices"
        )
    else:  # if args.num_gpus < 0
        self.gpus = len(tf.config.list_physical_devices("GPU"))
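    # In short: --disable_cuda or --num_gpus=0 force CPU, a positive --num_gpus caps
    # the device count, and a negative or unset value means "use every visible GPU".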
    # Check if we need distributed at all
    self.distributed_available = (self.nodes > 1) or (
        (self.nodes * self.gpus) > 1
    )  # if multi-node (CPU or GPU) or multi-gpu
    self.self_is_main_node = self.worker_id == 0
    self.logger.info(
        f"Distribution settings: nodes={self.nodes}, gpus={self.gpus}, distributed_available={self.distributed_available}, self_is_main_node={self.self_is_main_node}"
    )

    # Setting up TF distributed is a whole story
    self._setup_distribution_strategy()
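    # NOTE: the strategy resolution itself lives in _setup_distribution_strategy();
    # in a typical TF 2.x setup this would map multi-node runs to
    # tf.distribute.MultiWorkerMirroredStrategy, single-node multi-GPU runs to
    # tf.distribute.MirroredStrategy, and everything else to the default strategy
    # (this mapping is an assumption about that helper, not read from it).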
    # DISTRIBUTED: in distributed mode, you want to report parameters
    # only from main process (rank==0) to avoid conflict
    if self.self_is_main_node:
        # MLFLOW: report relevant parameters using mlflow
        logged_params = {
            # log some distribution params
            "nodes": self.nodes,
            "instance_per_node": self.gpus,
            "disable_cuda": bool(self.training_config.disable_cuda),
            "distributed": self.distributed_available,
            "distributed_strategy_resolved": self.training_config.distributed_strategy,
            "distributed_backend": self.training_config.distributed_backend,
            # data loading params
            "batch_size": self.dataloading_config.batch_size,
            "num_workers": self.dataloading_config.num_workers,
            "cpu_count": self.cpu_count,
            "prefetch_factor": self.dataloading_config.prefetch_factor,
            "cache": self.dataloading_config.cache,
            # training params
            "model_arch": self.training_config.model_arch,
            "model_input_size": self.training_config.model_input_size,
            "model_arch_pretrained": False,  # TODO
            "num_classes": self.training_config.num_classes,
            # profiling
            "enable_profiling": bool(self.training_config.enable_profiling),
        }
        logged_params.update(get_nvml_params())  # add some gpu properties
        logged_params["cuda_available"] = (
            logged_params.get("cuda_device_count", 0) > 0
        )
        mlflow.log_params(logged_params)
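# --- illustrative usage (not part of this file) ------------------------------
# A minimal sketch of how setup_config() would typically be driven from a script
# entry point. build_arguments_parser() is referenced by the docstring above;
# the trainer variable/class name below is a placeholder assumption.
#
#   args = build_arguments_parser().parse_args()
#   trainer = TrainingSequence()   # hypothetical class owning setup_config()
#   trainer.setup_config(args)
#   # ...then build datasets / model and run training...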