tensorflow_model_analysis/evaluators/eval_saved_model_util.py [93:145]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class _AggState:
  """Combine state for AggregateCombineFn.

  There are two parts to the state: the metric variables (the actual state),
  and a list of FeaturesPredictionsLabels or other inputs. See
  _AggregateCombineFn for why we need this.
  """

  # We really want the batch size to be adaptive like it is in
  # beam.BatchElements(), but there isn't an easy way to make it so. For now
  # we will limit stored inputs to a max overall byte size.
  # TODO(b/73789023): Figure out how to make this batch size dynamic.
  _TOTAL_INPUT_BYTE_SIZE_THRESHOLD = 16 << 20  # 16MiB
  _DEFAULT_DESIRED_BATCH_SIZE = 1000

  __slots__ = [
      'metric_variables', 'inputs', 'size_estimator', '_desired_batch_size'
  ]

  # TODO(b/173811366): Consider removing the desired_batch_size knob and
  # only use input size.
  def __init__(self, desired_batch_size: Optional[int] = None):
    self.metric_variables = None  # type: Optional[types.MetricVariablesType]
    self.inputs = []  # type: List[bytes]
    self.size_estimator = size_estimator.SizeEstimator(
        size_threshold=self._TOTAL_INPUT_BYTE_SIZE_THRESHOLD, size_fn=len)
    if desired_batch_size and desired_batch_size > 0:
      self._desired_batch_size = desired_batch_size
    else:
      self._desired_batch_size = self._DEFAULT_DESIRED_BATCH_SIZE

  def __iadd__(self, other: '_AggState') -> '_AggState':
    self.metric_variables = _add_metric_variables(self.metric_variables,
                                                  other.metric_variables)
    self.inputs.extend(other.inputs)
    self.size_estimator += other.size_estimator
    return self

  def add_input(self, new_input: bytes):
    self.inputs.append(new_input)
    self.size_estimator.update(new_input)

  def clear_inputs(self):
    del self.inputs[:]
    self.size_estimator.clear()

  def add_metrics_variables(self, metric_variables: types.MetricVariablesType):
    self.metric_variables = _add_metric_variables(self.metric_variables,
                                                  metric_variables)

  def should_flush(self) -> bool:
    return (len(self.inputs) >= self._desired_batch_size or
            self.size_estimator.should_flush())
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tensorflow_model_analysis/evaluators/legacy_aggregate.py [80:130]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class _AggState:
  """Combine state for AggregateCombineFn.

  There are two parts to the state: the metric variables (the actual state),
  and a list of FeaturesPredictionsLabels or other inputs. See
  _AggregateCombineFn for why we need this.
  """

  # We really want the batch size to be adaptive like it is in
  # beam.BatchElements(), but there isn't an easy way to make it so. For now
  # we will limit stored inputs to a max overall byte size.
  # TODO(b/73789023): Figure out how to make this batch size dynamic.
  _TOTAL_INPUT_BYTE_SIZE_THRESHOLD = 16 << 20  # 16MiB
  _DEFAULT_DESIRED_BATCH_SIZE = 1000

  __slots__ = [
      'metric_variables', 'inputs', 'size_estimator', '_desired_batch_size'
  ]

  def __init__(self, desired_batch_size: Optional[int] = None):
    self.metric_variables = None  # type: Optional[types.MetricVariablesType]
    self.inputs = []  # type: List[bytes]
    self.size_estimator = size_estimator.SizeEstimator(
        size_threshold=self._TOTAL_INPUT_BYTE_SIZE_THRESHOLD, size_fn=len)
    if desired_batch_size and desired_batch_size > 0:
      self._desired_batch_size = desired_batch_size
    else:
      self._desired_batch_size = self._DEFAULT_DESIRED_BATCH_SIZE

  def __iadd__(self, other: '_AggState') -> '_AggState':
    self.metric_variables = _add_metric_variables(self.metric_variables,
                                                  other.metric_variables)
    self.inputs.extend(other.inputs)
    self.size_estimator += other.size_estimator
    return self

  def add_input(self, new_input: bytes):
    self.inputs.append(new_input)
    self.size_estimator.update(new_input)

  def clear_inputs(self):
    del self.inputs[:]
    self.size_estimator.clear()

  def add_metrics_variables(self, metric_variables: types.MetricVariablesType):
    self.metric_variables = _add_metric_variables(self.metric_variables,
                                                  metric_variables)

  def should_flush(self) -> bool:
    return (len(self.inputs) >= self._desired_batch_size or
            self.size_estimator.should_flush())
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



