in otava/importer.py [0:0]
def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series:
    """
    Loads a time series for a CSV-backed test.

    The first row of ``test_conf.file`` is treated as the header. For every
    following row whose timestamp lies in
    ``[selector.since_time, selector.until_time)`` the time value, the selected
    metric columns (converted to float) and the attribute columns (copied as
    strings) are collected. Only the trailing ``selector.last_n_points`` of the
    collected points are returned.

    The time column is never reported as a metric or an attribute, even if it
    is listed as one.

    :param test_conf: configuration of the test; must be a ``CsvTestConfig``
    :param selector: limits the time range, the metrics and the number of
        points; it is only read here, never modified
    :return: a ``Series`` named ``test_conf.name`` with ``branch=None``
    :raises ValueError: if ``test_conf`` is not a ``CsvTestConfig`` or a branch
        was requested
    :raises DataImportError: if the time range is inverted, the file is missing
        or empty, a configured column is absent from the header, or a metric
        value cannot be converted to float
    """
    if not isinstance(test_conf, CsvTestConfig):
        raise ValueError("Expected CsvTestConfig")
    if selector.branch:
        raise ValueError("CSV tests don't support branching yet")

    since_time = selector.since_time
    until_time = selector.until_time
    file = Path(test_conf.file)

    # The row filter below accepts a missing bound, so the range can only be
    # validated when both bounds are present.
    if (
        since_time is not None
        and until_time is not None
        and since_time.timestamp() > until_time.timestamp()
    ):
        raise DataImportError(
            f"Invalid time range: ["
            f"{format_timestamp(int(since_time.timestamp()))}, "
            f"{format_timestamp(int(until_time.timestamp()))}]"
        )

    try:
        with open(file, newline="") as csv_file:
            reader = csv.reader(
                csv_file,
                delimiter=test_conf.csv_options.delimiter,
                quotechar=test_conf.csv_options.quote_char,
            )

            header_row = next(reader, None)
            if header_row is None:
                # A file without even a header row has no columns to look up.
                raise DataImportError(f"Input file is empty: {file}")
            headers: List[str] = header_row

            metrics = self.__selected_metrics(test_conf.metrics, selector.metrics)

            # Decide which columns to fetch into which components of the result:
            try:
                time_index: int = headers.index(test_conf.time_column)
                attr_indexes: List[int] = [headers.index(c) for c in test_conf.attributes]
                metric_columns = [(m, headers.index(m.column)) for m in metrics.values()]
            except ValueError as err:
                raise DataImportError(f"Column not found {err.args[0]}") from err

            # Exclude the time column from attributes and metrics. Metrics are
            # filtered as (metric, index) pairs so that names and column indexes
            # cannot drift out of step with each other.
            attr_indexes = [i for i in attr_indexes if i != time_index]
            metric_columns = [(m, i) for m, i in metric_columns if i != time_index]
            metric_names = [m.name for m, _ in metric_columns]
            metric_indexes: List[int] = [i for _, i in metric_columns]

            # Initialize empty lists to store the data and metadata:
            time: List[int] = []
            data: Dict[str, List[float]] = {name: [] for name in metric_names}
            attributes: Dict[str, List[str]] = {headers[i]: [] for i in attr_indexes}

            # Append the lists with data from each row:
            for row in reader:
                self.check_row_len(headers, row)

                # Filter by time (lower bound inclusive, upper bound exclusive):
                ts: datetime = self.__convert_time(row[time_index])
                if since_time is not None and ts < since_time:
                    continue
                if until_time is not None and ts >= until_time:
                    continue
                time.append(int(ts.timestamp()))

                # Read metric values. Note we can still fail on conversion to float,
                # because the user is free to override the column selection and thus
                # they may select a column that contains non-numeric data:
                for name, i in zip(metric_names, metric_indexes):
                    try:
                        data[name].append(float(row[i]))
                    except ValueError as err:
                        raise DataImportError(
                            "Could not convert value in column "
                            + headers[i]
                            + ": "
                            + err.args[0]
                        ) from err

                # Attributes are just copied as-is, with no conversion:
                for i in attr_indexes:
                    attributes[headers[i]].append(row[i])

            # Convert the kept metrics to series.Metrics
            result_metrics = {m.name: Metric(m.direction, m.scale) for m, _ in metric_columns}

            # Leave last n points.
            # NOTE(review): a value of 0 keeps every point because -0 == 0;
            # confirm whether 0 is meant to be a valid request.
            last_n = selector.last_n_points
            time = time[-last_n:]
            data = {name: values[-last_n:] for name, values in data.items()}
            attributes = {name: values[-last_n:] for name, values in attributes.items()}

            return Series(
                test_conf.name,
                branch=None,
                time=time,
                metrics=result_metrics,
                data=data,
                attributes=attributes,
            )
    except FileNotFoundError as err:
        raise DataImportError(f"Input file not found: {file}") from err