in python/graph_util.py [0:0]
def compile_subgraphs(graph_def,
                      workdir=None, args_dict=None, timeout=None, max_num_compilers=None,
                      verbose=None):
    """Compile `NeuronOp`s in a `GraphDef` proto by invoking neuron-cc.

    Args:
        graph_def: Input `GraphDef` proto that contains `NeuronOp`s.
        workdir: None or path-like representing the working directory used by the compiler;
            if None, will use `tempfile` to create a temporary workdir for each subgraph,
            else will create and use 'workdir/op_name' for each subgraph.
        args_dict: Dict `{str: list}` that maps NeuronOp names to compiler arguments;
            compiler arguments should be a list of strings, as used in `subprocess.run`.
        timeout: Integer representing timeout limit for the compiler. Default: 18000.
        max_num_compilers: Integer representing maximum allowed compiler processes.
            Default is number of cpu cores, bounded by available memory (4 GB budgeted
            per compiler process).
        verbose: None or integer verbosity level forwarded to neuron-cc as `--verbose`.

    Returns:
        A `GraphDef` proto with `NeuronOp`s already compiled. Subgraphs that fail to
        compile keep their batch-axis attributes but get no 'executable' bytes.
    """
    # Fast path: nothing to compile when the graph contains no NeuronOp.
    if all(node.op != gdu.tNeuronOp for node in graph_def.node):
        return graph_def
    subgraph_compilers = {}
    if workdir is None:
        # Keep a reference to the TemporaryDirectory object so the directory
        # survives until this function returns (executables are read at the end).
        workdir_obj = tempfile.TemporaryDirectory()
        workdir_base = workdir_obj.name
    else:
        workdir_base = os.path.abspath(workdir)
    if timeout is None:
        timeout = 18000
    Compiler = collections.namedtuple('Compiler', 'command verbose workdir_path subgraph_info')
    _neuron_cc_input_name = 'graph_def.pb'
    _neuron_executable_name = 'graph_def.neff'
    neuron_cc = ncc.find_neuron_cc()
    if neuron_cc is None:
        # Compiler binary not found; hand the graph back untouched.
        return graph_def
    subgraph_info_format = '{{subgraph {} with input tensors {}, output tensors {}}}'.format
    neuron_nodes = gdu.get_neuron_nodes(graph_def)
    if not neuron_nodes:
        return graph_def
    # Build one Compiler entry (command line + working directory) per compilable NeuronOp;
    # subgraphs that cannot be compiled are skipped with a warning.
    for node in neuron_nodes:
        if len(node.attr['input_names'].list.s) == 0 or len(node.attr['output_names'].list.s) == 0:
            continue
        subgraph_info = subgraph_info_format(node.name, *_io_tensor_info(node))
        io_config_json = _io_config(node)
        if io_config_json is None:
            logging.warning('Not fusing subgraph {}: --io-config error'.format(subgraph_info))
            continue
        if any(not TensorShape(shape).is_fully_defined() for shape in node.attr['output_shapes'].list.shape):
            logging.warning('Cannot infer output tensor shapes for subgraph {}'.format(node.name))
            continue
        subgraph_def = gdu.get_subgraph_def(node)
        for sgn in subgraph_def.node:
            # Promote previously inferred shapes to regular output shapes so the
            # compiler sees fully defined shape information.
            inferred_shapes = sgn.attr.pop(gdu.kNeuronInferredShapes, None)
            if gdu.kOutputShapes not in sgn.attr and inferred_shapes is not None:
                sgn.attr[gdu.kOutputShapes].CopyFrom(inferred_shapes)
        workdir_path = os.path.join(workdir_base, node.name)
        os.makedirs(workdir_path, exist_ok=True)
        input_path = os.path.join(workdir_path, _neuron_cc_input_name)
        with open(input_path, 'wb') as f:
            f.write(subgraph_def.SerializeToString())
        command = [neuron_cc, 'compile', input_path, '--framework', 'TENSORFLOW',
                   '--pipeline', 'compile', 'SaveTemps',
                   '--output', os.path.join(workdir_path, _neuron_executable_name)]
        command.extend(['--io-config', io_config_json])
        if args_dict is not None:
            extend_args = args_dict.get(node.name, [])
            if isinstance(extend_args, (str, bytes)):
                extend_args = [extend_args]
            command.extend(extend_args)
        if verbose is not None:
            command.extend(['--verbose', str(verbose)])
        subgraph_compilers[node.name] = Compiler(command, verbose, workdir_path, subgraph_info)
    # Progress bar mode: with exactly one subgraph and default verbosity/workdir,
    # run neuron-cc in the foreground so its progress bar is visible to the user.
    try_progress_bar_mode = len(subgraph_compilers) == 1 and verbose is None and workdir is None
    progress_bar_mode_done = False
    if try_progress_bar_mode:
        node_name = next(iter(subgraph_compilers))
        # Parse the registered command itself (extra args from args_dict may
        # have set --verbose) rather than a leftover loop variable.
        parser = argparse.ArgumentParser()
        parser.add_argument('--verbose', type=int, default=None)
        verbose_args, _ = parser.parse_known_args(subgraph_compilers[node_name].command)
        if verbose_args.verbose is None or verbose_args.verbose == 35:
            command = subgraph_compilers[node_name].command.copy()
            command.extend(['--verbose=35'])
            _, _, workdir_path, subgraph_info = subgraph_compilers[node_name]
            info_string = 'fusing subgraph {} with neuron-cc'.format(subgraph_info)
            with utils.logging_show_info():
                logging.info(info_string)
            neuron_cc_stderr_txt_path = os.path.join(workdir_path, 'neuron-cc-stderr.txt')
            try:
                # note: cannot use subprocess.PIPE due to https://bugs.python.org/issue30154
                with open(neuron_cc_stderr_txt_path, 'w') as f:
                    proc = subprocess.run(command, cwd=workdir_path, timeout=timeout, stderr=f)
            except subprocess.TimeoutExpired as e:
                logging.warning(e)
                progress_bar_mode_done = True
                subgraph_compilers[node_name] = None
            else:
                if proc.returncode == 0:
                    progress_bar_mode_done = True
                else:
                    with open(neuron_cc_stderr_txt_path, 'r') as f:
                        neuron_cc_stderr = f.read()
                    if not neuron_cc_stderr.endswith('IndexError: list index out of range\n'):
                        # neuron-cc recognized progress bar mode but crashed for other reasons
                        logging.warning(neuron_cc_stderr)
                        progress_bar_mode_done = True
                        subgraph_compilers[node_name] = None
                    # else: neuron-cc does not support progress bar mode; fall
                    # through to the normal thread-pool compile below.
    if not progress_bar_mode_done:
        if max_num_compilers is None:
            num_cpu = multiprocessing.cpu_count()
            # Budget roughly 4 GB of available memory per neuron-cc process;
            # best-effort only -- /proc/meminfo does not exist on non-Linux hosts.
            available_mem_in_kb = None
            try:
                with open('/proc/meminfo') as f:
                    for line in f:
                        if 'MemAvailable' in line:
                            available_mem_in_kb = int(line.split()[1])
                            break
            except (OSError, ValueError, IndexError):
                available_mem_in_kb = None
            if available_mem_in_kb is None:
                max_num_compilers = num_cpu
            else:
                num_mem_gb = int(available_mem_in_kb / 4e6)  # 4 GB memory for each neuron-cc process
                max_num_compilers = max(1, min(num_cpu, num_mem_gb))
        with ThreadPoolExecutor(max_workers=max_num_compilers) as executor:
            compiler_returns = {
                node_name: executor.submit(
                    _fork_compiler, subgraph_compilers, node_name, timeout, try_progress_bar_mode)
                for node_name in subgraph_compilers.keys()
            }
            compiler_returns = {key: value.result() for key, value in compiler_returns.items()}
        for node_name in subgraph_compilers.keys():
            if not compiler_returns[node_name]:
                subgraph_compilers[node_name] = None
    # Fill NeuronOp properties: default batch axes for all NeuronOps, plus the
    # compiled executable bytes for every subgraph that compiled successfully.
    for node in gdu.get_neuron_nodes(graph_def):
        node.attr['input_batch_axis'].list.i[:] = [-1 for _ in node.attr['input_names'].list.s]
        node.attr['output_batch_axis'].list.i[:] = [-1 for _ in node.attr['output_names'].list.s]
        if subgraph_compilers.get(node.name, None) is None:
            continue
        workdir_path = subgraph_compilers[node.name].workdir_path
        executable_path = os.path.join(workdir_path, _neuron_executable_name)
        with open(executable_path, 'rb') as f:
            node.attr['executable'].s = f.read()
    return graph_def