decisionai_plugin/common/util/data.py (105 lines of code) (raw):

import pandas as pd import numpy as np from functools import reduce from .constant import VALUE, TIMESTAMP from .fill_type import Fill from .gran import Gran from .timeutil import dt_to_str, str_to_dt, get_time_offset, convert_freq def normalize(df, normalize_base=None): def max_min_scaler(x, base): maxx = np.max(x) minn = np.min(x) if base: maxx = base['max'] minn = base['min'] if maxx != minn: return (x - minn) / (maxx - minn) else: x[:] = 1 return x data = pd.DataFrame(index=df.index) for item in df.columns: if item == 'timestamp': data[item] = df[item] continue base = normalize_base[item] if normalize_base is not None and item in normalize_base else None data[item] = df[[item]].apply(lambda x: max_min_scaler(x, base)) return data def fill_missing(input_series, fill_type: Fill, fill_value): if fill_type == Fill.NotFill: return input_series if fill_type == Fill.Previous: return input_series.fillna(method='ffill', limit=len(input_series)).fillna(method='bfill', limit=len(input_series)) if fill_type == Fill.Subsequent: return input_series.fillna(method='bfill', limit=len(input_series)).fillna(method='ffill', limit=len(input_series)) if fill_type == Fill.Linear: return input_series.interpolate(method='linear', limit_direction='both', axis=0, limit=len(input_series)) if fill_type == Fill.Pad: return input_series.fillna(fill_value) return input_series.fillna(0) def generate_filled_missing_by_time_range(input_frame, start_time, end_time, gran, custom_in_seconds, fill_type: Fill, fill_value): if fill_type == Fill.NotFill: return input_frame full_data_range = pd.date_range(start=start_time, end=end_time, freq=convert_freq(gran, custom_in_seconds)) full_data_range = pd.DataFrame(full_data_range, columns=[TIMESTAMP]) full_data_range[TIMESTAMP] = full_data_range[TIMESTAMP].dt.tz_localize(None) input_frame = pd.merge(full_data_range, input_frame, how='left', on=TIMESTAMP) return fill_missing(input_frame, fill_type, fill_value) def generate_filled_missing_by_period(input_frame, end_time, gran, custom_in_seconds, periods, fill_type: Fill, fill_value): if fill_type == Fill.NotFill: return input_frame full_data_range = pd.date_range(end=end_time, freq=convert_freq(gran, custom_in_seconds), periods=periods) full_data_range = pd.DataFrame(full_data_range, columns=[TIMESTAMP]) full_data_range[TIMESTAMP] = full_data_range[TIMESTAMP].dt.tz_localize(None) input_frame = pd.merge(full_data_range, input_frame, how='left', on=TIMESTAMP) return fill_missing(input_frame, fill_type, fill_value) def generate_inner_join_frame(input_frames): return reduce(lambda left, right: pd.merge(left, right, on=TIMESTAMP, how='inner'), input_frames) def generate_outer_join_frame(input_frames, fill_type: Fill, fill_value): if fill_type == Fill.NotFill: return generate_inner_join_frame(input_frames) merged = reduce(lambda left, right: pd.merge(left, right, on=TIMESTAMP, how='outer'), input_frames) return fill_missing(merged, fill_type, fill_value) # series_data should have this format: [{"series_id":"xxx","metric_id":"xxx","dim":{xxx},"value":[{"timestamp":xxx,"value":xxx,"fieldxxx":xxx},...]] def generate_filled_missing_by_field(series_data, start, end, gran, custom_in_seconds, fill_type:Fill, fill_value, fields=None): if fields is None: fields = [] fields.insert(0, VALUE) data_panel = {} full_data_range = pd.date_range(start=start, end=get_time_offset(end, (gran, custom_in_seconds), -1), freq=convert_freq(Gran[gran], custom_in_seconds)) full_data_range = pd.DataFrame(full_data_range, columns=[TIMESTAMP]) full_data_range[TIMESTAMP] = full_data_range[TIMESTAMP].dt.tz_localize(None) # set index to improve merging performance full_data_range = full_data_range.set_index(TIMESTAMP) series_info = [] for series in series_data: if series is None: pass metric_id = series.metric_id series_id = series.series_id taglist = series.dim series_info.append([metric_id, series_id, taglist]) ts_df = pd.DataFrame([value[TIMESTAMP] for value in series.value], columns=[TIMESTAMP]) ts_df[TIMESTAMP] = ts_df[TIMESTAMP].dt.tz_localize(None) ts_df.set_index(TIMESTAMP, inplace=True) for field in fields: ts_df[series_id] = [value[field] for value in series.value] ts_df = pd.merge(full_data_range, ts_df, how='left', on=TIMESTAMP) if field not in data_panel: data_panel[field] = ts_df else: data_panel[field][series_id] = ts_df[series_id] timestamps = [] cols_len = len(data_panel[VALUE].columns.values.tolist()) for index, row in data_panel[VALUE].iterrows(): not_missing = False for i in range(cols_len): if pd.notna(row[i]): not_missing = True break timestamps.append([index, not_missing]) for field in fields: if field == VALUE: data_panel[field] = fill_missing(data_panel[field], fill_type, fill_value) else: data_panel[field] = data_panel[field].fillna(0) data_panel[field].reset_index(drop=False, inplace=True) return data_panel, series_info, timestamps