def _batch_reducible_process()

in tensorflow_model_analysis/extractors/tfjs_predict_extractor.py [0:0]


  def _batch_reducible_process(
      self, element: types.Extracts) -> Sequence[types.Extracts]:
    """Invokes the tfjs model on the provided inputs and stores the result.

    For every model spec in the eval config, the batched features are reshaped
    to the input shapes recorded in the model properties, written as JSON
    files into a uniquely named scratch directory under the model path, and
    handed to the inference binary at `self._binary_path`. The JSON files the
    binary produces are read back and split into one prediction per feature
    row. The scratch directories are removed whether or not inference
    succeeds.

    Args:
      element: Extracts for one batch. `element[constants.FEATURES_KEY]` is
        iterated as a sequence of per-row dicts mapping feature name to a
        value exposing `dtype` and `astype` (presumably numpy arrays).

    Returns:
      A single-element list holding a shallow copy of `element` in which
      `constants.PREDICTIONS_KEY` is a list with one entry per feature row.
      With a single model spec each entry is the model output (the bare value
      when the model has exactly one output, otherwise a dict keyed by output
      name). With several model specs each entry is a dict keyed by spec name.

    Raises:
      ValueError: If a model spec has no loaded model, a required feature is
        missing, a feature has a higher rank than the model input, the
        inference binary exits with a non-zero status, or the number of
        results differs from the number of feature rows.
      FileNotFoundError: If the inference result files cannot be found.
    """
    result = copy.copy(element)
    result[constants.PREDICTIONS_KEY] = []
    predictions = result[constants.PREDICTIONS_KEY]

    batched_features = collections.defaultdict(list)
    feature_rows = element[constants.FEATURES_KEY]
    for row in feature_rows:
      for key, value in row.items():
        # int64 values are downcast to int32 before being sent to the model.
        if value.dtype == np.int64:
          value = value.astype(np.int32)
        batched_features[key].append(value)

    model_specs = self._eval_config.model_specs
    multi_model = len(model_specs) > 1
    for spec in model_specs:
      # With a single spec the model is registered under the empty name.
      model_name = spec.name if multi_model else ''
      if model_name not in self._loaded_models:
        raise ValueError('model for "{}" not found: eval_config={}'.format(
            spec.name, self._eval_config))

      model_properties = self._model_properties[model_name]
      input_dims = model_properties['inputs']

      model_features = {}
      for k, dim in input_dims.items():
        # Input names may carry a ':<suffix>'; features are keyed without it.
        k_name = k.split(':')[0]
        if k_name not in batched_features:
          raise ValueError('model requires feature "{}" not available in '
                           'input.'.format(k_name))
        elems = []
        for i in batched_features[k_name]:
          if np.ndim(i) > len(dim):
            raise ValueError('ranks for input "{}" are not compatible '
                             'with the model.'.format(k_name))
          # TODO(dzats): See if we can support case where multiple dimensions
          # are not defined.
          elems.append(np.reshape(i, dim))
        model_features[k] = elems

      model_features = {k: np.concatenate(v) for k, v in model_features.items()}

      # One JSON file per entry; each file holds a list that is parallel
      # across files (position j describes the same model input everywhere).
      batched_entries = collections.defaultdict(list)
      for feature, value in model_features.items():
        batched_entries[_DATA_JSON].append(value.tolist())
        batched_entries[_DTYPE_JSON].append(str(value.dtype))
        batched_entries[_SHAPE_JSON].append(value.shape)
        batched_entries[_TF_INPUT_NAME_JSON].append(feature)

      # A fresh uuid keeps the scratch directories of separate calls apart.
      cur_subdir = str(uuid.uuid4())
      model_path = model_properties['path']
      cur_input_path = os.path.join(model_path, _EXAMPLES_SUBDIR, cur_subdir)
      cur_output_path = os.path.join(model_path, _OUTPUTS_SUBDIR, cur_subdir)

      try:
        tf.io.gfile.makedirs(cur_input_path)
        for entry, value in batched_entries.items():
          with tf.io.gfile.GFile(os.path.join(cur_input_path, entry), 'w') as f:
            f.write(json.dumps(value))

        tf.io.gfile.makedirs(cur_output_path)
        inference_command = [
            self._binary_path,
            '--model_path=' + os.path.join(model_path, _MODEL_JSON),
            '--inputs_dir=' + cur_input_path,
            '--outputs_dir=' + cur_output_path,
        ]

        popen = subprocess.Popen(
            inference_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        stdout, stderr = popen.communicate()
        if popen.returncode != 0:
          raise ValueError(
              'Inference failed with status {}\nstdout:\n{}\nstderr:\n{}'
              .format(popen.returncode, stdout, stderr))

        try:
          with tf.io.gfile.GFile(
              os.path.join(cur_output_path, _DATA_JSON)) as f:
            data = json.load(f)
          with tf.io.gfile.GFile(
              os.path.join(cur_output_path, _DTYPE_JSON)) as f:
            dtype = json.load(f)
          with tf.io.gfile.GFile(
              os.path.join(cur_output_path, _SHAPE_JSON)) as f:
            shape = json.load(f)
        # tf.io.gfile reports a missing file as tf.errors.NotFoundError rather
        # than the builtin FileNotFoundError, so both are translated here.
        except (FileNotFoundError, tf.errors.NotFoundError) as e:
          raise FileNotFoundError(
              'Unable to find files containing inference result. This likely '
              'means that inference did not succeed. Error {}'.format(e)) from e
      finally:
        # Remove the scratch directories even when inference failed so that
        # failed batches do not accumulate files under the model path.
        for scratch_path in (cur_input_path, cur_output_path):
          if tf.io.gfile.exists(scratch_path):
            tf.io.gfile.rmtree(scratch_path)

      output_names = [n.split(':')[0] for n in model_properties['outputs']]

      outputs = {}
      for n, s, t, d in zip(output_names, shape, dtype, data):
        # Each output's data is a dict keyed by stringified flat index;
        # restore positional order before reshaping.
        d_val = [d[str(i)] for i in range(len(d))]
        outputs[n] = np.reshape(np.array(d_val, t), s)

      num_rows = len(feature_rows)
      for v in outputs.values():
        if len(v) != num_rows:
          raise ValueError('Did not get the expected number of results.')

      for i in range(num_rows):
        output = {k: v[i] for k, v in outputs.items()}

        # A model with a single output yields the bare value, not a dict.
        if len(output) == 1:
          output = next(iter(output.values()))

        if not multi_model:
          predictions.append(output)
        else:
          # The first model creates the per-row dict; later models add to it.
          if i >= len(predictions):
            predictions.append({})
          predictions[i][spec.name] = output
    return [result]