aiops/RCRank/model/modules/QueryFormer/utils.py (313 lines of code) (raw):
import numpy as np
import pandas as pd
import csv
import torch
import re
def floyd_warshall_rewrite(adjacency_matrix):
(nrows, ncols) = adjacency_matrix.shape
assert nrows == ncols
M = adjacency_matrix.copy().astype('long')
for i in range(nrows):
for j in range(ncols):
if i == j:
M[i][j] = 0
elif M[i][j] == 0:
M[i][j] = 60
for k in range(nrows):
for i in range(nrows):
for j in range(nrows):
M[i][j] = min(M[i][j], M[i][k]+M[k][j])
return M
def get_job_table_sample(workload_file_name, num_materialized_samples = 1000):
tables = []
samples = []
with open(workload_file_name + ".csv", 'r') as f:
data_raw = list(list(rec) for rec in csv.reader(f, delimiter='#'))
for row in data_raw:
tables.append(row[0].split(','))
if int(row[3]) < 1:
print("Queries must have non-zero cardinalities")
exit(1)
print("Loaded queries with len ", len(tables))
num_bytes_per_bitmap = int((num_materialized_samples + 7) >> 3)
with open(workload_file_name + ".bitmaps", 'rb') as f:
for i in range(len(tables)):
four_bytes = f.read(4)
if not four_bytes:
print("Error while reading 'four_bytes'")
exit(1)
num_bitmaps_curr_query = int.from_bytes(four_bytes, byteorder='little')
bitmaps = np.empty((num_bitmaps_curr_query, num_bytes_per_bitmap * 8), dtype=np.uint8)
for j in range(num_bitmaps_curr_query):
# Read bitmap
bitmap_bytes = f.read(num_bytes_per_bitmap)
if not bitmap_bytes:
print("Error while reading 'bitmap_bytes'")
exit(1)
bitmaps[j] = np.unpackbits(np.frombuffer(bitmap_bytes, dtype=np.uint8))
samples.append(bitmaps)
print("Loaded bitmaps")
table_sample = []
for ts, ss in zip(tables,samples):
d = {}
for t, s in zip(ts,ss):
tf = t.split(' ')[0]
d[tf] = s
table_sample.append(d)
return table_sample
def get_hist_file(hist_path, bin_number = 50):
hist_file = pd.read_csv(hist_path)
for i in range(len(hist_file)):
freq = hist_file['freq'][i]
freq_np = np.frombuffer(bytes.fromhex(freq), dtype=np.float)
hist_file['freq'][i] = freq_np
table_column = []
for i in range(len(hist_file)):
table = hist_file['table'][i]
col = hist_file['column'][i]
table_alias = ''.join([tok[0] for tok in table.split('_')])
if table == 'movie_info_idx': table_alias = 'mi_idx'
combine = '.'.join([table_alias,col])
table_column.append(combine)
hist_file['table_column'] = table_column
for rid in range(len(hist_file)):
hist_file['bins'][rid] = \
[int(i) for i in hist_file['bins'][rid][1:-1].split(' ') if len(i)>0]
if bin_number != 50:
hist_file = re_bin(hist_file, bin_number)
return hist_file
def re_bin(hist_file, target_number):
for i in range(len(hist_file)):
freq = hist_file['freq'][i]
bins = freq2bin(freq,target_number)
hist_file['bins'][i] = bins
return hist_file
def freq2bin(freqs, target_number):
freq = freqs.copy()
maxi = len(freq)-1
step = 1. / target_number
mini = 0
while freq[mini+1]==0:
mini+=1
pointer = mini+1
cur_sum = 0
res_pos = [mini]
residue = 0
while pointer < maxi+1:
cur_sum += freq[pointer]
freq[pointer] = 0
if cur_sum >= step:
cur_sum -= step
res_pos.append(pointer)
else:
pointer += 1
if len(res_pos)==target_number: res_pos.append(maxi)
return res_pos
class Batch():
def __init__(self, attn_bias, rel_pos, heights, x, y=None):
super(Batch, self).__init__()
self.heights = heights
self.x, self.y = x, y
self.attn_bias = attn_bias
self.rel_pos = rel_pos
def to(self, device):
self.heights = self.heights.to(device)
self.x = self.x.to(device)
self.attn_bias, self.rel_pos = self.attn_bias.to(device), self.rel_pos.to(device)
return self
def __len__(self):
return self.in_degree.size(0)
def pad_1d_unsqueeze(x, padlen):
x = x + 1 # pad id = 0
xlen = x.size(0)
if xlen < padlen:
new_x = x.new_zeros([padlen], dtype=x.dtype)
new_x[:xlen] = x
x = new_x
return x.unsqueeze(0)
def pad_2d_unsqueeze(x, padlen):
xlen, xdim = x.size()
if xlen < padlen:
new_x = x.new_zeros([padlen, xdim], dtype=x.dtype) + 1
new_x[:xlen, :] = x
x = new_x
return x.unsqueeze(0)
def pad_rel_pos_unsqueeze(x, padlen):
x = x + 1
xlen = x.size(0)
if xlen < padlen:
new_x = x.new_zeros([padlen, padlen], dtype=x.dtype)
new_x[:xlen, :xlen] = x
x = new_x
return x.unsqueeze(0)
def pad_attn_bias_unsqueeze(x, padlen):
xlen = x.size(0)
if xlen < padlen:
new_x = x.new_zeros([padlen, padlen], dtype=x.dtype).fill_(float('-inf'))
new_x[:xlen, :xlen] = x
new_x[xlen:, :xlen] = 0
x = new_x
return x.unsqueeze(0)
def collator(small_set):
xs = [s['x'] for s in small_set]
x = torch.cat(xs)
attn_bias = torch.cat([s['attn_bias'] for s in small_set])
rel_pos = torch.cat([s['rel_pos'] for s in small_set])
heights = torch.cat([s['heights'] for s in small_set])
return Batch(attn_bias, rel_pos, heights, x)
def filterDict2Hist(hist_file, filterDict, encoding):
buckets = len(hist_file['bins'][0])
empty = np.zeros(buckets - 1)
ress = np.zeros((3, buckets-1))
for i in range(len(filterDict['colId'])):
colId = filterDict['colId'][i]
col = encoding.idx2col[colId]
if col == 'NA':
ress[i] = empty
continue
bins = hist_file.loc[hist_file['table_column']==col,'bins'].item()
opId = filterDict['opId'][0]
op = encoding.idx2op[opId]
val = filterDict['val'][0]
mini, maxi = encoding.column_min_max_vals[col]
val_unnorm = val * (maxi-mini) + mini
left = 0
right = len(bins)-1
for j in range(len(bins)):
if bins[j]<val_unnorm:
left = j
if bins[j]>val_unnorm:
right = j
break
res = np.zeros(len(bins)-1)
if op == '=':
res[left:right] = 1
elif op == '<':
res[:left] = 1
elif op == '>':
res[right:] = 1
ress[i] = res
ress = ress.flatten()
return ress
def formatJoin(json_node):
join = None
if 'Hash Cond' in json_node:
join = json_node['Hash Cond']
elif 'Join Filter' in json_node:
join = json_node['Join Filter']
elif 'Index Cond' in json_node and not json_node['Index Cond'][-2].isnumeric():
join = json_node['Index Cond']
if join is not None:
twoCol = join[1:-1].split(' = ')
twoCol = [json_node['Alias'] + '.' + col
if len(col.split('.')) == 1 and 'Alias' in json_node.keys() else col for col in twoCol ]
join = ' = '.join(sorted(twoCol))
return join
def formatFilter(plan):
alias = None
if 'Alias' in plan:
alias = plan['Alias']
else:
pl = plan
while 'parent' in pl:
pl = pl['parent']
if 'Alias' in pl:
alias = pl['Alias']
break
filters = []
if 'Filter' in plan:
filters.append(plan['Filter'])
if 'Index Cond' in plan and plan['Index Cond'][-2].isnumeric():
filters.append(plan['Index Cond'])
if 'Recheck Cond' in plan:
filters.append(plan['Recheck Cond'])
return filters, alias
class Encoding:
def __init__(self, column_min_max_vals,
col2idx, op2idx={'>':0, '=':1, '<':2, 'NA':3}):
self.col2idx = col2idx
self.op2idx = op2idx
idx2col = {}
for k,v in col2idx.items():
idx2col[v] = k
self.idx2col = idx2col
self.idx2op = {0:'>', 1:'=', 2:'<', 3:'NA'}
self.type2idx = {}
self.idx2type = {}
self.join2idx = {}
self.idx2join = {}
self.table2idx = {'NA':0}
self.idx2table = {0:'NA'}
def normalize_val(self, column, val, log=False):
mini, maxi = self.column_min_max_vals[column]
val_norm = 0.0
if maxi > mini:
val_norm = (val-mini) / (maxi-mini)
return val_norm
def encode_filters(self, filters=[], alias=None):
if len(filters) == 0:
return {'colId':[self.col2idx['NA']],
'opId': [self.op2idx['NA']]}
res = {'colId':[],'opId': []}
for filt in filters:
filt = ''.join(c for c in filt if c not in '()')
fs = re.split(' AND | OR ', filt)
for f in fs:
op = None
for k, v in self.idx2op.items():
if v in f:
op = v
if op is None:
op = 'NA'
col = f.split(' ')[0]
if alias is None:
column = col
else:
column = alias + '.' + col
if column not in self.col2idx:
self.col2idx[column] = len(self.col2idx)
self.idx2col[self.col2idx[column]] = column
res['colId'].append(self.col2idx[column])
res['opId'].append(self.op2idx[op])
return res
def encode_join(self, join):
if join not in self.join2idx:
self.join2idx[join] = len(self.join2idx)
self.idx2join[self.join2idx[join]] = join
return self.join2idx[join]
def encode_table(self, table):
if table not in self.table2idx:
self.table2idx[table] = len(self.table2idx)
self.idx2table[self.table2idx[table]] = table
return self.table2idx[table]
def encode_type(self, nodeType):
if nodeType not in self.type2idx:
self.type2idx[nodeType] = len(self.type2idx)
self.idx2type[self.type2idx[nodeType]] = nodeType
return self.type2idx[nodeType]
class TreeNode:
def __init__(self, nodeType, typeId, filt, card, join, join_str, filterDict, start_up_cost, total_cost, plan_rows, plan_width):
self.nodeType = nodeType
self.typeId = typeId
self.filter = filt
self.table = 'NA'
self.table_id = 0
self.query_id = None
self.join = join
self.join_str = join_str
self.children = []
self.rounds = 0
self.filterDict = filterDict
self.parent = None
self.feature = None
self.start_up_cost, self.total_cost, self.plan_rows, self.plan_width = start_up_cost, total_cost, plan_rows, plan_width
def addChild(self,treeNode):
self.children.append(treeNode)
def __str__(self):
return '{} with {}, {}, {} children'.format(self.nodeType, self.filter, self.join_str, len(self.children))
def __repr__(self):
return self.__str__()
@staticmethod
def print_nested(node, indent = 0):
print('--'*indent+ '{} with {} and {}, {} childs'.format(node.nodeType, node.filter, node.join_str, len(node.children)))
for k in node.children:
TreeNode.print_nested(k, indent+1)