in spark/spark-tensorflow-distributor/spark_tensorflow_distributor/mirrored_strategy_runner.py [0:0]
def __init__(self,
             *,
             num_slots,
             local_mode=False,
             use_gpu=True,
             gpu_resource_name='gpu',
             use_custom_strategy=False):
    """
    Args:
        num_slots: Total number of GPUs or CPU only Spark tasks that
            participate in distributed training. For example, if
            num_slots = 16 we train on the Spark cluster with 16 GPUs
            if doing GPU training, or with 16 Spark tasks if doing
            CPU training. num_slots cannot be less than or equal to 0.
            Note that when doing CPU training, Spark will still be
            subject to any GPU-aware scheduling confs set
            in the Spark configuration. Note also that for GPU training,
            num_slots will limit the number of GPUs used for training
            even if more are available, so that exactly num_slots GPUs
            are used in total. Spark does not restrict CPU cores for
            tasks and so for CPU training, num_slots rarely needs to
            be greater than the number of workers and in local mode
            set num_slots=1.
        local_mode: If True, the training function will be run locally
            on the driver. If False training is distributed among the
            workers.
        use_gpu: If True, training is done with GPUs using Spark
            resource scheduling with the gpu_resource_name parameter
            as the resource name. If False, do CPU only training.
        gpu_resource_name: The name of the Spark resource scheduling
            GPU resource. It may be set under
            `spark.executor.resource.{gpu_resource_name}`,
            `spark.task.resource.{gpu_resource_name}`,
            `spark.driver.resource.{gpu_resource_name}`, and
            `spark.worker.resource.{gpu_resource_name}` in the Spark
            conf. Contact the cluster administrator to set these
            configurations. The resource should be configured with
            a discovery script that is formatted according to the
            Spark configuration docs. Make sure
            `spark.driver.resource.{gpu_resource_name}.discoveryScript` and
            `spark.worker.resource.{gpu_resource_name}.discoveryScript` are
            also set in the Spark conf. In particular, the GPU addresses
            should be zero indexed. For example, the output of the
            discovery script for 3 GPUs with gpu_resource_name='gpu'
            would be `{"name": "gpu", "addresses":["0","1","2"]}`.
            See an example discovery script: `github.com/apache/spark/blob/
            master/examples/src/main/scripts/getGpusResources.sh`.
        use_custom_strategy: When true, the training function passed to the
            MirroredStrategyRunner.run method must construct and use its
            own tensorflow.distribute.Strategy() object. When false,
            MirroredStrategyRunner constructs one for the user and wraps
            the training function in the strategy context, allowing
            the user to provide non-distributed TensorFlow code that is
            executed as distributed code.

            Example with use_custom_strategy=True:
                def train_fn():
                    import tensorflow as tf
                    strategy = tf.distribute.experimental \
                        .MultiWorkerMirroredStrategy()
                    with strategy.scope():
                        # training code

            Example with use_custom_strategy=False:
                def train_fn():
                    import tensorflow as tf
                    # training code

    Raises:
        ValueError: If num_slots is less than or equal to 0.
    """
    # Validate before doing any other work so an invalid value fails
    # fast and never leaves a partially-constructed runner behind.
    if num_slots <= 0:
        raise ValueError(f'num_slots is set to {num_slots} but '
                         'cannot be less than or equal to 0.')
    self._logger = _get_logger(self.__class__.__name__)
    self._num_slots = num_slots
    self._local_mode = local_mode
    self._use_gpu = use_gpu
    self._gpu_resource_name = gpu_resource_name
    self._use_custom_strategy = use_custom_strategy
    if self._use_gpu:
        self._logger.info('Doing GPU training...')
    else:
        self._logger.info('Doing CPU training...')
    spark = SparkSession.builder.getOrCreate()
    self.sc = spark.sparkContext
    # Plain truthiness test instead of `is True`, so truthy values
    # such as 1 behave consistently with the documented flag semantics.
    if self._local_mode:
        self._logger.warning('MirroredStrategyRunner will run in '
                             'local mode on the driver node. '
                             'There would be resource contention if '
                             'the driver also runs other workloads.')
        # In local mode no Spark tasks are scheduled for training.
        self._num_tasks = None
    else:
        self._num_tasks = self.get_num_tasks()
        self._logger.info(f'Will run with {self._num_tasks} Spark tasks.')