in tensorflow_transform/mappers.py [0:0]
def apply_buckets_with_interpolation(
    x: common_types.ConsistentTensorType,
    bucket_boundaries: common_types.BucketBoundariesType,
    name: Optional[str] = None) -> common_types.ConsistentTensorType:
  """Interpolates within the provided buckets and then normalizes to 0 to 1.

  A method for normalizing continuous numeric data to the range [0, 1].
  Numeric values are first bucketized according to the provided boundaries, then
  linearly interpolated within their respective bucket ranges. Finally, the
  interpolated values are normalized to the range [0, 1]. Values that are
  less than or equal to the lowest boundary, or greater than or equal to the
  highest boundary, will be mapped to 0 and 1 respectively. If only a single
  finite boundary is available, values below it are mapped to 0 and all other
  non-NaN values are mapped to 1. NaN values will be mapped to the middle of
  the range (.5), regardless of the number of boundaries.

  This is a non-linear approach to normalization that is less sensitive to
  outliers than min-max or z-score scaling. When outliers are present, standard
  forms of normalization can leave the majority of the data compressed into a
  very small segment of the output range, whereas this approach tends to spread
  out the more frequent values (if quantile buckets are used). Note that
  distance relationships in the raw data are not necessarily preserved (data
  points that are close to each other in the raw feature space may not be
  equally close in the transformed feature space). This means that unlike
  linear normalization methods, correlations between features may be distorted
  by the transformation. This scaling method may help with stability and
  minimize exploding gradients in neural networks.

  Args:
    x: A numeric input `Tensor`/`CompositeTensor` (tf.float[32|64],
      tf.int[32|64]).
    bucket_boundaries: Sorted bucket boundaries as a rank-2 `Tensor` or list.
      Non-finite entries of float32/float64 boundaries are dropped before use.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` or `CompositeTensor` of the same shape as `x`, normalized to the
    range [0, 1]. If the input x is tf.float64, the returned values will be
    tf.float64. Otherwise, returned values are tf.float32.

  Raises:
    ValueError: If the values of `x` are neither floating point nor integer.
      The rank-2 check on `bucket_boundaries` and the "at least one finite
      boundary" check are expressed as TensorFlow assertions and fail through
      the usual TensorFlow assertion mechanism.
  """
  with tf.compat.v1.name_scope(name, 'buckets_with_interpolation'):
    bucket_boundaries = tf.convert_to_tensor(bucket_boundaries)
    tf.compat.v1.assert_rank(bucket_boundaries, 2)
    x_values = _get_values_if_composite(x)
    compose_result_fn = _make_composite_tensor_wrapper_if_composite(x)
    if not (x_values.dtype.is_floating or x_values.dtype.is_integer):
      raise ValueError(
          'Input tensor to be normalized must be numeric, got {}.'.format(
              x_values.dtype))
    # Remove any non-finite boundaries. gather_nd flattens the surviving
    # entries, so expand_dims restores the leading axis of size 1.
    if bucket_boundaries.dtype in (tf.float64, tf.float32):
      bucket_boundaries = tf.expand_dims(
          tf.gather_nd(bucket_boundaries,
                       tf.where(tf.math.is_finite(bucket_boundaries))),
          axis=0)
    return_type = tf.float64 if x.dtype == tf.float64 else tf.float32
    num_boundaries = tf.cast(
        tf.shape(bucket_boundaries)[1], dtype=tf.int64, name='num_boundaries')
    assert_some_finite_boundaries = tf.compat.v1.assert_greater(
        num_boundaries,
        tf.constant(0, tf.int64),
        name='assert_1_or_more_finite_boundaries')
    with tf.control_dependencies([assert_some_finite_boundaries]):
      bucket_indices = _assign_buckets_all_shapes(x_values, bucket_boundaries)
      # Get max, min, and width of the corresponding bucket for each element.
      # The boundary row is padded with a copy of the last (resp. first)
      # boundary so that bucket index i looks up boundary i as its max and
      # boundary i - 1 as its min; the two outermost buckets therefore get
      # identical max and min (zero width).
      bucket_max = tf.cast(
          tf.gather(
              tf.concat([bucket_boundaries[0], bucket_boundaries[:, -1]],
                        axis=0), bucket_indices), return_type)
      bucket_min = tf.cast(
          tf.gather(
              tf.concat([bucket_boundaries[:, 0], bucket_boundaries[0]],
                        axis=0), bucket_indices), return_type)
    bucket_width = bucket_max - bucket_min
    zeros = tf.zeros_like(x_values, dtype=return_type)
    ones = tf.ones_like(x_values, dtype=return_type)
    halves = ones / 2.0

    # Linearly interpolate each value within its respective bucket range.
    # Zero-width buckets produce non-finite quotients here; they are masked
    # out by the tf.where below before the finiteness check runs.
    interpolation_value = (
        (tf.cast(x_values, return_type) - bucket_min) / bucket_width)
    bucket_interpolation = tf.compat.v1.verify_tensor_all_finite(
        tf.where(
            # If bucket index is first or last, which represents "less than
            # min" and "greater than max" respectively, the bucket logically
            # has an infinite width and we can't meaningfully interpolate.
            tf.logical_or(
                tf.equal(bucket_indices, 0),
                tf.equal(bucket_indices, num_boundaries)),
            zeros,
            tf.where(
                # If the bucket width is zero due to numerical imprecision,
                # there is no point in interpolating.
                tf.equal(bucket_width, 0.0),
                halves,
                # Finally, for a bucket with a valid width, we can interpolate.
                interpolation_value)),
        'bucket_interpolation')
    bucket_indices_with_interpolation = tf.cast(
        tf.maximum(bucket_indices - 1, 0), return_type) + bucket_interpolation

    # Normalize the interpolated values to the range [0, 1]. The denominator is
    # clamped to 1 so the single-boundary case does not divide by zero.
    denominator = tf.cast(tf.maximum(num_boundaries - 1, 1), return_type)
    normalized_values = bucket_indices_with_interpolation / denominator

    # If there is only one boundary, all values < the boundary are 0, all values
    # >= the boundary are 1.
    single_boundary_values = lambda: tf.where(  # pylint: disable=g-long-lambda
        tf.equal(bucket_indices, 0), zeros, ones)
    normalized_result = tf.cond(
        tf.equal(num_boundaries, 1),
        single_boundary_values, lambda: normalized_values)

    if x_values.dtype.is_floating:
      # Impute NaNs with .5, the middle value of the normalized output range.
      # This is applied after the tf.cond so that NaNs are imputed in the
      # single-boundary case as well, as the docstring promises.
      normalized_result = tf.where(
          tf.math.is_nan(x_values), halves, normalized_result)
    return compose_result_fn(normalized_result)