in perfkitbenchmarker/time_triggers/maintenance_simulation_trigger.py [0:0]
def _AggregateThroughputSample(self, s: sample.Sample) -> List[sample.Sample]:
  """Aggregates a time series sample into live-migration metrics.

  Splits the time series into a pre-LM baseline, the values observed after
  LM starts, and the values observed after LM ends; computes the baseline
  mean and median; and calls the relevant _Compute* helpers to generate
  aggregated samples from the time series sample.

  Args:
    s: A time series sample created using CreateTimeSeriesSample in
      samples.py. Its metadata must contain 'timestamps', 'values' and
      'interval'.

  Returns:
    A list of aggregated samples.
  """
  metadata = copy.deepcopy(s.metadata)
  time_series = metadata['timestamps']
  values = metadata['values']
  interval = metadata['interval']

  # Default ramp up ends and ramp down starts to the full series if the
  # benchmark does not provide them in the metadata.
  ramp_up_ends = time_series[0]
  ramp_down_starts = time_series[-1]
  lm_ends = time_series[-1]
  if sample.RAMP_DOWN_STARTS in metadata:
    ramp_down_starts = metadata[sample.RAMP_DOWN_STARTS]
  if sample.RAMP_UP_ENDS in metadata:
    ramp_up_ends = metadata[sample.RAMP_UP_ENDS]
  if self.capture_live_migration_timestamps:
    # LM end is computed from the LM notification.
    lm_ends = self.lm_ends
  lm_start = sample.ConvertDateTimeToUnixMs(self.trigger_time)

  base_line_values = []
  values_after_lm_starts = []
  values_after_lm_ends = []
  total_missing_seconds = 0
  for i, (timestamp, value) in enumerate(zip(time_series, values)):
    # Only aggregate values inside the measurement window.
    if not ramp_up_ends <= timestamp <= ramp_down_starts:
      continue
    interval_values = []
    if i > 0:
      # If more than 1 sequential value is missing from the time series,
      # distribute the ops evenly throughout the gap.
      time_gap_in_seconds = (timestamp - time_series[i - 1]) / 1000
      missing_entry_count = int((time_gap_in_seconds / interval) - 1)
      if missing_entry_count > 1:
        total_missing_seconds += missing_entry_count * interval
        interval_values.extend(
            [int(value / float(missing_entry_count + 1))]
            * (missing_entry_count + 1)
        )
      else:
        interval_values.append(value)
    else:
      # Fix: the very first in-window value must also be counted; it has
      # no predecessor, so no gap computation applies.
      interval_values.append(value)
    if timestamp <= lm_start:
      base_line_values.extend(interval_values)
    else:
      values_after_lm_starts.extend(interval_values)
      # lm_ends >= lm_start, so anything past LM end is also "after start".
      if timestamp > lm_ends:
        values_after_lm_ends.extend(interval_values)

  median = statistics.median(base_line_values)
  mean = statistics.mean(base_line_values)
  logging.info('LM Baseline median: %s', median)
  logging.info('LM Baseline mean: %s', mean)

  # Keep the metadata from the original sample except time series metadata.
  for field in sample.TIME_SERIES_METADATA:
    if field in metadata:
      del metadata[field]

  samples = self._ComputeLossPercentile(
      mean, values_after_lm_starts, metadata
  ) + self._ComputeLossWork(
      median, values_after_lm_starts, interval, metadata
  )
  if values_after_lm_ends:
    mean_after_lm_ends = statistics.mean(values_after_lm_ends)
    samples += self._ComputeDegradation(mean, mean_after_lm_ends, metadata)
    logging.info('Mean after LM ends: %s', mean_after_lm_ends)
    logging.info(
        'Number of samples after LM ends: %s', len(values_after_lm_ends)
    )
  # Seconds that are missing, i.e. without any reported throughput.
  samples.append(
      sample.Sample(
          'total_missing_seconds',
          total_missing_seconds,
          's',
          metadata=metadata,
      )
  )
  return samples