def run_vacuum()

in src/AnalyzeVacuumUtility/lib/analyze_vacuum.py [0:0]


def _sql_string_list(csv_values):
    """Render a comma-separated string as a list of quoted SQL string literals.

    Every entry is stripped of surrounding whitespace and embedded single
    quotes are doubled, so ``"a, b"`` becomes ``'a', 'b'``.
    """
    return ', '.join("'%s'" % entry.strip().replace("'", "''")
                     for entry in csv_values.split(','))


def _run_vacuum_batch(conn, cluster_name, cw, candidate_query, found_message, ignore_errors):
    """Run one candidate query and execute the vacuum/analyze pairs it yields.

    Each row of ``candidate_query`` must be ``(statement, table_name, schema_name)``.
    For every row the generated vacuum statement is queued, followed by an
    ``analyze`` of the same table, and the whole batch is passed to ``run_commands``.

    :param found_message: log template with a single ``%s`` for the row count
    :return: False only when ``run_commands`` reports failure and
             ``ignore_errors`` is false; True otherwise
    """
    if debug:
        comment(candidate_query)

    candidates = execute_query(conn, candidate_query)
    comment(found_message % len(candidates))

    statements = []
    for row in candidates:
        statements.append(row[0])
        statements.append("analyze %s.\"%s\"" % (row[2], row[1]))

    if run_commands(conn, statements, cw=cw, cluster_name=cluster_name, suppress_errors=ignore_errors):
        return True
    if ignore_errors:
        return True

    if debug:
        print("Error running statements: %s" % (str(statements),))
    return False


