# easy_rec/python/utils/distribution_utils.py
# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
from __future__ import print_function

import json
import logging
import os

import tensorflow as tf

from easy_rec.python.protos.train_pb2 import DistributionStrategy
from easy_rec.python.utils import estimator_utils
from easy_rec.python.utils.estimator_utils import chief_to_master
from easy_rec.python.utils.estimator_utils import master_to_chief

DistributionStrategyMap = {
'': DistributionStrategy.NoStrategy,
'ps': DistributionStrategy.PSStrategy,
'ess': DistributionStrategy.ExascaleStrategy,
'mirrored': DistributionStrategy.MirroredStrategy,
'collective': DistributionStrategy.CollectiveAllReduceStrategy
}
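
# Illustrative lookup (a sketch, not part of the original module): launcher
# scripts can map a command-line strategy name to the enum via this dict:
#
#   strategy = DistributionStrategyMap['collective']
#   # -> DistributionStrategy.CollectiveAllReduceStrategy
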
def set_distribution_config(pipeline_config, num_worker, num_gpus_per_worker,
distribute_strategy):
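  """Write the distribute strategy into pipeline_config.train_config.

  For the PS/Mirrored/Collective/Exascale strategies, sync_replicas is
  forced to False, since replica synchronization is handled by the
  strategy itself instead of SyncReplicasOptimizer.
  """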
if distribute_strategy in [
DistributionStrategy.PSStrategy, DistributionStrategy.MirroredStrategy,
DistributionStrategy.CollectiveAllReduceStrategy,
DistributionStrategy.ExascaleStrategy
]:
pipeline_config.train_config.sync_replicas = False
pipeline_config.train_config.train_distribute = distribute_strategy
pipeline_config.train_config.num_gpus_per_worker = num_gpus_per_worker
print('Dump pipeline_config.train_config:')
print(pipeline_config.train_config)
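
# Illustrative call (a sketch; pipeline_config is an EasyRecConfig proto):
#
#   set_distribution_config(
#       pipeline_config, num_worker=4, num_gpus_per_worker=1,
#       distribute_strategy=DistributionStrategy.PSStrategy)
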
def set_tf_config_and_get_train_worker_num(
ps_hosts,
worker_hosts,
task_index,
job_name,
distribute_strategy=DistributionStrategy.NoStrategy,
eval_method='none'):
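  """Rewrite TF_CONFIG according to the distribute strategy and eval method.

  Args:
    ps_hosts: comma-separated parameter-server endpoints; may be empty.
    worker_hosts: comma-separated worker endpoints.
    task_index: index of this task within its job.
    job_name: role of this task, e.g. 'ps' or 'worker'.
    distribute_strategy: a DistributionStrategy enum value.
    eval_method: one of 'none', 'master' or 'separate'.

  Returns:
    the number of workers that participate in training; when eval_method is
    'separate', the evaluator is excluded from the count.
  """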
logging.info(
'set_tf_config_and_get_train_worker_num: distribute_strategy = %d' %
distribute_strategy)
worker_hosts = worker_hosts.split(',')
ps_hosts = ps_hosts.split(',') if ps_hosts else []
total_worker_num = len(worker_hosts)
train_worker_num = total_worker_num
print('Original TF_CONFIG=%s' % os.environ.get('TF_CONFIG', ''))
print('worker_hosts=%s ps_hosts=%s task_index=%d job_name=%s' %
(','.join(worker_hosts), ','.join(ps_hosts), task_index, job_name))
print('eval_method=%s' % eval_method)
if distribute_strategy == DistributionStrategy.MirroredStrategy:
    assert total_worker_num == 1, 'mirrored distribute strategy needs exactly 1 worker'
elif distribute_strategy in [
DistributionStrategy.NoStrategy, DistributionStrategy.PSStrategy,
DistributionStrategy.CollectiveAllReduceStrategy,
DistributionStrategy.ExascaleStrategy
]:
cluster, task_type, task_index_ = estimator_utils.parse_tf_config()
train_worker_num = 0
if eval_method == 'separate':
if 'evaluator' in cluster:
        # an 'evaluator' entry in cluster indicates a new-style cluster spec
if 'chief' in cluster:
train_worker_num += len(cluster['chief'])
elif 'master' in cluster:
train_worker_num += len(cluster['master'])
if 'worker' in cluster:
train_worker_num += len(cluster['worker'])
# drop evaluator to avoid hang
if distribute_strategy == DistributionStrategy.NoStrategy:
del cluster['evaluator']
tf_config = {
'cluster': cluster,
'task': {
'type': task_type,
'index': task_index_
}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
else:
        # backward compatibility: if the user does not assign an evaluator in
        # -Dcluster, the first worker is used as chief and the second as evaluator
        train_worker_num = total_worker_num - 1
        assert train_worker_num > 0, \
            'in distribution mode worker num must be greater than 1, ' \
            'since the second worker is reserved as the evaluator'
if len(worker_hosts) > 1:
cluster = {'chief': [worker_hosts[0]], 'worker': worker_hosts[2:]}
if distribute_strategy != DistributionStrategy.NoStrategy:
cluster['evaluator'] = [worker_hosts[1]]
if len(ps_hosts) > 0:
cluster['ps'] = ps_hosts
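          # Worked example: worker_hosts=['w0', 'w1', 'w2'] and ps_hosts=['p0']
          # give cluster = {'chief': ['w0'], 'worker': ['w2'], 'ps': ['p0']},
          # plus 'evaluator': ['w1'] for strategies other than NoStrategy.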
if job_name == 'ps':
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': job_name,
'index': task_index
}
})
elif job_name == 'worker':
if task_index == 0:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': 'chief',
'index': 0
}
})
elif task_index == 1:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': 'evaluator',
'index': 0
}
})
else:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': job_name,
'index': task_index - 2
}
})
else:
if 'evaluator' in cluster:
evaluator = cluster['evaluator']
del cluster['evaluator']
        # an 'evaluator' entry in cluster indicates a new-style cluster spec
train_worker_num += 1
if 'chief' in cluster:
train_worker_num += len(cluster['chief'])
elif 'master' in cluster:
train_worker_num += len(cluster['master'])
if 'worker' in cluster:
train_worker_num += len(cluster['worker'])
cluster['worker'].append(evaluator[0])
else:
cluster['worker'] = [evaluator[0]]
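        # the evaluator host is now the last entry of cluster['worker']; with
        # one chief slot, its worker index is train_worker_num - 2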
if task_type == 'evaluator':
tf_config = {
'cluster': cluster,
'task': {
'type': 'worker',
'index': train_worker_num - 2
}
}
else:
tf_config = {
'cluster': cluster,
'task': {
'type': task_type,
'index': task_index_
}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
else:
cluster = {'chief': [worker_hosts[0]], 'worker': worker_hosts[1:]}
train_worker_num = len(worker_hosts)
if len(ps_hosts) > 0:
cluster['ps'] = ps_hosts
if job_name == 'ps':
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': job_name,
'index': task_index
}
})
else:
if task_index == 0:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': 'chief',
'index': 0
}
})
else:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': cluster,
'task': {
'type': 'worker',
'index': task_index - 1
}
})
if eval_method == 'none':
        # rename master to chief: no evaluation will be performed
master_to_chief()
elif eval_method == 'master':
        # rename chief to master: evaluation runs on the master
chief_to_master()
else:
    assert False, 'invalid distribute_strategy %s' % distribute_strategy
cluster, task_type, task_index = estimator_utils.parse_tf_config()
print('Final TF_CONFIG = %s' % os.environ.get('TF_CONFIG', ''))
tf.logging.info('TF_CONFIG %s' % os.environ.get('TF_CONFIG', ''))
  tf.logging.info('distribute_strategy %s, train_worker_num: %d' %
                  (distribute_strategy, train_worker_num))
  # remove the PAI chief-worker waiting strategy,
  # which conflicts with the worker waiting strategy in EasyRec
if 'TF_WRITE_WORKER_STATUS_FILE' in os.environ:
del os.environ['TF_WRITE_WORKER_STATUS_FILE']
return train_worker_num


def set_tf_config_and_get_train_worker_num_on_ds():
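  """Rewrite a plain ps/worker TF_CONFIG (e.g. as produced on the DS platform).

  Worker 0 becomes chief and worker 1 becomes a standalone evaluator; the
  remaining workers are re-indexed from 0. Illustrative example:

    in : {'cluster': {'ps': ['p0'], 'worker': ['w0', 'w1', 'w2']},
          'task': {'type': 'worker', 'index': 2}}
    out: {'cluster': {'ps': ['p0'], 'chief': ['w0'], 'worker': ['w2']},
          'task': {'type': 'worker', 'index': 0}}
  """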
if 'TF_CONFIG' not in os.environ:
return
tf_config = json.loads(os.environ['TF_CONFIG'])
if 'cluster' in tf_config and 'ps' in tf_config['cluster'] and (
'evaluator' not in tf_config['cluster']):
easyrec_tf_config = dict()
easyrec_tf_config['cluster'] = {}
easyrec_tf_config['task'] = {}
easyrec_tf_config['cluster']['ps'] = tf_config['cluster']['ps']
easyrec_tf_config['cluster']['chief'] = [tf_config['cluster']['worker'][0]]
easyrec_tf_config['cluster']['worker'] = tf_config['cluster']['worker'][2:]
if tf_config['task']['type'] == 'worker' and tf_config['task']['index'] == 0:
easyrec_tf_config['task']['type'] = 'chief'
easyrec_tf_config['task']['index'] = 0
elif tf_config['task']['type'] == 'worker' and tf_config['task'][
'index'] == 1:
easyrec_tf_config['task']['type'] = 'evaluator'
easyrec_tf_config['task']['index'] = 0
elif tf_config['task']['type'] == 'worker':
easyrec_tf_config['task']['type'] = tf_config['task']['type']
easyrec_tf_config['task']['index'] = tf_config['task']['index'] - 2
else:
easyrec_tf_config['task']['type'] = tf_config['task']['type']
easyrec_tf_config['task']['index'] = tf_config['task']['index']
os.environ['TF_CONFIG'] = json.dumps(easyrec_tf_config)


def set_tf_config_and_get_distribute_eval_worker_num_on_ds():
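  """Rewrite a plain ps/worker TF_CONFIG for distributed evaluation.

  Worker 0 becomes chief and the remaining workers are re-indexed from 0;
  no separate evaluator is reserved. Illustrative example:

    in : {'cluster': {'ps': ['p0'], 'worker': ['w0', 'w1']},
          'task': {'type': 'worker', 'index': 1}}
    out: {'cluster': {'ps': ['p0'], 'chief': ['w0'], 'worker': ['w1']},
          'task': {'type': 'worker', 'index': 0}}
  """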
  assert 'TF_CONFIG' in os.environ, "'TF_CONFIG' must be in os.environ"
tf_config = json.loads(os.environ['TF_CONFIG'])
if 'cluster' in tf_config and 'ps' in tf_config['cluster'] and (
'evaluator' not in tf_config['cluster']):
easyrec_tf_config = dict()
easyrec_tf_config['cluster'] = {}
easyrec_tf_config['task'] = {}
easyrec_tf_config['cluster']['ps'] = tf_config['cluster']['ps']
easyrec_tf_config['cluster']['chief'] = [tf_config['cluster']['worker'][0]]
easyrec_tf_config['cluster']['worker'] = tf_config['cluster']['worker'][1:]
if tf_config['task']['type'] == 'worker' and tf_config['task']['index'] == 0:
easyrec_tf_config['task']['type'] = 'chief'
easyrec_tf_config['task']['index'] = 0
elif tf_config['task']['type'] == 'worker':
easyrec_tf_config['task']['type'] = tf_config['task']['type']
easyrec_tf_config['task']['index'] = tf_config['task']['index'] - 1
else:
easyrec_tf_config['task']['type'] = tf_config['task']['type']
easyrec_tf_config['task']['index'] = tf_config['task']['index']
os.environ['TF_CONFIG'] = json.dumps(easyrec_tf_config)
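

if __name__ == '__main__':
  # Minimal smoke test (an illustrative sketch, not part of the original
  # module). It seeds an old-style ps/worker TF_CONFIG, then lets
  # set_tf_config_and_get_train_worker_num reserve the second worker as a
  # separate evaluator. Like the module itself, it assumes a TF1.x-style
  # tf.logging API is available.
  os.environ['TF_CONFIG'] = json.dumps({
      'cluster': {
          'ps': ['localhost:3001'],
          'worker': ['localhost:2001', 'localhost:2002', 'localhost:2003']
      },
      'task': {
          'type': 'worker',
          'index': 0
      }
  })
  n = set_tf_config_and_get_train_worker_num(
      ps_hosts='localhost:3001',
      worker_hosts='localhost:2001,localhost:2002,localhost:2003',
      task_index=0,
      job_name='worker',
      distribute_strategy=DistributionStrategy.NoStrategy,
      eval_method='separate')
  # with three workers and one reserved as evaluator, two remain for training
  print('train_worker_num = %d' % n)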