# src/AnalyzeVacuumUtility/lib/analyze_vacuum.py
def run_vacuum(conn,
               cluster_name,
               cw,
               schema_name='public',
               table_name=None,
               blacklisted_tables=None,
               ignore_errors=False,
               vacuum_parameter='FULL',
               min_unsorted_pct=5,
               max_unsorted_pct=50,
               stats_off_pct=10,
               max_table_size_mb=(700 * 1024),
               min_interleaved_skew=1.4,
               min_interleaved_count=0,
               **kwargs):
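    """Generate and run VACUUM (plus follow-up ANALYZE) statements for candidate tables.

    Candidates are selected from svv_table_info and stl_alert_event_log using the
    unsorted-percentage, stats-off-percentage and table-size thresholds supplied as
    arguments. Returns True on success, or ERROR when a statement fails and
    ignore_errors is False. Relies on module-level helpers (comment, execute_query,
    run_commands) and settings (MAX_PERCENT, debug, goback_no_of_days, query_rank).
    """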
    statements = []

    threshold = (MAX_PERCENT - int(min_unsorted_pct)) if min_unsorted_pct is not None else 5

    threshold_stanza = ""
    if vacuum_parameter is not None and vacuum_parameter.upper() != 'REINDEX':
        threshold_stanza = " to %d percent" % threshold
    if table_name is not None:
        get_vacuum_statement = '''SELECT 'vacuum %s ' + "schema" + '."' + "table" + '"%s ; '
                                         + '/* Size : ' + CAST("size" AS VARCHAR(10)) + ' MB'
                                         + ', Unsorted_pct : ' + coalesce(unsorted :: varchar(10),'null')
                                         + ', Stats Off : ' + stats_off :: varchar(10)
                                         + ' */ ;' as statement,
                                         "table" as table_name,
                                         "schema" as schema_name
                                  FROM svv_table_info
                                  WHERE (unsorted > %s or stats_off > %s)
                                    AND size < %s
                                    AND "schema" ~ '%s'
                                    AND "table" = '%s';
                               ''' % (
            vacuum_parameter, threshold_stanza, min_unsorted_pct, stats_off_pct, max_table_size_mb, schema_name,
            table_name)
    elif blacklisted_tables is not None:
        comment("Extracting Candidate Tables for Vacuum...")
        blacklisted_tables_array = blacklisted_tables.split(',')
        get_vacuum_statement = '''SELECT 'vacuum %s ' + "schema" + '."' + "table" + '"%s ; '
                                         + '/* Size : ' + CAST("size" AS VARCHAR(10)) + ' MB'
                                         + ', Unsorted_pct : ' + coalesce(unsorted :: varchar(10),'null')
                                         + ', Stats Off : ' + stats_off :: varchar(10)
                                         + ' */ ;' as statement,
                                         "table" as table_name,
                                         "schema" as schema_name
                                  FROM svv_table_info
                                  WHERE (unsorted > %s or stats_off > %s)
                                    AND size < %s
                                    AND "schema" ~ '%s'
                                    AND "table" NOT IN (%s);
                               ''' % (
            vacuum_parameter, threshold_stanza, min_unsorted_pct, stats_off_pct, max_table_size_mb, schema_name,
            str(blacklisted_tables_array)[1:-1])
    else:
        # query for tables flagged by recent alert events, ordered by size ascending
        comment("Extracting Candidate Tables for Vacuum...")
        get_vacuum_statement = '''
            SELECT 'vacuum %s ' + feedback_tbl.schema_name + '."' + feedback_tbl.table_name + '"%s; '
                   + '/* Size : ' + CAST(info_tbl."size" AS VARCHAR(10)) + ' MB'
                   + ', Unsorted_pct : ' + coalesce(unsorted :: varchar(10),'null')
                   + ', Stats Off : ' + stats_off :: varchar(10)
                   + ' */ ;' as statement,
                   table_name,
                   schema_name
            FROM (SELECT schema_name,
                         table_name
                  FROM (SELECT TRIM(n.nspname) schema_name,
                               TRIM(c.relname) table_name,
                               DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
                               COUNT(*)
                        FROM stl_alert_event_log AS l
                             JOIN (SELECT query,
                                          tbl,
                                          perm_table_name
                                   FROM stl_scan
                                   WHERE perm_table_name <> 'Internal Worktable'
                                   GROUP BY query,
                                            tbl,
                                            perm_table_name) AS s ON s.query = l.query
                             JOIN pg_class c ON c.oid = s.tbl
                             JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                        WHERE l.userid > 1
                          AND l.event_time >= dateadd(DAY, %s, CURRENT_DATE)
                          AND regexp_instr(solution, '.*VACUUM.*reclaim deleted.') > 0
                        GROUP BY TRIM(n.nspname),
                                 TRIM(c.relname)) anlyz_tbl
                  WHERE anlyz_tbl.qry_rnk < %s) feedback_tbl
                 JOIN svv_table_info info_tbl
                      ON info_tbl."schema" = feedback_tbl.schema_name
                         AND info_tbl."table" = feedback_tbl.table_name
            WHERE (info_tbl.unsorted > %s OR info_tbl.stats_off > %s)
              AND info_tbl.size < %s
              AND TRIM(info_tbl."schema") ~ '%s'
            ORDER BY info_tbl.size,
                     info_tbl.skew_rows
            ''' % (vacuum_parameter,
                   threshold_stanza,
                   goback_no_of_days,
                   query_rank,
                   min_unsorted_pct,
                   stats_off_pct,
                   max_table_size_mb,
                   schema_name)
    if debug:
        comment(get_vacuum_statement)

    vacuum_statements = execute_query(conn, get_vacuum_statement)
    comment("Found %s Tables requiring Vacuum and flagged by alert" % len(vacuum_statements))
    for vs in vacuum_statements:
        statements.append(vs[0])
        statements.append("analyze %s.\"%s\"" % (vs[2], vs[1]))

    if not run_commands(conn, statements, cw=cw, cluster_name=cluster_name, suppress_errors=ignore_errors):
        if not ignore_errors:
            if debug:
                print("Error running statements: %s" % (str(statements),))
            return ERROR

    statements = []
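    # Second pass: all tables in the schema whose unsorted or stats_off percentages
    # exceed the configured thresholds, with stricter rules for tables above
    # max_table_size_mb.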
    if table_name is None and blacklisted_tables is None:
        # query for all tables in the schema, ordered by size ascending
comment("Extracting Candidate Tables for Vacuum ...")
get_vacuum_statement = '''SELECT 'vacuum %s ' + "schema" + '."' + "table" + '"%s; '
+ '/* Size : ' + CAST("size" AS VARCHAR(10)) + ' MB'
+ ', Unsorted_pct : ' + coalesce(info_tbl.unsorted :: varchar(10),'N/A')
+ ' */ ;' as statement,
info_tbl."table" as table_name,
info_tbl."schema" as schema_name
FROM svv_table_info info_tbl
WHERE "schema" ~ '%s'
AND
(
--If the size of the table is less than the max_table_size_mb then , run vacuum based on condition: >min_unsorted_pct
((size < %s) AND (unsorted > %s or stats_off > %s))
OR
--If the size of the table is greater than the max_table_size_mb then , run vacuum based on condition:
-- >min_unsorted_pct AND < max_unsorted_pct
--This is to avoid big table with large unsorted_pct
((size > %s) AND (unsorted > %s AND unsorted < %s ))
)
ORDER BY "size" ASC ,skew_rows ASC;
''' % (vacuum_parameter,
threshold_stanza,
schema_name,
max_table_size_mb,
min_unsorted_pct,
stats_off_pct,
max_table_size_mb,
min_unsorted_pct,
max_unsorted_pct)
        if debug:
            comment(get_vacuum_statement)

        vacuum_statements = execute_query(conn, get_vacuum_statement)
        comment("Found %s Tables requiring Vacuum due to unsorted rows or stale statistics" % len(vacuum_statements))

        for vs in vacuum_statements:
            statements.append(vs[0])
            statements.append("analyze %s.\"%s\"" % (vs[2], vs[1]))

        if not run_commands(conn, statements, cw=cw, cluster_name=cluster_name, suppress_errors=ignore_errors):
            if not ignore_errors:
                if debug:
                    print("Error running statements: %s" % (str(statements),))
                return ERROR

    statements = []
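    # Third pass: tables with interleaved sort keys whose skew suggests a VACUUM REINDEX.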
    if table_name is None and blacklisted_tables is None:
        # query for all tables in the schema for vacuum reindex
        comment("Extracting Candidate Tables for Vacuum reindex of Interleaved Sort Keys...")
        get_vacuum_statement = '''SELECT 'vacuum REINDEX ' + schema_name + '."' + table_name + '" ; '
                                         + '/* Rows : ' + CAST("rows" AS VARCHAR(10))
                                         + ', Interleaved_skew : ' + CAST("max_skew" AS VARCHAR(10))
                                         + ', Reindex Flag : ' + CAST(reindex_flag AS VARCHAR(10)) + ' */ ;' AS statement,
                                         table_name,
                                         schema_name
                                  FROM (SELECT TRIM(n.nspname) schema_name,
                                               TRIM(t.relname) table_name,
                                               MAX(v.interleaved_skew) max_skew,
                                               MAX(c.count) AS rows,
                                               CASE
                                                   -- v.interleaved_skew can be null if the table has never been
                                                   -- vacuumed, so account for that
                                                   WHEN (MAX(c.max_bucket) = 0)
                                                        OR (MAX(NVL(v.interleaved_skew, 10)) > %s AND MAX(c.count) > %s) THEN 'Yes'
                                                   ELSE 'No'
                                               END AS reindex_flag
                                        FROM svv_interleaved_columns v
                                             JOIN (SELECT tbl, col, MAX(compressed_val) AS max_bucket, SUM(count) AS count
                                                   FROM stv_interleaved_counts
                                                   GROUP BY tbl, col) c
                                                  ON (v.tbl = c.tbl AND v.col = c.col)
                                             JOIN pg_class t ON t.oid = c.tbl
                                             JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
                                        GROUP BY 1, 2)
                                  WHERE reindex_flag = 'Yes'
                                    AND schema_name ~ '%s'
                               ''' % (min_interleaved_skew, min_interleaved_count, schema_name)
        if debug:
            comment(get_vacuum_statement)

        vacuum_statements = execute_query(conn, get_vacuum_statement)
        comment("Found %s Tables with Interleaved Sort Keys requiring Vacuum" % len(vacuum_statements))

        for vs in vacuum_statements:
            statements.append(vs[0])
            statements.append("analyze %s.\"%s\"" % (vs[2], vs[1]))

        if not run_commands(conn, statements, cw=cw, cluster_name=cluster_name, suppress_errors=ignore_errors):
            if not ignore_errors:
                if debug:
                    print("Error running statements: %s" % (str(statements),))
                return ERROR

    return True
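
# Example invocation (illustrative sketch only; the connection setup and the
# CloudWatch client below are assumptions, not part of this module):
#
#   import boto3
#   cw = boto3.client('cloudwatch')
#   conn = ...  # open a connection to the Redshift cluster with your preferred driver
#   run_vacuum(conn, 'my-cluster', cw, schema_name='public', vacuum_parameter='FULL')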