def fetch_data()

in otava/importer.py [0:0]


    def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series:
        if not isinstance(test_conf, PostgresTestConfig):
            raise ValueError("Expected PostgresTestConfig")

        if selector.branch:
            raise ValueError("Postgres tests don't support branching yet")

        since_time = selector.since_time
        until_time = selector.until_time
        if since_time.timestamp() > until_time.timestamp():
            raise DataImportError(
                f"Invalid time range: ["
                f"{format_timestamp(int(since_time.timestamp()))}, "
                f"{format_timestamp(int(until_time.timestamp()))}]"
            )
        metrics = self.__selected_metrics(test_conf.metrics, selector.metrics)

        columns, rows = self.__postgres.fetch_data(test_conf.query)

        # Decide which columns to fetch into which components of the result:
        try:
            time_index: int = columns.index(test_conf.time_column)
            attr_indexes: List[int] = [columns.index(c) for c in test_conf.attributes]
            metric_names = [m.name for m in metrics.values()]
            metric_columns = [m.column for m in metrics.values()]
            metric_indexes: List[int] = [columns.index(c) for c in metric_columns]
        except ValueError as err:
            raise DataImportError(f"Column not found {err.args[0]}")

        time: List[float] = []
        data: Dict[str, List[float]] = {}
        for n in metric_names:
            data[n] = []
        attributes: Dict[str, List[str]] = {}
        for i in attr_indexes:
            attributes[columns[i]] = []

        for row in rows:
            ts: datetime = row[time_index]
            if since_time is not None and ts < since_time:
                continue
            if until_time is not None and ts >= until_time:
                continue
            time.append(ts.timestamp())

            # Read metric values. Note we can still fail on conversion to float,
            # because the user is free to override the column selection and thus
            # they may select a column that contains non-numeric data:
            for name, i in zip(metric_names, metric_indexes):
                try:
                    data[name].append(float(row[i]))
                except ValueError as err:
                    raise DataImportError(
                        "Could not convert value in column " + columns[i] + ": " + err.args[0]
                    )

            # Attributes are just copied as-is, with no conversion:
            for i in attr_indexes:
                attributes[columns[i]].append(row[i])

        # Convert metrics to series.Metrics
        metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()}

        # Leave last n points:
        time = time[-selector.last_n_points :]
        tmp = data
        data = {}
        for k, v in tmp.items():
            data[k] = v[-selector.last_n_points :]
        tmp = attributes
        attributes = {}
        for k, v in tmp.items():
            attributes[k] = v[-selector.last_n_points :]

        return Series(
            test_conf.name,
            branch=None,
            time=time,
            metrics=metrics,
            data=data,
            attributes=attributes,
        )