def fetch_data()

in otava/importer.py [0:0]


    def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series:
        """Load a time series for one test from the CSV file named in its config.

        The first CSV row is treated as the header. The time column, the
        attribute columns and the columns of the selected metrics are looked
        up in that header by name. Rows are kept when their timestamp falls in
        the half-open range ``[selector.since_time, selector.until_time)``; a
        bound that is ``None`` is not applied. Finally only the trailing
        ``selector.last_n_points`` rows are retained.

        The time column is never reported as an attribute or as a metric, even
        if the configuration lists it as one.

        Note: the default ``selector`` is a single instance created when the
        method is defined and shared by every call that omits the argument.
        This method only reads from it.

        :param test_conf: test configuration; must be a ``CsvTestConfig``.
        :param selector: metric, time-range and point-count filter.
        :return: a ``Series`` named after ``test_conf.name`` with
            ``branch=None``.
        :raises ValueError: if ``test_conf`` is not a ``CsvTestConfig`` or
            ``selector.branch`` is set.
        :raises DataImportError: if the time range is inverted, the file is
            missing or empty, a required column is absent from the header, or
            a metric value cannot be converted to ``float``.
        """
        if not isinstance(test_conf, CsvTestConfig):
            raise ValueError("Expected CsvTestConfig")

        if selector.branch:
            raise ValueError("CSV tests don't support branching yet")

        since_time = selector.since_time
        until_time = selector.until_time
        file = Path(test_conf.file)

        # Only validate the range when both bounds are present; the row filter
        # below already treats a None bound as "unbounded".
        if (
            since_time is not None
            and until_time is not None
            and since_time.timestamp() > until_time.timestamp()
        ):
            raise DataImportError(
                f"Invalid time range: ["
                f"{format_timestamp(int(since_time.timestamp()))}, "
                f"{format_timestamp(int(until_time.timestamp()))}]"
            )

        try:
            with open(file, newline="") as csv_file:
                reader = csv.reader(
                    csv_file,
                    delimiter=test_conf.csv_options.delimiter,
                    quotechar=test_conf.csv_options.quote_char,
                )

                headers = next(reader, None)
                if headers is None:
                    # An empty file has no header row to resolve columns against.
                    raise DataImportError(f"Input file is empty: {file}")

                selected = list(
                    self.__selected_metrics(test_conf.metrics, selector.metrics).values()
                )

                # Decide which columns to fetch into which components of the result:
                try:
                    time_index: int = headers.index(test_conf.time_column)
                    attr_indexes: List[int] = [headers.index(c) for c in test_conf.attributes]
                    metric_indexes: List[int] = [headers.index(m.column) for m in selected]
                except ValueError as err:
                    raise DataImportError(f"Column not found {err.args[0]}") from err

                # The time column is reported separately, so drop it from the
                # attributes and metrics. Metrics are filtered together with
                # their column index so names and indexes stay aligned.
                attr_indexes = [i for i in attr_indexes if i != time_index]
                kept = [(m, i) for m, i in zip(selected, metric_indexes) if i != time_index]

                # Initialize empty lists to store the data and metadata:
                timestamps: List[int] = []
                data: Dict[str, List[float]] = {m.name: [] for m, _ in kept}
                attributes: Dict[str, List[str]] = {headers[i]: [] for i in attr_indexes}

                # Append the lists with data from each row:
                for row in reader:
                    self.check_row_len(headers, row)

                    # Filter by time:
                    ts: datetime = self.__convert_time(row[time_index])
                    if since_time is not None and ts < since_time:
                        continue
                    if until_time is not None and ts >= until_time:
                        continue
                    timestamps.append(int(ts.timestamp()))

                    # Read metric values. Note we can still fail on conversion to float,
                    # because the user is free to override the column selection and thus
                    # they may select a column that contains non-numeric data:
                    for m, i in kept:
                        try:
                            data[m.name].append(float(row[i]))
                        except ValueError as err:
                            raise DataImportError(
                                f"Could not convert value in column {headers[i]}: {err.args[0]}"
                            ) from err

                    # Attributes are just copied as-is, with no conversion:
                    for i in attr_indexes:
                        attributes[headers[i]].append(row[i])

                # Convert metrics to series.Metrics
                metrics = {m.name: Metric(m.direction, m.scale) for m, _ in kept}

                # Leave last n points.
                # NOTE(review): a last_n_points of 0 keeps every row, because
                # seq[-0:] is the whole sequence - confirm DataSelector never yields 0.
                n = selector.last_n_points
                return Series(
                    test_conf.name,
                    branch=None,
                    time=timestamps[-n:],
                    metrics=metrics,
                    data={k: v[-n:] for k, v in data.items()},
                    attributes={k: v[-n:] for k, v in attributes.items()},
                )

        except FileNotFoundError as err:
            raise DataImportError(f"Input file not found: {file}") from err