# Excerpt: apply_buckets_with_interpolation
# From: tensorflow_transform/mappers.py


def apply_buckets_with_interpolation(
    x: common_types.ConsistentTensorType,
    bucket_boundaries: common_types.BucketBoundariesType,
    name: Optional[str] = None) -> common_types.ConsistentTensorType:
  """Interpolates within the provided buckets and then normalizes to 0 to 1.

  A method for normalizing continuous numeric data to the range [0, 1].
  Numeric values are first bucketized according to the provided boundaries, then
  linearly interpolated within their respective bucket ranges. Finally, the
  interpolated values are normalized to the range [0, 1]. Values that are
  less than or equal to the lowest boundary, or greater than or equal to the
  highest boundary, will be mapped to 0 and 1 respectively. NaN values will be
  mapped to the middle of the range (.5).

  This is a non-linear approach to normalization that is less sensitive to
  outliers than min-max or z-score scaling. When outliers are present, standard
  forms of normalization can leave the majority of the data compressed into a
  very small segment of the output range, whereas this approach tends to spread
  out the more frequent values (if quantile buckets are used). Note that
  distance relationships in the raw data are not necessarily preserved (data
  points that are close to each other in the raw feature space may not be
  equally close in the transformed feature space). This means that unlike
  linear normalization methods, correlations between features may be distorted
  by the transformation. This scaling method may help with stability and
  minimize exploding gradients in neural networks.

  Args:
    x: A numeric input `Tensor`/`CompositeTensor` (tf.float[32|64],
      tf.int[32|64]).
    bucket_boundaries: Sorted bucket boundaries as a rank-2 `Tensor` or list.
    name: (Optional) A name for this operation.

  Returns:
    A `Tensor` or `CompositeTensor` of the same shape as `x`, normalized to the
      range [0, 1]. If the input x is tf.float64, the returned values will be
      tf.float64. Otherwise, returned values are tf.float32.

  Raises:
    ValueError: If the input tensor `x` is not of a numeric dtype.
  """
  with tf.compat.v1.name_scope(name, 'buckets_with_interpolation'):
    # Boundaries are expected as a rank-2 tensor, presumably of shape
    # (1, num_boundaries) — the code below indexes both row 0 and axis 1.
    bucket_boundaries = tf.convert_to_tensor(bucket_boundaries)
    tf.compat.v1.assert_rank(bucket_boundaries, 2)
    # For composite inputs (e.g. sparse/ragged), operate on the flat values
    # and remember how to re-wrap the result into the original structure.
    x_values = _get_values_if_composite(x)
    compose_result_fn = _make_composite_tensor_wrapper_if_composite(x)
    if not (x_values.dtype.is_floating or x_values.dtype.is_integer):
      raise ValueError(
          'Input tensor to be normalized must be numeric, got {}.'.format(
              x_values.dtype))
    # Remove any non-finite boundaries (NaN/Inf). gather_nd flattens the
    # surviving entries; expand_dims restores the expected rank-2 shape.
    if bucket_boundaries.dtype in (tf.float64, tf.float32):
      bucket_boundaries = tf.expand_dims(
          tf.gather_nd(bucket_boundaries,
                       tf.where(tf.math.is_finite(bucket_boundaries))),
          axis=0)
    # Output dtype: float64 only when the input is float64, else float32.
    return_type = tf.float64 if x.dtype == tf.float64 else tf.float32
    num_boundaries = tf.cast(
        tf.shape(bucket_boundaries)[1], dtype=tf.int64, name='num_boundaries')
    # Fail at runtime if boundary filtering above left us with nothing to
    # bucketize against.
    assert_some_finite_boundaries = tf.compat.v1.assert_greater(
        num_boundaries,
        tf.constant(0, tf.int64),
        name='assert_1_or_more_finite_boundaries')
    with tf.control_dependencies([assert_some_finite_boundaries]):
      # bucket_indices ranges over [0, num_boundaries]: 0 means "below the
      # lowest boundary", num_boundaries means "above the highest boundary",
      # and index i in between means the value lies in [b_{i-1}, b_i).
      bucket_indices = _assign_buckets_all_shapes(x_values, bucket_boundaries)
      # Get max, min, and width of the corresponding bucket for each element.
      # The concats build (num_boundaries + 1)-element lookup tables indexed
      # by bucket index:
      #   max table: [b_0, b_0, ..., b_{n-1}] ++ [b_{n-1}]  -> upper edges
      #   min table: [b_0] ++ [b_0, ..., b_{n-1}]           -> lower edges
      # The entries gathered for the two unbounded edge buckets are
      # placeholders; interpolation is overridden to 0 for those below.
      bucket_max = tf.cast(
          tf.gather(
              tf.concat([bucket_boundaries[0], bucket_boundaries[:, -1]],
                        axis=0), bucket_indices), return_type)
      bucket_min = tf.cast(
          tf.gather(
              tf.concat([bucket_boundaries[:, 0], bucket_boundaries[0]],
                        axis=0), bucket_indices), return_type)
    bucket_width = bucket_max - bucket_min
    zeros = tf.zeros_like(x_values, dtype=return_type)
    ones = tf.ones_like(x_values, dtype=return_type)

    # Linearly interpolate each value within its respective bucket range.
    interpolation_value = (
        (tf.cast(x_values, return_type) - bucket_min) / bucket_width)
    bucket_interpolation = tf.compat.v1.verify_tensor_all_finite(
        tf.where(
            # If bucket index is first or last, which represents "less than
            # min" and "greater than max" respectively, the bucket logically
            # has an infinite width and we can't meaningfully interpolate.
            tf.logical_or(
                tf.equal(bucket_indices, 0),
                tf.equal(bucket_indices, num_boundaries)),
            zeros,
            tf.where(
                # If the bucket width is zero due to numerical imprecision,
                # there is no point in interpolating
                tf.equal(bucket_width, 0.0),
                ones / 2.0,
                # Finally, for a bucket with a valid width, we can interpolate.
                interpolation_value)),
        'bucket_interpolation')
    # Shift indices so interior buckets start at 0, then add the fractional
    # part; the result lies in [0, num_boundaries - 1].
    bucket_indices_with_interpolation = tf.cast(
        tf.maximum(bucket_indices - 1, 0), return_type) + bucket_interpolation

    # Normalize the interpolated values to the range [0, 1]. The maximum(.., 1)
    # avoids a divide-by-zero in the single-boundary case, which is handled
    # separately by the tf.cond below.
    denominator = tf.cast(tf.maximum(num_boundaries - 1, 1), return_type)
    normalized_values = bucket_indices_with_interpolation / denominator
    if x_values.dtype.is_floating:
      # Impute NaNs with .5, the middle value of the normalized output range.
      imputed_values = tf.ones_like(x_values, dtype=return_type) / 2.0
      normalized_values = tf.where(
          tf.math.is_nan(x_values), imputed_values, normalized_values)
    # If there is only one boundary, all values < the boundary are 0, all values
    # >= the boundary are 1.
    single_boundary_values = lambda: tf.where(  # pylint: disable=g-long-lambda
        tf.equal(bucket_indices, 0), zeros, ones)
    normalized_result = tf.cond(
        tf.equal(num_boundaries, 1),
        single_boundary_values, lambda: normalized_values)
    return compose_result_fn(normalized_result)