in otava/importer.py [0:0]
def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series:
if not isinstance(test_conf, PostgresTestConfig):
raise ValueError("Expected PostgresTestConfig")
if selector.branch:
raise ValueError("Postgres tests don't support branching yet")
since_time = selector.since_time
until_time = selector.until_time
if since_time.timestamp() > until_time.timestamp():
raise DataImportError(
f"Invalid time range: ["
f"{format_timestamp(int(since_time.timestamp()))}, "
f"{format_timestamp(int(until_time.timestamp()))}]"
)
metrics = self.__selected_metrics(test_conf.metrics, selector.metrics)
columns, rows = self.__postgres.fetch_data(test_conf.query)
# Decide which columns to fetch into which components of the result:
try:
time_index: int = columns.index(test_conf.time_column)
attr_indexes: List[int] = [columns.index(c) for c in test_conf.attributes]
metric_names = [m.name for m in metrics.values()]
metric_columns = [m.column for m in metrics.values()]
metric_indexes: List[int] = [columns.index(c) for c in metric_columns]
except ValueError as err:
raise DataImportError(f"Column not found {err.args[0]}")
time: List[float] = []
data: Dict[str, List[float]] = {}
for n in metric_names:
data[n] = []
attributes: Dict[str, List[str]] = {}
for i in attr_indexes:
attributes[columns[i]] = []
for row in rows:
ts: datetime = row[time_index]
if since_time is not None and ts < since_time:
continue
if until_time is not None and ts >= until_time:
continue
time.append(ts.timestamp())
# Read metric values. Note we can still fail on conversion to float,
# because the user is free to override the column selection and thus
# they may select a column that contains non-numeric data:
for name, i in zip(metric_names, metric_indexes):
try:
data[name].append(float(row[i]))
except ValueError as err:
raise DataImportError(
"Could not convert value in column " + columns[i] + ": " + err.args[0]
)
# Attributes are just copied as-is, with no conversion:
for i in attr_indexes:
attributes[columns[i]].append(row[i])
# Convert metrics to series.Metrics
metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()}
# Leave last n points:
time = time[-selector.last_n_points :]
tmp = data
data = {}
for k, v in tmp.items():
data[k] = v[-selector.last_n_points :]
tmp = attributes
attributes = {}
for k, v in tmp.items():
attributes[k] = v[-selector.last_n_points :]
return Series(
test_conf.name,
branch=None,
time=time,
metrics=metrics,
data=data,
attributes=attributes,
)