in python/plugins/debug/sql_collector.py [0:0]
def dump_status_delta(self, zf: zipfile.ZipFile, prefix: str):
    """Write start/end deltas of the sys views listed in k_sys_views_delta.

    For each table in k_sys_views_delta, the rows of
    self.status_start[table] and self.status_end[table] are paired
    positionally and every column is rendered by a formatter chosen from
    the column name. The result is written through write_tsv as an entry
    named "<prefix>-delta.<table_name>"; values that cannot be computed
    are emitted as "-".

    Args:
        zf: Archive object handed to write_tsv.
        prefix: Leading part of every entry name.
    """

    def float_or_none(value):
        """Convert value to float, passing None through unchanged."""
        return None if value is None else float(value)

    def delta_formatter(column: str, render: Callable) -> Callable:
        """Build a formatter rendering end[column] - start[column]."""

        def fmt(s, e):
            end = float_or_none(e[column])
            if end is None:
                return None
            # A missing start value counts as 0.
            return render(end - float(s[column] or 0))

        return fmt

    def ratio_formatter(num_key: str, den_key: str,
                        render: Callable) -> Callable:
        """Build a formatter rendering delta(num_key) / delta(den_key)."""

        def fmt(s, e):
            e_num = float(e[num_key] or 0)
            s_num = float_or_none(s[num_key])
            e_den = float(e[den_key] or 0)
            s_den = float_or_none(s[den_key])
            if s_num is None or s_den is None:
                return None
            den = e_den - s_den
            if den == 0:
                return None
            return render((e_num - s_num) / den)

        return fmt

    def write_pct_formatter() -> Callable:
        """Build the write_pct formatter for io_global_by_file_by_bytes."""

        def fmt(s, e):
            e_read = float(e["total_read"] or 0)
            s_read = float_or_none(s["total_read"])
            e_written = float(e["total_written"] or 0)
            s_written = float_or_none(s["total_written"])
            if s_read is None or s_written is None:
                return None
            read = e_read - s_read
            written = e_written - s_written
            if read + written == 0:
                return None
            return f"{100 - (read / (read + written)) * 100:.2f}"

        return fmt

    def formatter_for_column(table_name: str, pk: str,
                             column: str) -> Callable:
        """Pick the formatter for one column of table_name.

        Each call has its own scope, so the returned callable is bound to
        this column. Closures created directly inside a loop would all see
        the loop variable's final value.
        """
        lcolumn = column.lower()

        # Key columns are copied verbatim from the start row.
        if "," + lcolumn + "," in "," + pk + ",":
            return lambda s, e: f"{s[column]}"

        if table_name == "io_global_by_file_by_bytes" and lcolumn == "write_pct":
            return write_pct_formatter()

        if (table_name, lcolumn) in (
            ("io_global_by_file_by_bytes", "total"),
            ("io_global_by_wait_by_bytes", "total_requested"),
        ):
            return delta_formatter(column, format_bytes)

        # min/max latencies are not differenced: the end value is shown.
        if lcolumn[:4] in ("max_", "min_") and lcolumn.endswith("_latency"):
            return lambda s, e: (format_pico_time(float(e[column]))
                                 if e[column] is not None else None)

        if lcolumn == "avg_latency":
            return ratio_formatter("total_latency", "total",
                                   format_pico_time)

        if lcolumn.endswith("_avg_latency"):
            # "<stem>_avg_latency" is derived from the "<stem>_latency" and
            # "<stem>s" columns of the same row.
            stem = column[:-len("_avg_latency")]
            return ratio_formatter(f"{stem}_latency", f"{stem}s",
                                   format_pico_time)

        if lcolumn.endswith("latency"):
            # The None check runs before formatting, so format_pico_time
            # is never handed None.
            return delta_formatter(column, format_pico_time)

        if lcolumn in ("avg_read", "avg_write", "avg_written"):
            total_suffix = "read" if lcolumn == "avg_read" else "written"
            count_suffix = "read" if lcolumn == "avg_read" else "write"
            return ratio_formatter(f"total_{total_suffix}",
                                   f"count_{count_suffix}", format_bytes)

        if lcolumn.endswith(("_memory", "_memory_allocated")) or (
                lcolumn.endswith(("_read", "_written", "_write"))
                and not lcolumn.startswith("count_")):
            return delta_formatter(column, format_bytes)

        # Anything else is shown as a plain numeric difference.
        return delta_formatter(column, lambda diff: f"{diff}")

    def formatters_for_table(table_name: str,
                             pk: str) -> Tuple[List[str], List[Callable]]:
        """Return the column names of sys.<table_name> and their formatters.

        Column names come from information_schema.columns in ordinal
        order; pk is a comma-separated list of (lower-case) key columns.
        """
        columns = [
            r[0] for r in self.session.run_sql(
                "select column_name from information_schema.columns where table_schema='sys' and table_name=? order by ordinal_position",
                [table_name]).fetch_all()
        ]
        fcolumns = [
            formatter_for_column(table_name, pk, column)
            for column in columns
        ]
        return columns, fcolumns

    for table_name, *_, pk in k_sys_views_delta:
        columns, formatters = formatters_for_table(table_name, pk)
        lines = ["# " + "\t".join(columns)]
        # NOTE(review): rows are paired by position; this presumes both
        # snapshots list the same rows in the same order - confirm.
        for start_row, end_row in zip(self.status_start[table_name],
                                      self.status_end[table_name]):
            entry = []
            for column_name, fmt in zip(columns, formatters):
                try:
                    entry.append(fmt(start_row, end_row))
                except Exception:
                    # Show which column/rows failed, then propagate.
                    print(column_name, start_row, end_row)
                    raise
            lines.append("\t".join(s or "-" for s in entry))
        write_tsv(zf, f"{prefix}-delta.{table_name}", lines)