def _setup_distribution_strategy()

in cli/jobs/pipelines/tensorflow-image-segmentation/src/tf_helper/training.py [0:0]


    def _setup_distribution_strategy(self):
        """DISTRIBUTED: this takes care of initializing the distribution strategy.

        Tensorflow uses a different "strategy" for each use case:
        - multi-node => MultiWorkerMirroredStrategy
        - single-node multi-gpu => MirroredStrategy
        - single-node single-gpu => OneDeviceStrategy

        Each comes with its own initialization process."""
        # Identify the right strategy depending on params + context
        if self.training_config.distributed_strategy == "auto":
            # auto-detect from the number of nodes and GPUs available
            if self.nodes > 1:  # MULTI-NODE
                self.training_config.distributed_strategy = (
                    "multiworkermirroredstrategy"
                )
            elif self.gpus > 1:  # SINGLE-NODE MULTI-GPU
                self.training_config.distributed_strategy = "mirroredstrategy"
            else:  # SINGLE-NODE SINGLE-GPU
                self.training_config.distributed_strategy = "onedevicestrategy"

        if self.training_config.distributed_strategy == "multiworkermirroredstrategy":
            self.logger.info(
                "Using MultiWorkerMirroredStrategy as distributed_strategy"
            )

            # first we need to define the communication options (backend)
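            # (NCCL uses NVIDIA's NCCL library for GPU collectives, RING uses
            # TensorFlow's ring-based algorithms, AUTO lets TensorFlow choose at runtime)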
            if self.training_config.distributed_backend.lower() == "nccl":
                self.logger.info(
                    "Setting CommunicationImplementation.NCCL as distributed_backend"
                )
                communication_options = tf.distribute.experimental.CommunicationOptions(
                    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL
                )
            elif self.training_config.distributed_backend.lower() == "ring":
                self.logger.info(
                    "Setting CommunicationImplementation.RING as distributed_backend"
                )
                communication_options = tf.distribute.experimental.CommunicationOptions(
                    implementation=tf.distribute.experimental.CommunicationImplementation.RING
                )
            else:
                self.logger.info(
                    "Setting CommunicationImplementation.AUTO as distributed_backend"
                )
                communication_options = tf.distribute.experimental.CommunicationOptions(
                    implementation=tf.distribute.experimental.CommunicationImplementation.AUTO
                )

            # second, we can artificially limit the number of GPUs by using tf.config.set_visible_devices()
            # (note: this must be called before the TF runtime initializes the physical devices)
            self.devices = tf.config.list_physical_devices("GPU")[
                : self.gpus
            ]  # artificially limit visible GPUs
            self.devices += tf.config.list_physical_devices("CPU")  # but keep all CPUs visible
            self.logger.info(
                f"Setting tf.config.set_visible_devices(devices={self.devices})"
            )
            tf.config.set_visible_devices(devices=self.devices)

            # finally we can initialize the strategy
            self.logger.info("Initialize MultiWorkerMirroredStrategy()...")
            self.strategy = tf.distribute.MultiWorkerMirroredStrategy(
                communication_options=communication_options
            )

            # we're storing the name of the strategy to log as a parameter
            self.training_config.distributed_strategy = self.strategy.__class__.__name__

        elif self.training_config.distributed_strategy == "mirroredstrategy":
            self.devices = [
                f"GPU:{i}" for i in range(self.gpus)
            ]  # artificially limit the number of GPUs (if requested)
            self.logger.info(
                f"Using MirroredStrategy(devices={self.devices}) as distributed_strategy"
            )

            # names of devices for MirroredStrategy must be GPU:N
            self.strategy = tf.distribute.MirroredStrategy(devices=self.devices)
            self.training_config.distributed_strategy = self.strategy.__class__.__name__

        elif self.training_config.distributed_strategy == "onedevicestrategy":
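            # keep the list of requested GPU names; note the strategy below always targets GPU:0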
            self.devices = [f"GPU:{i}" for i in range(self.gpus)]
            self.logger.info(
                "Using OneDeviceStrategy(devices=GPU:0) as distributed_strategy"
            )
            self.strategy = tf.distribute.OneDeviceStrategy(device="GPU:0")
            self.training_config.distributed_strategy = self.strategy.__class__.__name__

        else:
            raise ValueError(
                f"distributed_strategy={self.training_config.distributed_strategy} is not recognized."
            )
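
For reference, below is a hedged usage sketch (not part of training.py) showing how the strategy left in self.strategy is typically consumed afterwards: variable creation happens under strategy.scope() and the global batch size is scaled by num_replicas_in_sync. The helper name build_and_compile_model, the placeholder model, and the trainer variable are illustrative assumptions; only the tf.distribute and tf.keras APIs shown are standard.

    import tensorflow as tf

    def build_and_compile_model(strategy: tf.distribute.Strategy,
                                per_replica_batch_size: int = 32):
        """Hypothetical helper: build and compile a model under the strategy scope."""
        # the effective (global) batch size grows with the number of replicas
        global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync

        with strategy.scope():
            # variables created here (model weights, optimizer slots) are
            # mirrored/distributed according to the chosen strategy
            model = tf.keras.Sequential([
                tf.keras.Input(shape=(8,)),  # placeholder input
                tf.keras.layers.Dense(1),    # placeholder model
            ])
            model.compile(optimizer="adam", loss="mse")
        return model, global_batch_size

    # e.g., after trainer._setup_distribution_strategy():
    #     model, global_batch_size = build_and_compile_model(trainer.strategy)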