in moonlight/util/segments.py [0:0]
def peaks(values, minval=None, invalidate_distance=0, name=None):
"""Labels peaks in values.
Args:
values: 1D tensor of a numeric type.
minval: Minimum value which is considered a peak.
invalidate_distance: Invalidates nearby potential peaks. The peaks are
searched sequentially by descending value, and from left to right for
equal values. Once a peak is found in this order, it invalidates any peaks
yet to be seen that are <= invalidate_distance away. A distance of 0
effectively produces no invalidation.
name: Optional name for the op.
Returns:
peak_centers: The (rounded) centers of each peak, which are locations where
the value is higher than the value before and after. If there is a run
of equal values at the peak, the rounded center of the run is returned.
int32 1D tensor.
"""
with tf.name_scope(name, "peaks", [values]):
values = tf.convert_to_tensor(values, name="values")
invalidate_distance = tf.convert_to_tensor(
invalidate_distance, name="invalidate_distance", dtype=tf.int32)
# Segment the values and find local maxima.
# Take the center of each run of consecutive equal values.
segment_centers, _ = _segments_1d(values, mode=SegmentsMode.CENTERS)
segment_values = tf.gather(values, segment_centers)
# If we have zero or one segments, there are no peaks. Just use zeros as the
# edge values in that case.
first_val, second_val, penultimate_val, last_val = tf.cond(
# pyformat: disable
tf.greater_equal(tf.shape(segment_values)[0], 2),
lambda: tuple(segment_values[i] for i in (0, 1, -2, -1)),
lambda: tuple(tf.constant(0, values.dtype) for i in range(4)))
# Each segment must be greater than the segment before and after it.
segment_is_peak = tf.concat(
[[first_val > second_val],
tf.greater(segment_values[1:-1],
tf.maximum(segment_values[:-2], segment_values[2:])),
[last_val > penultimate_val]],
axis=0)
if minval is not None:
# Filter the peaks by minval.
segment_is_peak = tf.logical_and(segment_is_peak,
tf.greater_equal(segment_values, minval))
# Get the center coordinates of each peak, and sort by descending value.
all_peaks = tf.boolean_mask(segment_centers, segment_is_peak)
num_peaks = tf.shape(all_peaks)[0]
peak_values = tf.boolean_mask(segment_values, segment_is_peak)
_, peak_order = tf.nn.top_k(peak_values, k=num_peaks, sorted=True)
all_peaks = tf.gather(all_peaks, peak_order)
all_peaks.set_shape([None])
# Loop over the peaks, accepting one at a time and possibly invalidating
# other ones.
def loop_condition(_, current_peaks):
return tf.shape(current_peaks)[0] > 0
def loop_body(accepted_peaks, current_peaks):
peak = current_peaks[0]
remaining_peaks = current_peaks[1:]
keep_peaks = tf.greater(
tf.abs(remaining_peaks - peak), invalidate_distance)
remaining_peaks = tf.boolean_mask(remaining_peaks, keep_peaks)
return tf.concat([accepted_peaks, [peak]], axis=0), remaining_peaks
accepted_peaks = tf.while_loop(
loop_condition,
loop_body, [tf.zeros([0], all_peaks.dtype), all_peaks],
shape_invariants=[tf.TensorShape([None]),
tf.TensorShape([None])])[0]
# Sort the peaks by index.
# TODO(ringw): Add a tf.sort op that sorts in ascending order.
sorted_negative_peaks, _ = tf.nn.top_k(
-accepted_peaks, k=tf.shape(accepted_peaks)[0], sorted=True)
return -sorted_negative_peaks