def CreateTimeSeriesSample()

in perfkitbenchmarker/linux_packages/aerospike_client.py [0:0]


def CreateTimeSeriesSample(samples: List[sample.Sample]) -> List[sample.Sample]:
  """Create time series samples from a list of per time window sample.

  Args:
    samples: A list of sample.Sample. Each sample represent throughput, latency
      collected in that time window.

  Returns:
    A list of time series samples, where each sample encodes a list of values
      for the entire run.
  """
  sample_results = {}
  for s in samples:
    ns_op = s.metric
    sample_results.setdefault(ns_op, []).append(AsbenchResult(s))

  ts_samples = []
  total_ops = []
  total_ops_value = 0.0
  for ns_op in sample_results:
    sample_results[ns_op] = sorted(
        sample_results[ns_op], key=lambda r: r.timestamp
    )
    results = sample_results[ns_op]
    rampup_end_time = min(r.timestamp for r in results) + RAMPUP_TIME_IN_MS
    ts_samples.append(
        sample.CreateTimeSeriesSample(
            [r.ops for r in results],
            [r.timestamp for r in results],
            ns_op + '_client_tps',
            'ops',
            1,
            ramp_up_ends=rampup_end_time,
            additional_metadata={},
        )
    )
    if results[0].read_min:
      # the workload does read operations
      ts_samples.extend([
          sample.CreateTimeSeriesSample(
              [r.read_min for r in results],
              [r.timestamp for r in results],
              f'Read_Min_{sample.LATENCY_TIME_SERIES}',
              'us',
              1,
              ramp_up_ends=rampup_end_time,
              additional_metadata={},
          ),
          sample.CreateTimeSeriesSample(
              [r.read_max for r in results],
              [r.timestamp for r in results],
              f'Read_Max_{sample.LATENCY_TIME_SERIES}',
              'us',
              1,
              ramp_up_ends=rampup_end_time,
              additional_metadata={},
          ),
      ])
    total_ops_value += sum([r.ops for r in results]) / len(results)
    if results[0].write_min:
      # The workload has write operations
      ts_samples.extend([
          sample.CreateTimeSeriesSample(
              [r.write_min for r in results],
              [r.timestamp for r in results],
              f'Write_Min_{sample.LATENCY_TIME_SERIES}',
              'us',
              1,
              ramp_up_ends=rampup_end_time,
              additional_metadata={},
          ),
          sample.CreateTimeSeriesSample(
              [r.write_max for r in results],
              [r.timestamp for r in results],
              f'Write_Max_{sample.LATENCY_TIME_SERIES}',
              'us',
              1,
              ramp_up_ends=rampup_end_time,
              additional_metadata={},
          ),
      ])
  total_ops.append(sample.Sample(
      'total_ops', total_ops_value, 'ops', {}
  ))
  return ts_samples + total_ops