in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-datalakeLibrary/python/datalake_library/transforms/stage_a_transforms/light_transform_athena_ctas_sqoop.py [0:0]
def transform_object(self, bucket, body, team, dataset):
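# Relies on module-level helpers initialised elsewhere in this file: logger,
# glue_client (a boto3 Glue client) and s3_interface, plus the boto3, random,
# string, sys, traceback and json imports.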
# Returns the table path, or the table path plus the partition folder, e.g.:
#   no partition:   full_table_path = pre-stage/team/dataset/TABLE_NAME
#   with partition: full_table_path = pre-stage/team/dataset/TABLE_NAME/dt=partitionvalue
# Requires Lake Formation (LF) permission to describe all tables in prestage_db
def get_table_info(database, table):
logger.info(f'DB: {database} Tbl: {table}')
glue_response = glue_client.get_table(
DatabaseName=database,
Name=table)
logger.debug(f'Glue get_table response: {glue_response}')
table_location = glue_response['Table']['StorageDescriptor']['Location']
table_columns = glue_response['Table']['StorageDescriptor']['Columns']
# Location has the form 's3://<bucket>/<path>': element 2 of the split is the
# bucket name, and everything after '<bucket>/' is the table path
table_bucket = table_location.split('/')[2]
table_path = table_location.split(table_bucket + "/")[1]
return table_bucket, table_path, table_columns
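# Illustrative only (hypothetical values): for a Glue table whose Location is
# 's3://my-stage-bucket/post-stage/engineering/legacy/fct_operation',
# get_table_info returns ('my-stage-bucket',
# 'post-stage/engineering/legacy/fct_operation', [<column dicts>]).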
# table_data_from_s3_key parses the S3 key of the ingested object (taken from
# the enclosing scope), expected to have one of these formats:
#   'team/dataset/table_name/partition=XXXXXX/file_name'
#   'team/dataset/table_name/file_name'              (no partition)
# and returns the full input file URI, the table name, the partition list
# (empty if there is no partition), the partition path and the database (dataset) name
def table_data_from_s3_key():
full_input_file = f's3://{bucket}/{key}'
table_partitions = []
num_folders = key.count('/')  # number of '/'-separated folders before the file name
database = key.split('/')[1]  # second folder: dataset / database name
table_name = key.split('/')[2]  # third folder: table name
path_partitions = key.split(table_name + '/')[1]  # everything after the table folder
table_partitions_path = ''
if num_folders > 3: # if it has partitions
table_partitions_path = path_partitions.rsplit('/', 1)[0]  # drop the file name, keep the partition folder(s)
for partition_num in range(3, num_folders):
partition_folder = key.split('/')[partition_num]
name = partition_folder.split('=')[0]
value = partition_folder.split('=')[1]
part_dictionary = {"name": name, "value": value}
table_partitions.append(part_dictionary)
return full_input_file, table_name, table_partitions, table_partitions_path, database
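# Illustrative only (hypothetical key): for
# key = 'engineering/legacy/fct_operation/dt=20210101/part-0000.snappy.parquet'
# the function returns:
#   full_input_file       = 's3://<bucket>/engineering/legacy/fct_operation/dt=20210101/part-0000.snappy.parquet'
#   table_name            = 'fct_operation', database = 'legacy'
#   table_partitions      = [{'name': 'dt', 'value': '20210101'}]
#   table_partitions_path = 'dt=20210101'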
try:
# Custom Code: tables that must be written with 8 Athena buckets (bucket_count = 8),
# grouped by the column they are bucketed by (see bucket_field below)
eight_buckets = ['fct_creditpayment',
'fct_operation',
'fct_operationconditions',
'fct_amortization',
'frt_contract',
'fct_amortizationdetail',
'fct_paymentdetail']
by_idoperation = ['fct_operation',
'fct_operationconditions',
'fct_amortization',
'frt_contract',
'fct_amortizationdetail']
by_idcreditpayment = ['fct_creditpayment',
'fct_paymentdetail']
# End Custom Code
############################################
# INITIAL VARIABLE DEFINITION / EXTRACTION #
############################################
# Extract the object key and pipeline, then get the environment (database name
# suffix) and the Athena workgroup from SSM
key = body['key']
pipeline = body['pipeline']
ssmcli = boto3.client('ssm')
ssmresponse = ssmcli.get_parameter(
Name='/SDLF/Misc/pEnv'
)
db_env = ssmresponse['Parameter']['Value']
ssmresponse = ssmcli.get_parameter(
Name=f'/SDLF/ATHENA/{team}/{pipeline}/WorkgroupName'
)
workgroup = ssmresponse['Parameter']['Value']
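# Illustrative only (hypothetical values): db_env might resolve to 'dev' and
# workgroup to something like 'engineering-main-workgroup'; with team
# 'engineering' and database 'legacy', the names built below would be
# 'engineering_legacy_raw_dev' and 'engineering_legacy_dev'.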
input_file, source_table, partitions, partitions_path, database = table_data_from_s3_key()
# Determine the database names; they are expected to be <database_name>_<env>, e.g. raw_database_dev
source_db = f'{team}_{database}_raw_{db_env}'
target_db = f'{team}_{database}_{db_env}'
# we assume that source and target tables have the same name
target_table = source_table
target_table_format = 'ORC'
# get the info of the target table
target_table_bucket, target_table_path, target_table_columns = get_table_info(target_db, target_table)
target_table_full_path = target_table_path + ("/" + partitions_path if partitions_path else '')
# delete previously written files at the target path (allows reprocessing)
s3_interface.delete_objects(target_table_bucket, target_table_full_path + '/')
ctas_path = f's3://{target_table_bucket}/{target_table_full_path}'
# Glue's StorageDescriptor Columns exclude partition keys, so this builds the
# list of non-partition columns for the CTAS SELECT
non_partition_columns = ''
primitive_types = [
'boolean', 'byte', 'short', 'int', 'long', 'float', 'double', 'string',
'varchar', 'date', 'timestamp',
# Glue catalog names for integer/decimal types, so the first primitive column
# is also detected for tables whose first column is e.g. a bigint
'tinyint', 'smallint', 'bigint', 'decimal'
]
first_primitive_column = ''
for column in target_table_columns:
non_partition_columns += f"{column['Name']}, "
if first_primitive_column == '' and column['Type'] in primitive_types:
first_primitive_column = column['Name']
non_partition_columns = non_partition_columns.rsplit(', ', 1)[0]
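# Illustrative only: for hypothetical columns
# [{'Name': 'idoperation', 'Type': 'int'}, {'Name': 'amount', 'Type': 'double'}]
# this yields non_partition_columns = 'idoperation, amount' and
# first_primitive_column = 'idoperation'.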
# Use the first primitive column as the default bucketing key
bucket_field = first_primitive_column
partition_filter = ''
number_of_buckets = 1
# Custom Code
if target_table in eight_buckets:
number_of_buckets = 8
if target_table in by_idoperation:
bucket_field = 'idoperation'
elif target_table in by_idcreditpayment:
bucket_field = 'idcreditpayment'
# End Custom Code
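# Illustrative only: for target_table = 'fct_operation' the settings above give
# number_of_buckets = 8 and bucket_field = 'idoperation'.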
rand_suffix = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(4))  # random suffix for the temporary CTAS table name
if not partitions:
# CTAS cannot be used when an existing table already points at the same path.
# It would be easier to drop and recreate the table, but this process keeps the
# Lake Formation permissions granted on it:
# temporarily change the target table location...
change_location = f"ALTER TABLE {target_db}.{source_table} SET LOCATION '{ctas_path}_{rand_suffix}'"
# ...then recreate the data at the original path with a CTAS into a temporary table
ctas_query = f'CREATE TABLE {target_db}.{source_table}_{rand_suffix} ' \
f' WITH ( ' \
f" format = '{target_table_format}'," \
f" external_location ='{ctas_path}', " \
f" bucketed_by = ARRAY['{bucket_field}'], " \
f' bucket_count = {number_of_buckets} ' \
f' ) ' \
f'AS ' \
f'SELECT {non_partition_columns} ' \
f'FROM {source_db}.{source_table} '
# f"WHERE \"$path\" = \'{input_file}\'"
# Delete the CTAS table definition (keeps the data)
drop_temp_table = f'DROP TABLE {target_db}.{source_table}_{rand_suffix} '
# Return the target table to its original location
revert_location = f"ALTER TABLE {target_db}.{source_table} SET LOCATION '{ctas_path}'"
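# Illustrative only (hypothetical values): with rand_suffix 'A1B2' the CTAS
# statement roughly reads:
#   CREATE TABLE <target_db>.<table>_A1B2
#   WITH (format = 'ORC', external_location = '<ctas_path>',
#         bucketed_by = ARRAY['<bucket_field>'], bucket_count = <n>)
#   AS SELECT <non_partition_columns> FROM <source_db>.<table>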
steps = [{'info': 'CHANGE STAGE TABLE LOCATION',
'sql': change_location,
'db': target_db},
{'info': 'CREATE TEMP STAGE TABLE (CTAS)',
'sql': ctas_query,
'db': target_db},
{'info': 'DROP TEMP STAGE TABLE (CTAS)',
'sql': drop_temp_table,
'db': target_db},
{'info': 'REVERT TO ORIGINAL STAGE TABLE LOCATION',
'sql': revert_location,
'db': target_db}
]
else:
# Build one "col='val'" term per partition: joined with AND for the WHERE
# filter, and with commas for the ADD PARTITION clauses
partition_terms = [f"{partition['name']}='{partition['value']}'" for partition in partitions]
partition_filter = ' AND '.join(partition_terms)
partitions_to_add = ', '.join(partition_terms)
add_partition_to_source = f'ALTER TABLE {source_db}.{source_table} ' \
f'ADD IF NOT EXISTS PARTITION( ' \
f'{partitions_to_add})'
ctas_query = f'CREATE TABLE {target_db}.{source_table}_{rand_suffix} ' \
f' WITH ( ' \
f" format = '{target_table_format}'," \
f" external_location ='{ctas_path}', " \
f" bucketed_by = ARRAY['{bucket_field}'], " \
f' bucket_count = {number_of_buckets} ' \
f' ) ' \
f'AS ' \
f'SELECT {non_partition_columns} ' \
f'FROM {source_db}.{source_table} ' \
f"WHERE {partitions_to_add}"
drop_table = f'DROP TABLE {target_db}.{source_table}_{rand_suffix}'
add_partition = f'ALTER TABLE {target_db}.{target_table} ' \
f'ADD IF NOT EXISTS PARTITION( ' \
f'{partitions_to_add})'
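# Illustrative only (hypothetical values): for a single partition dt='20210101'
# the generated statements are roughly:
#   ALTER TABLE <source_db>.<table> ADD IF NOT EXISTS PARTITION( dt='20210101')
#   CREATE TABLE <target_db>.<table>_<suffix> WITH (...) AS
#     SELECT <columns> FROM <source_db>.<table> WHERE dt='20210101'
#   DROP TABLE <target_db>.<table>_<suffix>
#   ALTER TABLE <target_db>.<table> ADD IF NOT EXISTS PARTITION( dt='20210101')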
steps = [{'info': 'ADD PARTITION TO RAW TABLE',
'sql': add_partition_to_source,
'db': source_db},
{'info': 'CREATE STAGE TEMP TABLE',
'sql': ctas_query,
'db': target_db},
{'info': 'DROP STAGE TEMP TABLE',
'sql': drop_table,
'db': target_db},
{'info': 'ADD PARTITION TO STAGE TABLE',
'sql': add_partition,
'db': target_db}]
num_of_steps = len(steps)
job_details = {
'steps': steps,
'num_of_steps': num_of_steps,
'current_step': 0,
'jobStatus': 'STARTING_NEXT_QUERY',
'partitions': partitions,
'db_env': db_env,
'workgroup': workgroup,
'target_table_full_path': target_table_full_path,
'source_db': source_db,
'source_table': source_table,
'target_db': target_db,
'target_table': target_table
}
response = {
'processedKeysPath': target_table_path,
'jobDetails': job_details
}
return response
except Exception as exp:
exception_type, exception_value, exception_traceback = sys.exc_info()
traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
err_msg = json.dumps({
"errorType": exception_type.__name__,
"errorMessage": str(exception_value),
"stackTrace": traceback_string
})
logger.error(err_msg)
# re-raise so the pipeline step fails instead of silently returning None
raise