def run_vacuum(conn,
               cluster_name,
               cw,
               schema_name='public',
               table_name=None,
               blacklisted_tables=None,
               ignore_errors=False,
               vacuum_parameter='FULL',
               min_unsorted_pct=5,
               max_unsorted_pct=50,
               stats_off_pct=10,
               max_table_size_mb=(700 * 1024),
               min_interleaved_skew=1.4,
               min_interleaved_count=0,
               **kwargs):
    """Find tables that need a vacuum and run ``vacuum`` + ``analyze`` on them.

    Up to three batches are executed, each one built from a catalog query:

    1. Candidates from ``svv_table_info``: restricted to ``table_name`` when it
       is given, else excluding ``blacklisted_tables`` when that is given, else
       restricted to tables flagged by VACUUM alerts in ``stl_alert_event_log``.
    2. (only when neither ``table_name`` nor ``blacklisted_tables`` is given)
       every table in the schema selected by unsorted / stats-off thresholds.
    3. (same condition as 2) tables with interleaved sort keys that need
       ``vacuum REINDEX``.

    NOTE: all values are interpolated directly into the SQL text, so callers
    must only pass trusted values. Only the blacklist entries are quote-escaped.

    Relies on module-level names ``MAX_PERCENT``, ``ERROR``, ``debug``,
    ``goback_no_of_days`` and ``query_rank``.

    :param conn: connection handed to ``execute_query`` / ``run_commands``
    :param cluster_name: forwarded to ``run_commands``
    :param cw: forwarded to ``run_commands``
    :param schema_name: pattern matched against the schema with the ``~`` operator
    :param table_name: when set, only this table is considered (batch 1 only)
    :param blacklisted_tables: comma-separated table names to exclude (batch 1 only)
    :param ignore_errors: when true, a failed batch does not abort the run
    :param vacuum_parameter: vacuum mode placed after ``vacuum``; ``None`` emits
        a plain ``vacuum`` statement
    :param min_unsorted_pct: lower bound on ``unsorted`` for selecting a table
    :param max_unsorted_pct: upper bound on ``unsorted`` for tables at or above
        ``max_table_size_mb``
    :param stats_off_pct: lower bound on ``stats_off`` for selecting a table
    :param max_table_size_mb: size limit separating the two selection rules
    :param min_interleaved_skew: skew above which a reindex is requested
    :param min_interleaved_count: row count above which a reindex is requested
    :param kwargs: accepted and ignored
    :return: ``ERROR`` if a batch failed and ``ignore_errors`` is false, else ``True``
    """
    # NOTE(review): the fallback of 5 for a missing min_unsorted_pct is kept from
    # the original; confirm whether (MAX_PERCENT - 5) was intended.
    threshold = (MAX_PERCENT - int(min_unsorted_pct)) if min_unsorted_pct is not None else 5
    threshold_stanza = ""
    if vacuum_parameter is not None and vacuum_parameter.upper() != 'REINDEX':
        threshold_stanza = " to %d percent" % threshold

    # A missing mode used to be rendered as the literal text "None", which is not
    # valid SQL; emit a plain "vacuum" instead.
    vacuum_mode = '' if vacuum_parameter is None else vacuum_parameter

    restricted_run = table_name is not None or blacklisted_tables is not None

    if restricted_run:
        if table_name is not None:
            table_filter = "\"table\" = '%s'" % table_name
        else:
            comment("Extracting Candidate Tables for Vacuum...")
            table_filter = "\"table\" NOT IN (%s)" % _sql_string_list(blacklisted_tables)

        # stats_off is coalesced like unsorted: concatenating a NULL would turn the
        # whole generated statement into NULL.
        get_vacuum_statement = '''SELECT 'vacuum %s ' + "schema" + '."' + "table" + '"%s ; '
                                         + '/* Size : ' + CAST("size" AS VARCHAR(10)) + ' MB'
                                         + ', Unsorted_pct : ' + coalesce(unsorted :: varchar(10),'null')
                                         + ', Stats Off : ' + coalesce(stats_off :: varchar(10),'null')
                                         + ' */ ;' as statement,
                                         "table" as table_name,
                                         "schema" as schema_name
                                  FROM svv_table_info
                                  WHERE (unsorted > %s or stats_off > %s)
                                    AND   size < %s
                                    AND  "schema" ~ '%s'
                                    AND  %s;
                                        ''' % (vacuum_mode,
                                               threshold_stanza,
                                               min_unsorted_pct,
                                               stats_off_pct,
                                               max_table_size_mb,
                                               schema_name,
                                               table_filter)
    else:
        # tables flagged by VACUUM alerts, joined to svv_table_info and ordered by size
        comment("Extracting Candidate Tables for Vacuum...")

        get_vacuum_statement = '''
                SELECT 'vacuum %s ' + feedback_tbl.schema_name + '."' + feedback_tbl.table_name + '"%s; '
                       + '/* Size : ' + CAST(info_tbl."size" AS VARCHAR(10)) + ' MB'
                       + ', Unsorted_pct : ' + coalesce(unsorted :: varchar(10),'null')
                       + ', Stats Off : ' + coalesce(stats_off :: varchar(10),'null')
                       + ' */ ;' as statement,
                       table_name,
                       schema_name
                FROM (SELECT schema_name,
                             table_name
                      FROM (SELECT TRIM(n.nspname) schema_name,
                                   TRIM(c.relname) table_name,
                                   DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
                                   COUNT(*)
                            FROM stl_alert_event_log AS l
                              JOIN (SELECT query,
                                           tbl,
                                           perm_table_name
                                    FROM stl_scan
                                    WHERE perm_table_name <> 'Internal Worktable'
                                    GROUP BY query,
                                             tbl,
                                             perm_table_name) AS s ON s.query = l.query
                              JOIN pg_class c ON c.oid = s.tbl
                              JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                            WHERE l.userid > 1
                            AND   l.event_time >= dateadd(DAY,%s,CURRENT_DATE)
                            AND   regexp_instr(solution,'.*VACUUM.*reclaim deleted.') > 0
                            GROUP BY TRIM(n.nspname),
                                     TRIM(c.relname)) anlyz_tbl
                      WHERE anlyz_tbl.qry_rnk <%s) feedback_tbl
                  JOIN svv_table_info info_tbl
                    ON info_tbl.schema = feedback_tbl.schema_name
                   AND info_tbl.table = feedback_tbl.table_name
                WHERE (info_tbl.unsorted > %s OR info_tbl.stats_off > %s)
                AND   info_tbl.size < %s
                AND   TRIM(info_tbl.schema) ~ '%s'
                ORDER BY info_tbl.size,
                         info_tbl.skew_rows
                            ''' % (vacuum_mode,
                                   threshold_stanza,
                                   goback_no_of_days,
                                   query_rank,
                                   min_unsorted_pct,
                                   stats_off_pct,
                                   max_table_size_mb,
                                   schema_name)

    if not _run_vacuum_batch(conn, cluster_name, cw, get_vacuum_statement,
                             "Found %s Tables requiring Vacuum and flagged by alert",
                             ignore_errors):
        return ERROR

    # The schema-wide and reindex passes only apply to unrestricted runs.
    if restricted_run:
        return True

    # query for all tables in the schema ordered by size ascending
    comment("Extracting Candidate Tables for Vacuum ...")
    get_vacuum_statement = '''SELECT 'vacuum %s ' + "schema" + '."' + "table" + '"%s; '
                                               + '/* Size : ' + CAST("size" AS VARCHAR(10)) + ' MB'
                                               + ',  Unsorted_pct : ' + coalesce(info_tbl.unsorted :: varchar(10),'N/A')
                                               + ' */ ;' as statement,
                                     info_tbl."table" as table_name,
                                     info_tbl."schema" as schema_name
                                    FROM svv_table_info info_tbl
                                    WHERE "schema" ~ '%s'
                                            AND
                                             (
                                            --If the size of the table is less than the max_table_size_mb then , run vacuum based on condition: >min_unsorted_pct
                                                ((size < %s) AND (unsorted > %s or stats_off > %s))
                                                OR
                                            --If the size of the table is at least the max_table_size_mb then , run vacuum based on condition:
                                            -- >min_unsorted_pct AND < max_unsorted_pct
                                            --This is to avoid big table with large unsorted_pct
                                                 ((size >= %s) AND (unsorted > %s AND unsorted < %s ))
                                             )
                                    ORDER BY "size" ASC ,skew_rows ASC;
                                    ''' % (vacuum_mode,
                                           threshold_stanza,
                                           schema_name,
                                           max_table_size_mb,
                                           min_unsorted_pct,
                                           stats_off_pct,
                                           max_table_size_mb,
                                           min_unsorted_pct,
                                           max_unsorted_pct)

    if not _run_vacuum_batch(conn, cluster_name, cw, get_vacuum_statement,
                             "Found %s Tables requiring Vacuum due to stale statistics",
                             ignore_errors):
        return ERROR

    # query for all tables in the schema for vacuum reindex
    comment("Extracting Candidate Tables for Vacuum reindex of Interleaved Sort Keys...")
    # max_skew is coalesced because interleaved_skew can be NULL for tables that were
    # never vacuumed; those rows are still selected by the NVL() in the CASE below.
    get_vacuum_statement = ''' SELECT 'vacuum REINDEX ' + schema_name + '."' + table_name + '" ; ' + '/* Rows : ' + CAST("rows" AS VARCHAR(10))
                                + ', Interleaved_skew : ' + COALESCE(CAST("max_skew" AS VARCHAR(10)),'null')
                                + ', Reindex Flag : '  + CAST(reindex_flag AS VARCHAR(10)) + ' */ ;' AS statement, table_name, schema_name
                            FROM (SELECT TRIM(n.nspname) schema_name, TRIM(t.relname) table_name,
                                             MAX(v.interleaved_skew) max_skew, MAX(c.count) AS rows,
                                             CASE
                                               -- v.interleaved_skew can be null if the table has never been vacuumed so account for that
                                               WHEN (max(c.max_bucket) = 0) OR (MAX(NVL(v.interleaved_skew,10)) > %s AND MAX(c.count) > %s) THEN 'Yes'
                                               ELSE 'No'
                                             END AS reindex_flag
                                        FROM svv_interleaved_columns v
                                        JOIN (SELECT tbl,col, max(compressed_val) AS max_bucket,  SUM(count) AS count
                                              FROM stv_interleaved_counts
                                              GROUP BY tbl,col) c
                                        ON (v.tbl = c.tbl AND v.col = c.col)
                                        JOIN pg_class t ON t.oid = c.tbl
                                        JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
                                        GROUP BY 1, 2)
                            WHERE reindex_flag = 'Yes'
                                AND schema_name ~ '%s'
                                    ''' % (min_interleaved_skew, min_interleaved_count, schema_name)

    if not _run_vacuum_batch(conn, cluster_name, cw, get_vacuum_statement,
                             "Found %s Tables with Interleaved Sort Keys requiring Vacuum",
                             ignore_errors):
        return ERROR

    return True