in src/ColumnEncodingUtility/analyze-schema-compression.py [0:0]
def analyze(table_info):
    """Analyze one table's column encodings and, when beneficial, build (and
    optionally execute) the statements to rebuild it with optimal encodings.

    :param table_info: sequence of (schema_name, table_name, _, _, dist_style,
        owner[, table_comment]) as produced by the table-listing query.
    :return: ERROR on failure, OK when no optimisation is required,
        TERMINATED_BY_USER on Ctrl-C, or a tuple (OK, fks, encodings_modified)
        on success.

    NOTE(review): relies on module-level configuration globals (force, debug,
    comprows, target_schema, new_dist_key, new_sort_keys, analyze_col_width,
    drop_old_data, do_execute, ignore_errors, cw, db_host, db_user,
    RETRY_TIMEOUT) and sibling helper functions defined elsewhere in the file.
    """
    schema_name = table_info[0]
    table_name = table_info[1]
    dist_style = table_info[4]
    owner = table_info[5]
    # FIX: always bind table_comment so the later `table_comment is not None`
    # check cannot raise NameError when no comment column was supplied
    table_comment = None
    if len(table_info) > 6:
        table_comment = table_info[6]

    # get the count of columns that have raw encoding applied
    table_unoptimised = False
    count_unoptimised = 0
    encodings_modified = False
    output = get_count_raw_columns(schema_name, table_name)

    if output is None:
        print("Unable to determine potential RAW column encoding for %s" % table_name)
        return ERROR
    else:
        for row in output:
            if row[0] > 0:
                table_unoptimised = True
                count_unoptimised += row[0]

    if not table_unoptimised and not force:
        comment("Table %s.%s does not require encoding optimisation" % (schema_name, table_name))
        return OK
    else:
        comment("Table %s.%s contains %s unoptimised columns" % (schema_name, table_name, count_unoptimised))
        if force:
            comment("Using Force Override Option")

        statement = 'analyze compression %s."%s"' % (schema_name, table_name)

        if comprows is not None:
            statement = statement + (" comprows %s" % int(comprows))

        try:
            if debug:
                comment(statement)

            comment("Analyzing Table '%s.%s'" % (schema_name, table_name,))

            # run the analyze in a loop, because it could be locked by another process modifying rows and get a timeout
            analyze_compression_result = None
            analyze_retry = 10
            attempt_count = 0
            last_exception = None
            while attempt_count < analyze_retry and analyze_compression_result is None:
                try:
                    analyze_compression_result = execute_query(statement)
                    # Committing, otherwise analyze keeps an exclusive lock until a commit arrives, which can be very long
                    execute_query('commit;')
                except KeyboardInterrupt:
                    # To handle Ctrl-C from user
                    cleanup(get_pg_conn())
                    return TERMINATED_BY_USER
                except Exception as e:
                    execute_query('rollback;')
                    print(e)
                    attempt_count += 1
                    last_exception = e

                    # Exponential Backoff
                    time.sleep(2 ** attempt_count * RETRY_TIMEOUT)

            if analyze_compression_result is None:
                if last_exception is not None:
                    # FIX: BaseException has no .message attribute in Python 3 - use str() instead
                    print("Unable to analyze %s due to Exception %s" % (table_name, str(last_exception)))
                else:
                    print("Unknown Error")
                return ERROR

            if target_schema is None:
                set_target_schema = schema_name
            else:
                set_target_schema = target_schema

            if set_target_schema == schema_name:
                # in-place migration: build a temporary '<name>_$mig' table in the same schema
                target_table = '%s_$mig' % table_name
            else:
                target_table = table_name

            create_table = 'begin;\nlock table %s."%s";\ncreate table %s."%s"(' % (
                schema_name, table_name, set_target_schema, target_table,)

            # query the table column definition
            descr = get_table_desc(schema_name, table_name)

            encode_columns = []
            statements = []
            sortkeys = {}
            has_zindex_sortkeys = False
            has_identity = False
            non_identity_columns = []
            fks = []
            table_distkey = None
            new_sortkey_arr = [t.strip() for t in new_sort_keys.split(',')] if new_sort_keys is not None else []

            # count of suggested optimizations
            count_optimized = 0

            # process each item given back by the analyze request
            for row in analyze_compression_result:
                if debug:
                    comment("Analyzed Compression Row State: %s" % str(row))
                col = row[1]
                row_sortkey = descr[col][4]

                # compare the previous encoding to the new encoding
                # don't use new encoding for first sortkey
                new_encoding = row[2]
                new_encoding = new_encoding if not abs(row_sortkey) == 1 else 'raw'
                old_encoding = descr[col][2]
                old_encoding = 'raw' if old_encoding == 'none' else old_encoding
                if new_encoding != old_encoding:
                    encodings_modified = True
                    count_optimized += 1

                # fix datatypes from the description type to the create type
                col_type = descr[col][1].replace('character varying', 'varchar').replace('without time zone', '')

                # check whether columns are too wide
                if analyze_col_width and ("varchar" in col_type or "int" in col_type):
                    new_col_type = reduce_column_length(col_type, descr[col][0], table_name)
                    if new_col_type != col_type:
                        col_type = new_col_type
                        encodings_modified = True

                # link in the existing distribution key, or set the new one
                row_distkey = descr[col][3]
                if table_name is not None and new_dist_key is not None:
                    if col == new_dist_key:
                        distkey = 'DISTKEY'
                        dist_style = 'KEY'
                        table_distkey = col
                    else:
                        distkey = ''
                else:
                    if str(row_distkey).upper()[0] == 'T':
                        distkey = 'DISTKEY'
                        dist_style = 'KEY'
                        table_distkey = col
                    else:
                        distkey = ''

                # link in the existing sort keys, or set the new ones
                if table_name is not None and len(new_sortkey_arr) > 0:
                    if col in new_sortkey_arr:
                        sortkeys[new_sortkey_arr.index(col) + 1] = col
                else:
                    if row_sortkey != 0:
                        # add the absolute ordering of the sortkey to the list of all sortkeys
                        sortkeys[abs(row_sortkey)] = col

                        # negative sortkey ordinals indicate an INTERLEAVED sort key
                        if row_sortkey < 0:
                            has_zindex_sortkeys = True

                # don't compress first sort key column. This will be set on the basis of the existing sort key not
                # being modified, or on the assignment of the new first sortkey
                if sortkeys.get(1, None) == col:
                    compression = 'RAW'
                else:
                    compression = new_encoding

                # extract null/not null setting
                col_null = descr[col][5]
                if str(col_null).upper() == 'TRUE':
                    col_null = 'NOT NULL'
                else:
                    col_null = ''

                # get default or identity syntax for this column
                default_or_identity = descr[col][6]
                if default_or_identity:
                    ident_data = get_identity(default_or_identity)
                    if ident_data is None:
                        default_value = 'default %s' % default_or_identity
                        non_identity_columns.append('"%s"' % col)
                    else:
                        default_value = 'identity (%s, %s)' % ident_data
                        has_identity = True
                else:
                    default_value = ''
                    non_identity_columns.append('"%s"' % col)

                if debug:
                    comment("Column %s will be encoded as %s (previous %s)" % (
                        col, compression, old_encoding))

                # add the formatted column specification
                encode_columns.extend(['"%s" %s %s %s encode %s %s'
                                       % (col, col_type, default_value, col_null, compression, distkey)])

            # abort if a new distkey was set but we couldn't find it in the set of all columns
            if new_dist_key is not None and table_distkey is None:
                msg = "Column '%s' not found when setting new Table Distribution Key" % new_dist_key
                comment(msg)
                raise Exception(msg)

            ordered_sortkey_columns = [sortkeys[index] for index in range(1, len(sortkeys) + 1)]

            # abort if new sortkeys were set but we couldn't find them in the set of all columns
            if new_sort_keys is not None and len(ordered_sortkey_columns) != len(new_sortkey_arr):
                if debug:
                    comment("Requested Sort Keys: %s" % new_sortkey_arr)
                    comment("Resolved Sort Keys: %s" % ordered_sortkey_columns)
                msg = "Column resolution of sortkeys '%s' not found when setting new Table Sort Keys" % new_sortkey_arr
                comment(msg)
                raise Exception(msg)

            # if this table's encodings have not changed, then don't do a modification, unless force options is set
            if (not force) and (not encodings_modified):
                comment("Column Encoding resulted in an identical table - no changes will be made")
            else:
                comment("Column Encoding will be modified for %s.%s" % (schema_name, table_name))

                # add all the column encoding statements on to the create table statement, suppressing the leading
                # comma on the first one
                for i, s in enumerate(encode_columns):
                    create_table += '\n%s%s' % ('' if i == 0 else ',', s)

                create_table = create_table + '\n)\n'

                # add diststyle all if needed
                if dist_style == 'ALL':
                    create_table = create_table + 'diststyle all\n'

                # add sort key as a table block to accommodate multiple columns
                if len(sortkeys) > 0:
                    if debug:
                        comment("Adding Sortkeys: %s" % sortkeys)
                    sortkey = '%sSORTKEY(' % ('INTERLEAVED ' if has_zindex_sortkeys else '')
                    # reuse the ordering already resolved above rather than recomputing it
                    sortkey += ','.join(ordered_sortkey_columns) + ')\n'
                    create_table = create_table + (' %s ' % sortkey)

                create_table = create_table + ';'

                # run the create table statement
                statements.extend([create_table])

                # get the primary key statement
                statements.extend([get_primary_key(schema_name, set_target_schema, table_name, target_table)])

                # set the table owner
                statements.extend(['alter table %s."%s" owner to "%s";' % (set_target_schema, target_table, owner)])

                if table_comment is not None:
                    statements.extend(
                        ['comment on table %s."%s" is \'%s\';' % (set_target_schema, target_table, table_comment)])

                # insert the old data into the new table
                # if we have identity column(s), we can't insert data from them, so do selective insert
                if has_identity:
                    source_columns = ', '.join(non_identity_columns)
                    mig_columns = '(' + source_columns + ')'
                else:
                    source_columns = '*'
                    mig_columns = ''

                insert = 'insert into %s."%s" %s select %s from %s."%s"' % (set_target_schema,
                                                                            target_table,
                                                                            mig_columns,
                                                                            source_columns,
                                                                            schema_name,
                                                                            table_name)
                if len(ordered_sortkey_columns) > 0:
                    insert = "%s order by \"%s\";" % (insert, ",".join(ordered_sortkey_columns).replace(',', '\",\"'))
                else:
                    insert = "%s;" % (insert)
                statements.extend([insert])

                # analyze the new table
                analyze = 'analyze %s."%s";' % (set_target_schema, target_table)
                statements.extend([analyze])

                if set_target_schema == schema_name:
                    # rename the old table to _$old or drop
                    if drop_old_data:
                        drop = 'drop table %s."%s" cascade;' % (set_target_schema, table_name)
                    else:
                        # the alter table statement for the current data will use the first 104 characters of the
                        # original table name, the current datetime as YYYYMMDD and a 10 digit random string
                        drop = 'alter table %s."%s" rename to "%s_%s_%s_$old";' % (
                            set_target_schema, table_name, table_name[0:104], datetime.date.today().strftime("%Y%m%d"),
                            shortuuid.ShortUUID().random(length=10))
                    statements.extend([drop])

                    # rename the migrate table to the old table name
                    rename = 'alter table %s."%s" rename to "%s";' % (set_target_schema, target_table, table_name)
                    statements.extend([rename])

                    # add foreign keys
                    fks = get_foreign_keys(schema_name, set_target_schema, table_name)

                # add grants back
                grants = get_grants(schema_name, table_name, db_user)
                if grants is not None:
                    statements.extend(grants)

                statements.extend(['commit;'])

                if do_execute:
                    if not run_commands(get_pg_conn(), statements):
                        if not ignore_errors:
                            if debug:
                                print("Error running statements: %s" % (str(statements),))
                            return ERROR

                    # emit a cloudwatch metric for the table
                    if cw is not None:
                        dimensions = [
                            {'Name': 'ClusterIdentifier', 'Value': db_host.split('.')[0]},
                            {'Name': 'TableName', 'Value': table_name}
                        ]
                        aws_utils.put_metric(cw, 'Redshift', 'ColumnEncodingModification', dimensions, None, 1, 'Count')
                        if debug:
                            comment("Emitted Cloudwatch Metric for Column Encoded table")
                else:
                    comment("No encoding modifications run for %s.%s" % (schema_name, table_name))
        except Exception as e:
            print('Exception %s during analysis of %s' % (e, table_name))
            print(traceback.format_exc())
            return ERROR

        print_statements(statements)

        return (OK, fks, encodings_modified)