def fetch_data()

in otava/importer.py [0:0]


    def fetch_data(self, test: TestConfig, selector: DataSelector = DataSelector()) -> Series:
        """
        Loads test data from graphite.
        Converts raw timeseries data into a columnar format,
        where each metric is represented by a list of floats. All metrics
        have aligned indexes - that is values["foo"][3] applies to the
        the same time point as values["bar"][3]. The time points are extracted
        to a separate column.
        """
        if not isinstance(test, GraphiteTestConfig):
            raise ValueError("Expected GraphiteTestConfig")

        try:
            attributes = test.tags.copy()
            if selector.branch:
                attributes += [selector.branch]

            # if the user has specified since_<commit/version> and/or until_<commit/version>,
            # we need to attempt to extract a timestamp from appropriate Graphite events, and
            # update selector.since_time and selector.until_time, respectively
            since_events = self.graphite.fetch_events_with_matching_time_option(
                attributes, selector.since_commit, selector.since_version
            )
            if len(since_events) > 0:
                # since timestamps of metrics get rounded down, in order to include these, we need to
                # - round down the event's pub_time
                # - subtract a small amount of time (Graphite does not appear to include the left-hand
                # endpoint for a time range)
                rounded_time = round(
                    int(since_events[-1].pub_time.timestamp()),
                    resolution([int(since_events[-1].pub_time.timestamp())]),
                )
                selector.since_time = parse_datetime(str(rounded_time)) - timedelta(milliseconds=1)

            until_events = self.graphite.fetch_events_with_matching_time_option(
                attributes, selector.until_commit, selector.until_version
            )
            if len(until_events) > 0:
                selector.until_time = until_events[0].pub_time

            if selector.since_time.timestamp() > selector.until_time.timestamp():
                raise DataImportError(
                    f"Invalid time range: ["
                    f"{format_timestamp(int(selector.since_time.timestamp()))}, "
                    f"{format_timestamp(int(selector.until_time.timestamp()))}]"
                )

            metrics = test.metrics.values()
            if selector.metrics is not None:
                metrics = [m for m in metrics if m.name in selector.metrics]
            path_to_metric = {test.get_path(selector.branch, m.name): m for m in metrics}
            targets = [test.get_path(selector.branch, m.name) for m in metrics]

            graphite_result = self.graphite.fetch_data(targets, selector)
            if not graphite_result:
                raise DataImportError(f"No timeseries found in Graphite for test {test.name}.")

            times = [[x.time for x in series.points] for series in graphite_result]
            time: List[int] = merge_sorted(times)[-selector.last_n_points :]

            def column(series: List[DataPoint]) -> List[float]:
                value_by_time = dict([(x.time, x.value) for x in series])
                return [value_by_time.get(t) for t in time]

            # Keep order of the keys in the result values the same as order of metrics
            values = OrderedDict()
            for m in metrics:
                values[m.name] = []
            for ts in graphite_result:
                values[path_to_metric[ts.path].name] = column(ts.points)
            for m in metrics:
                if len(values[m.name]) == 0:
                    del values[m.name]
            metrics = [m for m in metrics if m.name in values.keys()]

            events = self.graphite.fetch_events(
                attributes, selector.since_time, selector.until_time
            )
            time_resolution = resolution(time)
            events_by_time = {}
            for e in events:
                events_by_time[round(int(e.pub_time.timestamp()), time_resolution)] = e

            run_ids = []
            commits = []
            versions = []
            branches = []
            for t in time:
                event = events_by_time.get(t)
                run_ids.append(event.run_id if event is not None else None)
                commits.append(event.commit if event is not None else None)
                versions.append(event.version if event is not None else None)
                branches.append(event.branch if event is not None else None)

            attributes = {
                "run": run_ids,
                "branch": branches,
                "version": versions,
                "commit": commits,
            }
            if selector.attributes is not None:
                attributes = {a: attributes[a] for a in selector.attributes}

            metrics = {m.name: Metric(m.direction, m.scale) for m in metrics}
            return Series(
                test.name,
                branch=selector.branch,
                time=time,
                metrics=metrics,
                data=values,
                attributes=attributes,
            )

        except GraphiteError as e:
            raise DataImportError(f"Failed to import test {test.name}: {e.message}")