in cli/jobs/pipelines/tensorflow-image-segmentation/src/tf_helper/training.py
def _setup_distribution_strategy(self):
"""DISTRIBUTED: this takes care of initializing the distribution strategy.
Tensorflow uses a different "strategy" for each use case:
- multi-node => MultiWorkerMirroredStrategy
- single-node multi-gpu => MirroredStrategy
- single-node single-gpu => OneDeviceStrategy
Each comes with its own initialization process."""
# Identify the right strategy depending on params + context
if self.training_config.distributed_strategy == "auto":
# Auto detect
if self.nodes > 1: # MULTI-NODE
self.training_config.distributed_strategy = (
"multiworkermirroredstrategy"
)
elif self.gpus > 1: # SINGLE-NODE MULTI-GPU
self.training_config.distributed_strategy = "mirroredstrategy"
else: # SINGLE-NODE SINGLE-GPU
self.training_config.distributed_strategy = "onedevicestrategy"
if self.training_config.distributed_strategy == "multiworkermirroredstrategy":
self.logger.info(
"Using MultiWorkerMirroredStrategy as distributed_strategy"
)
# first we need to define the communication options (backend)
if self.training_config.distributed_backend.lower() == "nccl":
self.logger.info(
"Setting CommunicationImplementation.NCCL as distributed_backend"
)
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.NCCL
)
elif self.training_config.distributed_backend.lower() == "ring":
self.logger.info(
"Setting CommunicationImplementation.RING as distributed_backend"
)
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.RING
)
else:
self.logger.info(
"Setting CommunicationImplementation.AUTO as distributed_backend"
)
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.AUTO
)
        # second, we can artificially limit the number of GPUs by using tf.config.set_visible_devices()
        self.devices = tf.config.list_physical_devices("GPU")[
            : self.gpus
        ]  # artificially limit the visible GPUs
        self.devices += tf.config.list_physical_devices("CPU")  # but keep all CPUs visible
self.logger.info(
f"Setting tf.config.set_visible_devices(devices={self.devices})"
)
tf.config.set_visible_devices(devices=self.devices)
        # finally, we can initialize the strategy
        self.logger.info("Initializing MultiWorkerMirroredStrategy()...")
self.strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=communication_options
)
# we're storing the name of the strategy to log as a parameter
self.training_config.distributed_strategy = self.strategy.__class__.__name__
elif self.training_config.distributed_strategy == "mirroredstrategy":
self.devices = [
f"GPU:{i}" for i in range(self.gpus)
] # artificially limit number of gpus (if requested)
self.logger.info(
f"Using MirroredStrategy(devices={self.devices}) as distributed_strategy"
)
# names of devices for MirroredStrategy must be GPU:N
self.strategy = tf.distribute.MirroredStrategy(devices=self.devices)
self.training_config.distributed_strategy = self.strategy.__class__.__name__
    elif self.training_config.distributed_strategy == "onedevicestrategy":
        # single device: use the first GPU if available, otherwise fall back to CPU
        device = "GPU:0" if self.gpus > 0 else "CPU:0"
        self.devices = [device]
        self.logger.info(
            f"Using OneDeviceStrategy(device={device}) as distributed_strategy"
        )
        self.strategy = tf.distribute.OneDeviceStrategy(device=device)
        self.training_config.distributed_strategy = self.strategy.__class__.__name__
else:
        raise ValueError(
            f"distributed_strategy={self.training_config.distributed_strategy} is not recognized"
            " (expected 'auto', 'multiworkermirroredstrategy', 'mirroredstrategy' or 'onedevicestrategy')."
        )
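

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original training.py): the standard way
# a strategy built by _setup_distribution_strategy() is consumed. The
# `model_fn`, `dataset` and `per_replica_batch_size` arguments are hypothetical
# placeholders for the model factory and tf.data pipeline defined elsewhere in
# this helper; only strategy.scope() and num_replicas_in_sync come from the
# tf.distribute API itself.
# ---------------------------------------------------------------------------
def _example_train_under_strategy(strategy, model_fn, dataset, per_replica_batch_size, epochs=1):
    # variables and the optimizer must be created inside the strategy scope so
    # they are mirrored (or coordinated across workers) on the selected devices
    with strategy.scope():
        model = model_fn()
        model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")
    # scale the global batch size by the number of replicas the strategy reports
    global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync
    return model.fit(dataset.batch(global_batch_size), epochs=epochs)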