in configurations/RedshiftConfigTestingLambda.py [0:0]
def handler(event, context):
    """Dispatch one step of the Redshift configuration-testing workflow.

    The step to run is selected by ``event['Input']['action']``. The other
    optional keys read from ``event['Input']`` are ``what_if_timestamp``,
    ``cluster_identifier``, ``sql_id``, ``job_id``,
    ``redshift_cluster_configuration`` and ``redshift_cluster_index``; each
    defaults to ``None`` when absent.

    The user and system JSON configs are loaded on every invocation from the
    locations given by the ``USER_CONFIG_JSON_S3_PATH`` and
    ``SYSTEM_CONFIG_JSON_S3_PATH`` environment variables.

    Args:
        event: Invocation payload; must contain an ``'Input'`` mapping.
        context: Lambda context object (unused).

    Returns:
        A dict with a single key -- ``'status'``, ``'job_id'`` or ``'sql_id'``
        depending on the action -- holding the result of the delegated call.

    Raises:
        ValueError: If ``action`` is missing or not one of the known actions.
        Exception: Anything raised by the delegated call is printed together
            with its traceback and then re-raised unchanged.
    """
    print(event)
    payload = event['Input']
    action = payload.get('action')
    user_config = get_json_config_from_s3(os.environ['USER_CONFIG_JSON_S3_PATH'])
    system_config = get_json_config_from_s3(os.environ['SYSTEM_CONFIG_JSON_S3_PATH'])
    cluster_identifier_prefix = os.environ['CLUSTER_IDENTIFIER_PREFIX']
    what_if_timestamp = payload.get('what_if_timestamp')
    cluster_identifier = payload.get('cluster_identifier')
    sql_id = payload.get('sql_id')
    job_id = payload.get('job_id')
    redshift_cluster_configuration = payload.get('redshift_cluster_configuration')
    redshift_cluster_index = payload.get('redshift_cluster_index')
    try:
        client = boto3.client('redshift')
        if action == "initiate":
            # A fresh local-time stamp is generated here and returned as the status.
            what_if_timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
            res = {'status': what_if_timestamp}
        elif action == "run_extract":
            res = {
                'job_id': run_extract(
                    what_if_timestamp=what_if_timestamp,
                    simple_replay_log_location=user_config.get('SIMPLE_REPLAY_LOG_LOCATION'),
                    simple_replay_extract_start_time=user_config.get('SIMPLE_REPLAY_EXTRACT_START_TIME'),
                    simple_replay_extract_end_time=user_config.get('SIMPLE_REPLAY_EXTRACT_END_TIME'),
                    simple_replay_extract_overwrite_s3_path=user_config.get(
                        'SIMPLE_REPLAY_EXTRACT_OVERWRITE_S3_PATH'),
                    bucket_name=system_config.get('S3_BUCKET_NAME'),
                    redshift_user_name=system_config.get('MASTER_USER_NAME'),
                    extract_prefix=system_config.get('EXTRACT_PREFIX'),
                    script_prefix=system_config.get('SCRIPT_PREFIX'),
                    extract_bootstrap_script=system_config.get('EXTRACT_BOOTSTRAP_SCRIPT'),
                    job_definition=system_config.get('JOB_DEFINITION'),
                    job_queue=system_config.get('JOB_QUEUE')
                )}
        elif action == "batch_job_status":
            res = {'status': batch_job_status(job_id=job_id)}
        elif action == "get_redshift_configurations":
            res = {'status': user_config.get('CONFIGURATIONS')}
        elif action == "get_cluster_identifier":
            res = {'status': get_cluster_identifier(client, user_config, redshift_cluster_configuration,
                                                    cluster_identifier_prefix)}
        elif action == "cluster_status":
            res = {'status': cluster_status(client, cluster_identifier)}
        elif action == "create_parameter_group":
            res = {'status': create_parameter_group(client, cluster_identifier)}
        elif action == "update_parameter_group":
            # Fall back to the system default when the user supplied no path
            # (key absent) or explicitly opted out with "N/A".
            user_parameter_group = user_config.get('PARAMETER_GROUP_CONFIG_S3_PATH')
            if user_parameter_group is None or user_parameter_group == "N/A":
                parameter_group = system_config.get('PARAMETER_GROUP_CONFIG')
            else:
                parameter_group = user_parameter_group
            res = {'status': update_parameter_group(client, cluster_identifier, parameter_group)}
        elif action == "create_cluster":
            # NOTE(review): cluster_identifier is passed twice positionally; the
            # second occurrence presumably names the parameter group -- confirm
            # against create_cluster's signature.
            res = {
                'status': create_cluster(client,
                                         cluster_identifier,
                                         user_config.get('SNAPSHOT_ID'),
                                         system_config.get('REDSHIFT_IAM_ROLE'),
                                         cluster_identifier,
                                         system_config.get('SUBNET_GROUP'),
                                         system_config.get('SECURITY_GROUP_ID'),
                                         user_config.get('SNAPSHOT_ACCOUNT_ID'),
                                         redshift_cluster_configuration.get('NODE_TYPE'),
                                         redshift_cluster_configuration.get('NUMBER_OF_NODES'),
                                         master_user_name=system_config.get('MASTER_USER_NAME'),
                                         database_name=system_config.get('DATABASE_NAME'),
                                         secrets_manager_arn=system_config.get('SECRETS_MANAGER_ARN'),
                                         port=int(system_config.get('PORT')),
                                         # Only the exact string "true" enables public access.
                                         publicly_accessible=(system_config.get('PUBLICLY_ACCESSIBLE') == "true")
                                         )}
        elif action == "classic_resize_cluster":
            res = {'status': classic_resize_cluster(client, cluster_identifier,
                                                    redshift_cluster_configuration.get('NODE_TYPE'),
                                                    redshift_cluster_configuration.get('NUMBER_OF_NODES'))}
        elif action == "resume_cluster":
            client.resume_cluster(ClusterIdentifier=cluster_identifier)
            res = {'status': 'initiated'}
        elif action == "pause_cluster":
            res = {
                'status': pause_cluster(client=client,
                                        cluster_identifier=cluster_identifier,
                                        redshift_cluster_index=redshift_cluster_index,
                                        auto_pause=user_config.get('AUTO_PAUSE'))}
        elif action == "update_wlm_config":
            res = {'status': update_wlm_config(client, cluster_identifier,
                                               redshift_cluster_configuration.get('WLM_CONFIG_S3_PATH'))}
        # Added to check for clusters in pending reboot after a WLM change.
        elif action == "check_pending_reboot_status":
            res = {'status': check_pending_reboot_status(client, cluster_identifier)}
        elif action == "run_ddl_and_copy_script":
            res = {
                'sql_id': run_sql_script_from_s3(script_s3_path=user_config.get('DDL_AND_COPY_SCRIPT_S3_PATH'),
                                                 action=action,
                                                 cluster_identifier=cluster_identifier,
                                                 redshift_iam_role=system_config.get('REDSHIFT_IAM_ROLE'),
                                                 bucket_name=system_config.get('S3_BUCKET_NAME'),
                                                 db=system_config.get('DATABASE_NAME'),
                                                 user=system_config.get('MASTER_USER_NAME'))}
        elif action == "run_redshift_performance_test":
            res = {
                'job_id': run_redshift_performance_test(
                    client=client,
                    cluster_identifier=cluster_identifier,
                    bucket_name=system_config.get('S3_BUCKET_NAME'),
                    performance_test_bootstrap_script=system_config.get('PERFORMANCE_TEST_BOOTSTRAP_SCRIPT'),
                    performance_test_python_script=system_config.get('PERFORMANCE_TEST_PYTHON_SCRIPT'),
                    sql_script_s3_path=user_config.get('SQL_SCRIPT_S3_PATH'),
                    number_of_parallel_sessions_list=user_config.get('NUMBER_OF_PARALLEL_SESSIONS_LIST'),
                    job_definition=system_config.get('JOB_DEFINITION'),
                    job_queue=system_config.get('JOB_QUEUE'),
                    redshift_iam_role=system_config.get('REDSHIFT_IAM_ROLE'),
                    redshift_user_name=system_config.get('MASTER_USER_NAME'),
                    db=system_config.get('DATABASE_NAME'),
                    disable_result_cache=system_config.get('DISABLE_RESULT_CACHE'),
                    default_output_limit=system_config.get('DEFAULT_OUTPUT_LIMIT'),
                    max_number_of_queries=system_config.get('MAX_NUMBER_OF_QUERIES'),
                    max_parallel_sessions=system_config.get('MAX_PARALLEL_SESSIONS'),
                    query_label_prefix=system_config.get('QUERY_LABEL_PREFIX')
                )}
        elif action == "run_replay":
            res = {
                'job_id': run_replay(
                    client=client,
                    what_if_timestamp=what_if_timestamp,
                    cluster_identifier=cluster_identifier,
                    # Extract location: s3://<bucket>/<extract prefix>/<what_if_timestamp>/
                    extract_s3_path='s3://' + system_config.get('S3_BUCKET_NAME') + '/' + system_config.get(
                        'EXTRACT_PREFIX') + '/' + what_if_timestamp + '/',
                    simple_replay_overwrite_s3_path=user_config.get('SIMPLE_REPLAY_OVERWRITE_S3_PATH'),
                    simple_replay_log_location=user_config.get('SIMPLE_REPLAY_LOG_LOCATION'),
                    bucket_name=system_config.get('S3_BUCKET_NAME'),
                    redshift_user_name=system_config.get('MASTER_USER_NAME'),
                    redshift_iam_role=system_config.get('REDSHIFT_IAM_ROLE'),
                    db=system_config.get('DATABASE_NAME'),
                    extract_prefix=system_config.get('EXTRACT_PREFIX'),
                    replay_prefix=system_config.get('REPLAY_PREFIX'),
                    script_prefix=system_config.get('SCRIPT_PREFIX'),
                    snapshot_account_id=user_config.get('SNAPSHOT_ACCOUNT_ID'),
                    replay_bootstrap_script=system_config.get('REPLAY_BOOTSTRAP_SCRIPT'),
                    job_definition=system_config.get('JOB_DEFINITION'),
                    job_queue=system_config.get('JOB_QUEUE')
                )}
        elif action == "gather_comparison_stats":
            res = {
                'sql_id': gather_comparison_stats(
                    script_s3_path=system_config.get('GATHER_COMPARISON_STATS_SCRIPT'),
                    action=action,
                    cluster_identifier=cluster_identifier,
                    redshift_iam_role=system_config.get('REDSHIFT_IAM_ROLE'),
                    bucket_name=system_config.get('S3_BUCKET_NAME'),
                    db=system_config.get('DATABASE_NAME'),
                    user=system_config.get('MASTER_USER_NAME'),
                    run_type='sync',
                    what_if_timestamp=what_if_timestamp,
                    comparison_stats_s3_path=system_config.get('COMPARISON_STATS_S3_PATH'),
                    external_schema_script=system_config.get('EXTERNAL_SCHEMA_SCRIPT'),
                    query_label_prefix=system_config.get('QUERY_LABEL_PREFIX'),
                    node_type=redshift_cluster_configuration.get('NODE_TYPE'),
                    number_of_nodes=redshift_cluster_configuration.get('NUMBER_OF_NODES'),
                    region=system_config.get('REGION'),
                    cluster_config_s3_path=system_config.get('CLUSTER_CONFIG_S3_PATH'))
            }
        elif action == "populate_comparison_results":
            res = {
                'sql_id': populate_comparison_results(
                    script_s3_path=system_config.get('POPULATE_COMPARISON_RESULTS_SCRIPT'),
                    action=action,
                    cluster_identifier=cluster_identifier,
                    redshift_iam_role=system_config.get('REDSHIFT_IAM_ROLE'),
                    bucket_name=system_config.get('S3_BUCKET_NAME'),
                    db=system_config.get('DATABASE_NAME'),
                    user=system_config.get('MASTER_USER_NAME'),
                    what_if_timestamp=what_if_timestamp,
                    raw_comparison_results_s3_path=system_config.get('RAW_COMPARISON_RESULTS_S3_PATH'),
                    comparison_results_s3_path=system_config.get('COMPARISON_RESULTS_S3_PATH'))
            }
        elif action == "sql_status":
            res = {'status': sql_status(sql_id)}
        elif action == "run_glue_crawler":
            res = {'status': run_glue_crawler(system_config.get('CRAWLER_NAME'))}
        else:
            # str() keeps this a ValueError even when 'action' is absent (None);
            # bare concatenation would raise TypeError and hide the real problem.
            raise ValueError("Invalid Task: " + str(action))
    except Exception as e:
        # Log the failure, then let the original exception propagate to the caller.
        print(e)
        print(traceback.format_exc())
        raise
    print(res)
    return res