in tensorflow_transform/analyzers.py [0:0]
def _numeric_combine(inputs: List[tf.Tensor],
                     fn: Callable[[np.ndarray], np.ndarray],
                     default_accumulator_value: Union[float, int],
                     reduce_instance_dims: bool = True,
                     output_dtypes: Optional[List[tf.DType]] = None,
                     key: Optional[tf.Tensor] = None,
                     key_vocabulary_filename: Optional[str] = None):
"""Apply a reduction, defined by a numpy function to multiple inputs.
Args:
inputs: A list of tensors, which will be independently reduced.
fn: A function to reduce tensors across instances/batches, to get a single
output.
default_accumulator_value: The default scalar value that each accumulator
entry is initialized to. Must be properly processed by the reduction
function.
reduce_instance_dims: By default collapses the batch and instance dimensions
to arrive at a single scalar output. If False, only collapses the batch
dimension and outputs a vector of the same shape as the input.
output_dtypes: (Optional) A list of dtypes of the output tensors. If None,
the output tensor has the same type as the input one.
key: (Optional) Apply the same operation, but on a per-key basis.
key_vocabulary_filename: (Optional) The file name for the key-output mapping
file. If None and key are provided, this combiner assumes the keys fit in
memory and will not store the result in a file. If empty string, a file
name will be chosen based on the current scope. If not an empty string,
should be unique within a given preprocessing function.
Returns:
Either:
(A) A list of Tensors with the same length as `inputs`, representing the
input Tensors that have been reduced by `fn` across instances and
batches (if key_vocabulary_filename is None).
(B) A Tensor with the filename where the key-value mapping is stored (if
key_vocabulary_filename is not None).
"""
  for x in inputs:
    if not isinstance(x, tf.Tensor):
      raise TypeError('Expected a Tensor, but got %r' % x)
  if not np.isscalar(default_accumulator_value):
    raise TypeError(
        'Expected a scalar, but got %r' % default_accumulator_value)
  if output_dtypes is None:
    output_dtypes = [x.dtype for x in inputs]
  if reduce_instance_dims:
    # If reducing over all dimensions, result is scalar.
    output_shapes = [() for _ in inputs]
  else:
    # Reducing over batch dimensions.
    output_shapes = [
        (tuple(x.get_shape()) if x.get_shape().is_fully_defined() else None)
        for x in inputs
    ]
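  # NumPyCombiner accumulates batches as numpy arrays and applies `fn` to
  # reduce them, so the output dtypes are converted to numpy equivalents.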
  combiner = NumPyCombiner(fn, default_accumulator_value,
                           [dtype.as_numpy_dtype for dtype in output_dtypes],
                           output_shapes)
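  # Dispatch on the (key, key_vocabulary_filename) combination described in
  # the Returns section of the docstring.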
  if key is None:
    return _apply_cacheable_combiner(combiner, *inputs)

  if key_vocabulary_filename is None:
    return _apply_cacheable_combiner_per_key(combiner, key, *inputs)

  return _apply_cacheable_combiner_per_key_large(
      combiner, _maybe_get_per_key_vocab_filename(key_vocabulary_filename),
      key, *inputs)
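

# --- Usage sketch (illustrative; not part of the original file) ---
# A minimal example of how an analyzer might delegate to _numeric_combine,
# assuming it runs inside a tft preprocessing_fn. The wrapper names below
# are hypothetical; real public analyzers such as tft.sum also normalize
# composite (sparse/ragged) tensors before reaching this helper.
def _my_sum(x: tf.Tensor, reduce_instance_dims: bool = True) -> tf.Tensor:
  # np.sum is the reduction fn; 0 is the identity for summation, so it is
  # "properly processed" by fn, as the docstring requires.
  return _numeric_combine(
      [x], np.sum, default_accumulator_value=0,
      reduce_instance_dims=reduce_instance_dims)[0]


def _my_sum_per_key(x: tf.Tensor, key: tf.Tensor):
  # With key_vocabulary_filename=None the per-key results are kept in
  # memory; per the docstring, the result pairs the unique keys with the
  # per-key reductions rather than yielding a single reduced tensor.
  return _numeric_combine([x], np.sum, default_accumulator_value=0, key=key)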