def peaks()

in moonlight/util/segments.py [0:0]


def peaks(values, minval=None, invalidate_distance=0, name=None):
  """Labels peaks in values.

  The values are first split into runs of consecutive equal values. A run is a
  peak if its value is strictly greater than the value of both neighboring
  runs. The first and last runs only have one neighbor, and are peaks if they
  are strictly greater than that neighbor. If there are fewer than two runs, no
  peaks are returned.

  Args:
    values: 1D tensor of a numeric type.
    minval: Optional minimum value which is considered a peak. Peaks whose value
      is less than minval are discarded; a peak equal to minval is kept. If
      None, peaks are not filtered by value.
    invalidate_distance: Invalidates nearby potential peaks. The peaks are
      searched sequentially by descending value, and from left to right for
      equal values. Once a peak is found in this order, it invalidates any peaks
      yet to be seen that are <= invalidate_distance away. A distance of 0
      effectively produces no invalidation. Converted to an int32 tensor.
    name: Optional name for the op.

  Returns:
    peak_centers: The (rounded) centers of each peak, which are locations where
        the value is higher than the value before and after. If there is a run
        of equal values at the peak, the rounded center of the run is returned.
        The centers are sorted in ascending order. int32 1D tensor.
  """
  with tf.name_scope(name, "peaks", [values]):
    values = tf.convert_to_tensor(values, name="values")
    invalidate_distance = tf.convert_to_tensor(
        invalidate_distance, name="invalidate_distance", dtype=tf.int32)
    # Segment the values and find local maxima.
    # Take the center of each run of consecutive equal values.
    segment_centers, _ = _segments_1d(values, mode=SegmentsMode.CENTERS)
    segment_values = tf.gather(values, segment_centers)
    num_segments = tf.shape(segment_values)[0]
    # If we have zero or one segments, there are no peaks. Just use zeros as the
    # edge values in that case, so that both edge comparisons below are False.
    first_val, second_val, penultimate_val, last_val = tf.cond(
        # pyformat: disable
        tf.greater_equal(num_segments, 2),
        lambda: tuple(segment_values[i] for i in (0, 1, -2, -1)),
        lambda: tuple(tf.constant(0, values.dtype) for i in range(4)))
    # Each segment must be greater than the segment before and after it. The
    # edge segments are only compared to their single neighbor.
    segment_is_peak = tf.concat(
        [[first_val > second_val],
         tf.greater(segment_values[1:-1],
                    tf.maximum(segment_values[:-2], segment_values[2:])),
         [last_val > penultimate_val]],
        axis=0)
    # The concat always emits both edge entries, so with zero or one segments
    # the mask would have 2 entries (all False) and be longer than
    # segment_values. Trim it to the number of segments so that it lines up
    # with segment_values and segment_centers below: with zero segments
    # (assuming _segments_1d can produce none, e.g. for empty input), a
    # length-2 mask cannot be broadcast against the empty segment_values in
    # the minval filter. This is a no-op when there are at least 2 segments.
    segment_is_peak = segment_is_peak[:num_segments]
    if minval is not None:
      # Filter the peaks by minval.
      segment_is_peak = tf.logical_and(segment_is_peak,
                                       tf.greater_equal(segment_values, minval))

    # Get the center coordinates of each peak, and sort by descending value.
    # top_k is documented to return the lower-index element first among equal
    # values, which gives the left-to-right order for ties.
    all_peaks = tf.boolean_mask(segment_centers, segment_is_peak)
    num_peaks = tf.shape(all_peaks)[0]
    peak_values = tf.boolean_mask(segment_values, segment_is_peak)
    _, peak_order = tf.nn.top_k(peak_values, k=num_peaks, sorted=True)
    all_peaks = tf.gather(all_peaks, peak_order)
    all_peaks.set_shape([None])

    # Loop over the peaks, accepting one at a time and possibly invalidating
    # other ones.
    def loop_condition(_, current_peaks):
      # Keep going while there are candidate peaks left.
      return tf.shape(current_peaks)[0] > 0

    def loop_body(accepted_peaks, current_peaks):
      # The first remaining candidate has the highest value; accept it.
      peak = current_peaks[0]
      remaining_peaks = current_peaks[1:]

      # Drop the candidates which are within invalidate_distance of the
      # accepted peak.
      keep_peaks = tf.greater(
          tf.abs(remaining_peaks - peak), invalidate_distance)
      remaining_peaks = tf.boolean_mask(remaining_peaks, keep_peaks)

      return tf.concat([accepted_peaks, [peak]], axis=0), remaining_peaks

    # Both loop variables change length on each iteration, so their shape
    # invariants must leave the length unknown.
    accepted_peaks = tf.while_loop(
        loop_condition,
        loop_body, [tf.zeros([0], all_peaks.dtype), all_peaks],
        shape_invariants=[tf.TensorShape([None]),
                          tf.TensorShape([None])])[0]
    # Sort the peaks by index. top_k sorts in descending order, so sort the
    # negated indices and negate the result to get ascending order.
    # TODO(ringw): Add a tf.sort op that sorts in ascending order.
    sorted_negative_peaks, _ = tf.nn.top_k(
        -accepted_peaks, k=tf.shape(accepted_peaks)[0], sorted=True)
    return -sorted_negative_peaks