in tensorflow_model_analysis/extractors/tfjs_predict_extractor.py
def _batch_reducible_process(
self, element: types.Extracts) -> Sequence[types.Extracts]:
"""Invokes the tfjs model on the provided inputs and stores the result."""
result = copy.copy(element)
result[constants.PREDICTIONS_KEY] = []
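    # Collect feature values from every example row into per-feature batches.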
batched_features = collections.defaultdict(list)
feature_rows = element[constants.FEATURES_KEY]
for r in feature_rows:
for key, value in r.items():
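        # tfjs does not support int64 tensors, so downcast int64 features to
        # int32 before batching.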
if value.dtype == np.int64:
value = value.astype(np.int32)
batched_features[key].append(value)
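    # Run inference separately for each model listed in the eval config.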
for spec in self._eval_config.model_specs:
model_name = spec.name if len(self._eval_config.model_specs) > 1 else ''
if model_name not in self._loaded_models:
raise ValueError('model for "{}" not found: eval_config={}'.format(
spec.name, self._eval_config))
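      # Match each model input tensor to a batched feature and reshape every
      # example to the input's declared dimensions.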
model_features = {}
for k in self._model_properties[model_name]['inputs']:
k_name = k.split(':')[0]
if k_name not in batched_features:
          raise ValueError('model requires feature "{}" which is not '
                           'available in the input.'.format(k_name))
dim = self._model_properties[model_name]['inputs'][k]
elems = []
for i in batched_features[k_name]:
if np.ndim(i) > len(dim):
raise ValueError('ranks for input "{}" are not compatible '
'with the model.'.format(k_name))
          # TODO(dzats): See if we can support the case where multiple
          # dimensions are not defined.
elems.append(np.reshape(i, dim))
model_features[k] = elems
model_features = {k: np.concatenate(v) for k, v in model_features.items()}
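      # Serialize the batched inputs (data, dtype, shape, and tensor name) as
      # JSON files in a unique subdirectory for the tfjs inference binary.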
batched_entries = collections.defaultdict(list)
for feature, value in model_features.items():
batched_entries[_DATA_JSON].append(value.tolist())
batched_entries[_DTYPE_JSON].append(str(value.dtype))
batched_entries[_SHAPE_JSON].append(value.shape)
batched_entries[_TF_INPUT_NAME_JSON].append(feature)
cur_subdir = str(uuid.uuid4())
cur_input_path = os.path.join(self._model_properties[model_name]['path'],
_EXAMPLES_SUBDIR, cur_subdir)
tf.io.gfile.makedirs(cur_input_path)
for entry, value in batched_entries.items():
with tf.io.gfile.GFile(os.path.join(cur_input_path, entry), 'w') as f:
f.write(json.dumps(value))
cur_output_path = os.path.join(self._model_properties[model_name]['path'],
_OUTPUTS_SUBDIR, cur_subdir)
tf.io.gfile.makedirs(cur_output_path)
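      # Invoke the tfjs inference binary, pointing it at the model JSON and
      # the input/output directories created above.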
inference_command = [
self._binary_path, '--model_path=' +
os.path.join(self._model_properties[model_name]['path'], _MODEL_JSON),
'--inputs_dir=' + cur_input_path, '--outputs_dir=' + cur_output_path
]
popen = subprocess.Popen(
inference_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = popen.communicate()
if popen.returncode != 0:
raise ValueError(
'Inference failed with status {}\nstdout:\n{}\nstderr:\n{}'.format(
popen.returncode, stdout, stderr))
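      # Read back the data, dtype, and shape files written by the binary.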
try:
with tf.io.gfile.GFile(os.path.join(cur_output_path, _DATA_JSON)) as f:
data = json.load(f)
with tf.io.gfile.GFile(os.path.join(cur_output_path, _DTYPE_JSON)) as f:
dtype = json.load(f)
with tf.io.gfile.GFile(os.path.join(cur_output_path, _SHAPE_JSON)) as f:
shape = json.load(f)
except FileNotFoundError as e:
raise FileNotFoundError(
'Unable to find files containing inference result. This likely '
'means that inference did not succeed. Error {}'.format(e))
name = [
n.split(':')[0]
for n in self._model_properties[model_name]['outputs'].keys()
]
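      # Remove the temporary input and output directories for this batch.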
tf.io.gfile.rmtree(cur_input_path)
tf.io.gfile.rmtree(cur_output_path)
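      # Rebuild each output tensor from its JSON data, dtype, and shape.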
outputs = {}
for n, s, t, d in zip(name, shape, dtype, data):
d_val = [d[str(i)] for i in range(len(d))]
outputs[n] = np.reshape(np.array(d_val, t), s)
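      # Each output is expected to contain one entry per input example.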
for v in outputs.values():
if len(v) != len(feature_rows):
raise ValueError('Did not get the expected number of results.')
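      # Attach per-example predictions; when multiple models are configured,
      # predictions are keyed by model name.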
for i in range(len(feature_rows)):
output = {k: v[i] for k, v in outputs.items()}
if len(output) == 1:
output = list(output.values())[0]
if len(self._eval_config.model_specs) == 1:
result[constants.PREDICTIONS_KEY].append(output)
else:
if i >= len(result[constants.PREDICTIONS_KEY]):
result[constants.PREDICTIONS_KEY].append({})
result[constants.PREDICTIONS_KEY][i].update({spec.name: output})
return [result]