tools/bigquery_mdr/crr_init.sql (312 lines of code) (raw):

CREATE OR REPLACE PROCEDURE `<project_id>.DR_ADMIN.crr_init`(IN var_project_id STRING, IN var_execution_region_id STRING, OUT p_error_load_batch_id STRING, OUT p_out_status STRING)
BEGIN
  --Copyright 2024 Google. This software is provided as-is,
  --without warranty or representation for any use or purpose.
  --Your use of it is subject to your agreement with Google.

  --Purpose: This procedure initiates the creation of the replica for each of the CRR datasets.
  --         It also initiates the deletion of replicas for datasets that are no longer part of the CRR solution.
  --Parameters:
  --  var_project_id          (IN)  project whose datasets are inspected; must exist in DR_ADMIN.replica_inscope_projects
  --  var_execution_region_id (IN)  region the procedure is running in; must equal the configured primary region
  --  p_error_load_batch_id   (OUT) UUID used in the name of the exported error file
  --  p_out_status            (OUT) set to "FAILED:Execution Error" when any error was captured; not set by this procedure otherwise
  --Raises (all are caught by the EXCEPTION handler below, exported to GCS and reported via p_out_status):
  --  'One or more of the input argument is not fully populated'
  --  'Region Config Table is not fully populated'
  --  'Region Mismatch. This execution will be discarded'
  --  'The Project is not in scope'

  DECLARE var_primary_region STRING;
  DECLARE var_secondary_region STRING;
  DECLARE var_promotion_flag BOOL;
  DECLARE var_crr_error_bucket_name STRING;
  DECLARE var_max_concurrent_backfills INT64;
  DECLARE var_ref_backfill_size_in_gb INT64;
  DECLARE var_error_count INT64;
  DECLARE var_config_project_id STRING;
  DECLARE sum_physical_gb FLOAT64;
  DECLARE temp_physical_gb FLOAT64;
  DECLARE total_retry_count INT64;
  DECLARE retry_count INT64;

  --Run the failover-reservation association pass first. crr_associate exports its own
  --errors; the error temp table and the batch id are re-initialised right after this call.
  CALL `<project_id>.DR_ADMIN.crr_associate`(var_project_id, var_execution_region_id, p_error_load_batch_id, p_out_status);

  --Create temp table to capture the error details
  CREATE OR REPLACE TEMP TABLE dr_error_details as
    (select * from DR_ADMIN.crr_error_details where 1=2);

  --Generate batch id to capture any runtime error
  SET p_error_load_batch_id = (select GENERATE_UUID() );

  --Validation of the input arguments
  IF var_project_id is null or var_execution_region_id is null THEN
    RAISE USING MESSAGE = 'One or more of the input argument is not fully populated';
  END IF;

  --Get the current region information from the config table
  SET (var_primary_region,var_secondary_region,var_promotion_flag,var_crr_error_bucket_name,var_ref_backfill_size_in_gb) =
    (SELECT AS STRUCT lower(primary_region),lower(secondary_region),dr_promotion_flag,crr_error_bucket_name,ref_backfill_size_in_gb
     FROM DR_ADMIN.dr_region_config );

  --Validation of the region config table entry
  IF var_primary_region is null or var_secondary_region is null or var_promotion_flag is null or var_crr_error_bucket_name is null or var_ref_backfill_size_in_gb is null THEN
    RAISE USING MESSAGE = 'Region Config Table is not fully populated';
  END IF;

  --Discard the execution, if the execution region is different from the primary region
  IF lower(var_primary_region) <> lower(var_execution_region_id) THEN
    RAISE USING MESSAGE = 'Region Mismatch. This execution will be discarded';
  END IF;

  SET var_config_project_id = (select project_id from DR_ADMIN.replica_inscope_projects where lower(project_id) = lower(var_project_id));

  --Validate whether the project is in scope
  IF var_config_project_id is null THEN
    RAISE USING MESSAGE = 'The Project is not in scope';
  END IF;

  --Trigger the replication process if the execution region is same as the primary region
  IF lower(var_primary_region)=lower(var_execution_region_id) THEN

    --Retrieve the dataset metadata for the current project and region
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE schemata_options as
      SELECT schema_name as dataset_name, option_name,option_value
      FROM `"""||var_project_id||".region-"||var_primary_region||"""`.INFORMATION_SCHEMA.SCHEMATA_OPTIONS
      """;

    --Retrieve the replication metadata for the current project and region
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE schemata_replicas as
      SELECT schema_name,replica_primary_assigned,creation_complete
      FROM `"""||var_project_id||".region-"||var_primary_region||"""`.INFORMATION_SCHEMA.SCHEMATA_REPLICAS
      """ ;

    --Retrieve the KMS key entry for the current project.
    --The key column is selected by secondary region name (upper-cased, '-' replaced by '_', suffixed with _KMS_key_path).
    EXECUTE IMMEDIATE "CREATE OR REPLACE TEMP TABLE replica_kms_key_config as SELECT dataset_name,"||UPPER(REPLACE(var_secondary_region,'-','_'))||"_KMS_key_path as kms_key FROM DR_ADMIN.replica_kms_key_config WHERE project_id = '"||var_project_id||"'";

    --Get the list of all the datasets carrying dr labels; each label value is pulled out of the
    --serialized 'labels' option text with a regex and then split on the comma
    CREATE OR REPLACE TEMP TABLE schemata_options_labels as
    SELECT
      dataset_name
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(dr",.*?)\)'),"\"",""),")",""))[1] AS dr_flag
      , SAFE_CAST(SPLIT(REPLACE(REGEXP_EXTRACT(option_value, r'(dr_backfill_priority.*\d)'),"\"",""))[1] AS INT64) AS dr_priority
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(dr_functional_group.*?)\)'),"\"",""),")",""))[1] AS dr_functional_group
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(metadata_created_by.*?)\)'),"\"",""),")",""))[1] AS metadata_created_by
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(metadata_comment.*?)\)'),"\"",""),")",""))[1] AS metadata_comment
    FROM schemata_options
    WHERE option_name = 'labels'
      AND option_value like '%dr%' ;

    --Board all the dr labelled datasets to the crr process (missing priority sorts last)
    CREATE OR REPLACE TEMP TABLE dataset_boarding as
    SELECT
      dataset_name,
      coalesce(dr_priority,999999) as priority,
      coalesce(dr_functional_group,'null') as functional_group,
      CASE WHEN dr_flag like '%true%' THEN true else false end as active_flag,
      metadata_created_by,
      CURRENT_TIMESTAMP as metadata_created_time,
      metadata_comment
    FROM schemata_options_labels;

    --Retrieve the current replication status of the datasets:
    --active_flag is true when a completed non-primary replica exists next to the completed primary
    CREATE OR REPLACE TEMP TABLE dataset_current_status as
    SELECT
      repl1.schema_name as dataset_name
      ,CASE WHEN rep2.schema_name is null THEN false else true end as active_flag
    FROM
      (SELECT schema_name FROM schemata_replicas WHERE replica_primary_assigned=true and creation_complete=true ) repl1
    LEFT JOIN
      (SELECT schema_name FROM schemata_replicas WHERE replica_primary_assigned=false and creation_complete=true ) rep2
      on repl1.schema_name = rep2.schema_name;

    --Retrieve only the datasets which need to be replicated or dropped, i.e. those whose
    --desired state (dr flag) does not match the current replication state
    CREATE OR REPLACE TEMP TABLE dr_load_config as
    SELECT
      var_project_id as project_id,
      dr_bd.dataset_name,
      dr_bd.functional_group,
      dr_bd.priority,
      kms_config.kms_key as replica_kms_key,
      dr_bd.active_flag,
      dr_bd.metadata_created_by,
      dr_bd.metadata_created_time,
      dr_bd.metadata_comment
    FROM
      (
        SELECT
          dr_load.dataset_name,dr_load.functional_group,dr_load.priority,dr_load.active_flag,
          dr_load.metadata_created_by, dr_load.metadata_created_time,dr_load.metadata_comment
        FROM
          (SELECT dr_bd.* , curr.dataset_name as curr_dataset_name
           FROM dataset_boarding dr_bd
           LEFT JOIN dataset_current_status curr
             ON dr_bd.dataset_name=curr.dataset_name and dr_bd.active_flag=curr.active_flag) dr_load
        WHERE curr_dataset_name is null
      ) dr_bd
    LEFT JOIN
      (
        SELECT dataset_name, kms_key FROM replica_kms_key_config
      ) kms_config
      on dr_bd.dataset_name = kms_config.dataset_name;

    --Getting the dataset physical size in GB (bytes / 1024^3) so that it is comparable
    --with ref_backfill_size_in_gb from the region config
    EXECUTE IMMEDIATE "create or replace temp table temp1_table_storage as select project_id,table_schema,SUM(TOTAL_PHYSICAL_BYTES)/1024/1024/1024 as total_physical_gb from `"||var_project_id||".region-"||var_primary_region||"`.INFORMATION_SCHEMA.TABLE_STORAGE group by project_id,table_schema" ;

    --Creating the active dataset (unreplicated) list with the storage info, numbered by priority
    create or replace temp table active_load_config as
    SELECT
      a.project_id,
      a.dataset_name,
      a.replica_kms_key,
      a.priority,
      coalesce(b.total_physical_gb,0) as total_physical_gb
      ,row_number() over (partition by 1 order by a.priority,a.dataset_name) row_num
    FROM dr_load_config a
    left join temp1_table_storage b
      ON a.project_id = b.project_id and a.dataset_name= b.table_schema
    WHERE active_flag=true;

    SET retry_count =1;
    SET total_retry_count = (select count(1) from active_load_config );
    SET sum_physical_gb = 0.0;

    --Batching datasets until the accumulated size reaches var_ref_backfill_size_in_gb.
    --The dataset that crosses the threshold is still included in the batch.
    WHILE retry_count <= total_retry_count and sum_physical_gb < var_ref_backfill_size_in_gb DO
      begin
        SET temp_physical_gb = (select total_physical_gb from active_load_config where row_num = retry_count );
        SET sum_physical_gb = sum_physical_gb + temp_physical_gb;
        SET retry_count = retry_count + 1;
      end ;
    end while;

    --retry_count points one past the last dataset admitted to the batch
    SET var_max_concurrent_backfills = retry_count-1 ;

    --Gathering all the unreplicated datasets based on the batch size calculated
    FOR active in
      (
        SELECT project_id,dataset_name,replica_kms_key
        FROM active_load_config
        WHERE row_num <= var_max_concurrent_backfills
        order by row_num
      )
    do
      --Send each dataset to the create replication process
      CALL DR_ADMIN.crr_replicator(active.project_id,active.dataset_name,var_secondary_region,true,active.replica_kms_key,p_error_load_batch_id,p_out_status);
      SELECT active.dataset_name;
    END FOR;

    --Retrieve the datasets with the dr flag disabled
    FOR inactive in
      (
        --Retrieving the dataset only if the replica exists
        SELECT project_id,dr_bd.dataset_name
        from
          (SELECT project_id,dataset_name,priority FROM dr_load_config WHERE active_flag=false ) dr_bd
        inner join
          (SELECT schema_name as dataset_name FROM schemata_replicas WHERE replica_primary_assigned=false and creation_complete=true ) curr_status
          ON dr_bd.dataset_name = curr_status.dataset_name
        order by dr_bd.priority,dr_bd.dataset_name
      )
    do
      --Send each dataset to the drop replication process
      CALL DR_ADMIN.crr_replicator(inactive.project_id,inactive.dataset_name,var_secondary_region,false,null,p_error_load_batch_id,p_out_status);
    END FOR;

  END IF;

  --Errors raised inside crr_replicator are collected in dr_error_details; export them if any
  SET var_error_count = (SELECT count(*) as err_count from dr_error_details);

  IF var_error_count > 0 THEN
    EXPORT DATA OPTIONS (
      uri = CONCAT('gs://',var_crr_error_bucket_name,'/',var_project_id,'/replication/',p_error_load_batch_id,'-error-*.csv'),
      format = 'CSV',
      overwrite = true,
      header = true,
      field_delimiter = ';')
    AS (
      SELECT * from dr_error_details
    );
    SET p_out_status = "FAILED:Execution Error";
  END IF;

EXCEPTION WHEN ERROR THEN
  BEGIN
    DECLARE last_job_id STRING;
    DECLARE var_crr_error_bucket_name STRING;
    SET last_job_id = @@last_job_id;
    --The failure may have happened before a batch id was assigned; a NULL id would make
    --the export URI NULL and the error report itself would fail
    IF p_error_load_batch_id IS NULL THEN
      SET p_error_load_batch_id = (select GENERATE_UUID() );
    END IF;
    SET var_crr_error_bucket_name = (SELECT crr_error_bucket_name FROM DR_ADMIN.dr_region_config );
    EXPORT DATA OPTIONS (
      uri = CONCAT('gs://',var_crr_error_bucket_name,'/',var_project_id,'/replication/',p_error_load_batch_id,'-error-*.csv'),
      format = 'CSV',
      overwrite = true,
      header = true,
      field_delimiter = ';')
    AS (
      SELECT
        p_error_load_batch_id as batch_id,
        var_project_id as project_id,
        var_execution_region_id as execution_region_id,
        NULL as dataset_name,
        last_job_id as bq_job_id,
        'REPLICATION' as crr_job_type,
        'DR_ADMIN.crr_init' as error_location,
        REPLACE(@@error.statement_text,'\n','') as error_statement,
        REPLACE(@@error.message,'\n','') as error_message,
        CURRENT_TIMESTAMP as metadata_created_time,
        NULL as metadata_comment
    );
    SET p_out_status = "FAILED:Execution Error";
  END;
END;

CREATE OR REPLACE PROCEDURE `<project_id>.DR_ADMIN.crr_replicator`(IN var_project_id STRING, IN var_dataset_name STRING, IN var_secondary_region STRING, IN var_active_flag BOOL, IN var_kms_key STRING, IN p_error_load_batch_id STRING, OUT p_out_status STRING)
BEGIN
  --Copyright 2024 Google. This software is provided as-is,
  --without warranty or representation for any use or purpose.
  --Your use of it is subject to your agreement with Google.

  --Purpose: This procedure creates the replica for a single CRR dataset, or drops the replica
  --         when the dataset is no longer part of the CRR solution.
  --Parameters:
  --  var_project_id        (IN)  project that owns the dataset
  --  var_dataset_name      (IN)  dataset to alter
  --  var_secondary_region  (IN)  replica name and location
  --  var_active_flag       (IN)  true = ADD REPLICA, false = DROP REPLICA
  --  var_kms_key           (IN)  optional; when not null it is passed as replica_kms_key
  --  p_error_load_batch_id (IN)  batch id written with any captured error row
  --  p_out_status          (OUT) set to "FAILED:Execution Error" when the statement fails
  --Errors are inserted into the temp table dr_error_details, which the caller (crr_init) creates.

  DECLARE sql_statmnt STRING;

  --Validation of the input arguments
  IF var_project_id is null or var_dataset_name is null or var_secondary_region is null or var_active_flag is null or p_error_load_batch_id is null THEN
    RAISE USING MESSAGE = 'One or more of the input argument is not fully populated';
  END IF;

  --Verify that the dataset is in active state for crr process.
  --Project and dataset are back-quoted separately so each is always read as a single identifier.
  IF var_active_flag = true THEN
    IF var_kms_key IS NOT NULL THEN
      --Create Replica with the KMS Key, if the KMS key is present
      SET sql_statmnt="ALTER SCHEMA `"||var_project_id||"`.`"||var_dataset_name||"` ADD REPLICA IF NOT EXISTS `"||var_secondary_region||"` OPTIONS(location='"||var_secondary_region||"',replica_kms_key='"||var_kms_key||"');";
    ELSE
      --Create Replica without the KMS Key, if the KMS key is absent
      SET sql_statmnt="ALTER SCHEMA `"||var_project_id||"`.`"||var_dataset_name||"` ADD REPLICA IF NOT EXISTS `"||var_secondary_region||"` OPTIONS(location='"||var_secondary_region||"');";
    END IF;
  ELSE
    --Drop the Replica if the dataset is not part of the crr process
    SET sql_statmnt="ALTER SCHEMA `"||var_project_id||"`.`"||var_dataset_name||"` DROP REPLICA IF EXISTS `"||var_secondary_region||"`;";
  END IF;

  --Echo the generated statement into the job output before running it
  SELECT sql_statmnt;
  EXECUTE IMMEDIATE sql_statmnt;

EXCEPTION WHEN ERROR THEN
  BEGIN
    DECLARE last_job_id STRING;
    SET last_job_id = @@last_job_id;
    IF p_error_load_batch_id IS NULL THEN
      SET p_error_load_batch_id= (select GENERATE_UUID() );
    END IF;
    INSERT INTO dr_error_details
      (batch_id,project_id,execution_region_id,dataset_name,bq_job_id,crr_job_type,error_location,error_statement,error_message,metadata_created_time,metadata_comment )
    SELECT
      p_error_load_batch_id as batch_id,
      var_project_id as project_id,
      var_secondary_region as execution_region_id,
      var_dataset_name as dataset_name,
      last_job_id as bq_job_id,
      'REPLICATION' as crr_job_type,
      'DR_ADMIN.crr_replicator' as error_location,
      REPLACE(@@error.statement_text,'\n','') as error_statement,
      REPLACE(@@error.message,'\n','') as error_message,
      CURRENT_TIMESTAMP as metadata_created_time,
      NULL as metadata_comment;
    SET p_out_status = "FAILED:Execution Error";
  END;
END;

CREATE OR REPLACE PROCEDURE `DR_ADMIN.crr_associate`(IN var_project_id STRING, IN var_execution_region_id STRING, OUT p_error_load_batch_id STRING, OUT p_out_status STRING)
BEGIN
  --Copyright 2024 Google. This software is provided as-is,
  --without warranty or representation for any use or purpose.
  --Your use of it is subject to your agreement with Google.

  --Purpose: This procedure associates the failover reservation with every dr-enabled dataset that is
  --         fully replicated but has no reservation yet, and removes the reservation from replicated
  --         datasets whose dr flag is disabled.
  --Parameters:
  --  var_project_id          (IN)  project whose datasets are inspected; must exist in DR_ADMIN.replica_inscope_projects
  --  var_execution_region_id (IN)  region the procedure is running in; must equal the configured primary region
  --  p_error_load_batch_id   (OUT) UUID used in the name of the exported error file
  --  p_out_status            (OUT) set to "FAILED:Execution Error" when any error was captured; not set by this procedure otherwise
  --Raises (all are caught by the EXCEPTION handler below, exported to GCS and reported via p_out_status):
  --  'One or more of the input argument is not fully populated'
  --  'Region Config Table is not fully populated'
  --  'Region Mismatch. This execution will be discarded'
  --  'Replica Inscope Projects Table is not fully populated'

  DECLARE var_primary_region STRING;
  DECLARE var_secondary_region STRING;
  DECLARE var_max_concurrent_backfills INT64 ;
  DECLARE var_promotion_flag BOOL;
  DECLARE var_crr_error_bucket_name STRING;
  DECLARE var_config_project_id STRING;
  DECLARE var_reservation_name STRING;
  DECLARE var_error_count INT64;

  --Create temp table to capture the error details
  CREATE OR REPLACE TEMP TABLE dr_error_details as
    (select * from DR_ADMIN.crr_error_details where 1=2);

  --Generate batch id to capture any runtime error
  SET p_error_load_batch_id = (select GENERATE_UUID() );

  --Validation of the input arguments
  IF var_project_id is null or var_execution_region_id is null THEN
    RAISE USING MESSAGE = 'One or more of the input argument is not fully populated';
  END IF;

  --Get the current region information from the config table
  SET (var_primary_region,var_secondary_region,var_max_concurrent_backfills,var_promotion_flag,var_crr_error_bucket_name) =
    (SELECT AS STRUCT lower(primary_region),lower(secondary_region), max_concurrent_backfills,dr_promotion_flag,crr_error_bucket_name
     FROM DR_ADMIN.dr_region_config );

  --Validation of the region config table entry
  IF var_primary_region is null or var_secondary_region is null or var_max_concurrent_backfills is null or var_promotion_flag is null or var_crr_error_bucket_name is null THEN
    RAISE USING MESSAGE = 'Region Config Table is not fully populated';
  END IF;

  --Discard the execution, if the execution region is different from the primary region
  IF lower(var_primary_region) <> lower(var_execution_region_id) THEN
    RAISE USING MESSAGE = 'Region Mismatch. This execution will be discarded';
  END IF;

  --The reservation is addressed as <reservation project>.<reservation name>
  SET (var_config_project_id,var_reservation_name) =
    (SELECT AS STRUCT project_id,concat(failover_reserrvation_project,".",failover_reservation_name) as reservation_name
     from DR_ADMIN.replica_inscope_projects
     where lower(project_id) = lower(var_project_id));

  --Validate whether the project is in scope
  IF var_config_project_id is null or var_reservation_name is null THEN
    RAISE USING MESSAGE = 'Replica Inscope Projects Table is not fully populated';
  END IF;

  --Trigger the association process if the execution region is same as the primary region
  IF lower(var_primary_region)=lower(var_execution_region_id) THEN

    --Retrieve the dataset metadata for the current project and region
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE schemata_options as
      SELECT schema_name as dataset_name, option_name,option_value
      FROM `"""||var_project_id||".region-"||var_primary_region||"""`.INFORMATION_SCHEMA.SCHEMATA_OPTIONS
      """;

    --Retrieve the replication metadata for the current project and region
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE schemata_replicas as
      SELECT schema_name,failover_reservation_project_id,creation_complete,location
      FROM `"""||var_project_id||".region-"||var_primary_region||"""`.INFORMATION_SCHEMA.SCHEMATA_REPLICAS
      """ ;

    --Get the list of all the datasets carrying dr labels; each label value is pulled out of the
    --serialized 'labels' option text with a regex and then split on the comma
    CREATE OR REPLACE TEMP TABLE schemata_options_labels as
    SELECT
      dataset_name
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(dr",.*?)\)'),"\"",""),")",""))[1] AS dr_flag
      , SAFE_CAST(SPLIT(REPLACE(REGEXP_EXTRACT(option_value, r'(dr_backfill_priority.*\d)'),"\"",""))[1] AS INT64) AS dr_priority
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(dr_functional_group.*?)\)'),"\"",""),")",""))[1] AS dr_functional_group
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(metadata_created_by.*?)\)'),"\"",""),")",""))[1] AS metadata_created_by
      , SPLIT(REPLACE(REPLACE(REGEXP_EXTRACT(option_value, r'(metadata_comment.*?)\)'),"\"",""),")",""))[1] AS metadata_comment
    FROM schemata_options
    WHERE option_name = 'labels'
      AND option_value like '%dr%' ;

    --Board all the dr labelled datasets to the crr process (missing priority sorts last)
    CREATE OR REPLACE TEMP TABLE dataset_boarding as
    SELECT
      dataset_name,
      coalesce(dr_priority,999999) as priority,
      coalesce(dr_functional_group,'null') as functional_group,
      CASE WHEN dr_flag like '%true%' THEN true else false end as active_flag,
      metadata_created_by,
      CURRENT_TIMESTAMP as metadata_created_time,
      metadata_comment
    FROM schemata_options_labels;

    --Datasets with completed replicas in both regions (count > 1) and no failover reservation
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE reservation_disassociation as
      select schema_name as dataset_name
      from schemata_replicas
      where creation_complete=true
        and location in ('"""||var_primary_region||"""','"""||var_secondary_region||"""')
        and failover_reservation_project_id is null
      group by schema_name
      having count(*) >1
      """ ;

    --Datasets with completed replicas in both regions (count > 1) and a failover reservation set
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TEMP TABLE reservation_association as
      select schema_name as dataset_name
      from schemata_replicas
      where creation_complete=true
        and location in ('"""||var_primary_region||"""','"""||var_secondary_region||"""')
        and failover_reservation_project_id is not null
      group by schema_name
      having count(*) >1
      """ ;

    --Retrieve only the datasets which need to be associated: dr-enabled but without a reservation
    CREATE OR REPLACE TEMP TABLE dataset_associate_list as
    select rds.dataset_name
    from reservation_disassociation rds
    inner join dataset_boarding dbr
      on rds.dataset_name =dbr.dataset_name and dbr.active_flag = true ;

    --Retrieve only the datasets which need to be disassociated: dr-disabled but still holding a reservation
    CREATE OR REPLACE TEMP TABLE dataset_disassociate_list as
    select ras.dataset_name
    from reservation_association ras
    inner join dataset_boarding dbr
      on ras.dataset_name =dbr.dataset_name and dbr.active_flag = false ;

    --Associate the dataset from the association list
    FOR active in
      (
        select distinct dataset_name from dataset_associate_list
      )
    DO
      --Send each dataset to the association process
      CALL DR_ADMIN.dataset_associator(var_project_id, active.dataset_name,true, var_reservation_name,var_primary_region, p_error_load_batch_id, p_out_status);
    END FOR;

    --Dissociate the dataset from the disassociation list
    FOR inactive in
      (
        select distinct dataset_name from dataset_disassociate_list
      )
    DO
      --Send each dataset to the disassociation process
      CALL DR_ADMIN.dataset_associator(var_project_id, inactive.dataset_name,false, var_reservation_name,var_primary_region, p_error_load_batch_id, p_out_status);
    END FOR;

  END IF;

  --Errors raised inside dataset_associator are collected in dr_error_details; export them if any
  SET var_error_count = (SELECT count(*) as err_count from dr_error_details);

  IF var_error_count > 0 THEN
    EXPORT DATA OPTIONS (
      uri = CONCAT('gs://',var_crr_error_bucket_name,'/',var_project_id,'/association/',p_error_load_batch_id,'-error-*.csv'),
      format = 'CSV',
      overwrite = true,
      header = true,
      field_delimiter = ';')
    AS (
      SELECT * from dr_error_details
    );
    SET p_out_status = "FAILED:Execution Error";
  END IF;

EXCEPTION WHEN ERROR THEN
  BEGIN
    DECLARE last_job_id STRING;
    DECLARE var_crr_error_bucket_name STRING;
    SET last_job_id = @@last_job_id;
    --The failure may have happened before a batch id was assigned; a NULL id would make
    --the export URI NULL and the error report itself would fail
    IF p_error_load_batch_id IS NULL THEN
      SET p_error_load_batch_id = (select GENERATE_UUID() );
    END IF;
    SET var_crr_error_bucket_name = (SELECT crr_error_bucket_name FROM DR_ADMIN.dr_region_config );
    EXPORT DATA OPTIONS (
      uri = CONCAT('gs://',var_crr_error_bucket_name,'/',var_project_id,'/association/',p_error_load_batch_id,'-error-*.csv'),
      format = 'CSV',
      overwrite = true,
      header = true,
      field_delimiter = ';')
    AS (
      SELECT
        p_error_load_batch_id as batch_id,
        var_project_id as project_id,
        var_execution_region_id as execution_region_id,
        NULL as dataset_name,
        last_job_id as bq_job_id,
        'ASSOCIATION' as crr_job_type,
        'DR_ADMIN.crr_associate' as error_location,
        REPLACE(@@error.statement_text,'\n','') as error_statement,
        REPLACE(@@error.message,'\n','') as error_message,
        CURRENT_TIMESTAMP as metadata_created_time,
        NULL as metadata_comment
    );
    SET p_out_status = "FAILED:Execution Error";
  END;
END;

CREATE OR REPLACE PROCEDURE `DR_ADMIN.dataset_associator`(IN var_project_id STRING, IN var_dataset_name STRING, IN var_active_flag BOOL, IN var_reservation_name STRING, IN var_primary_region STRING, IN p_error_load_batch_id STRING, OUT p_out_status STRING)
BEGIN
  --Copyright 2024 Google. This software is provided as-is,
  --without warranty or representation for any use or purpose.
  --Your use of it is subject to your agreement with Google.

  --Purpose: This procedure sets the failover_reservation option on a single dataset, or clears it
  --         (sets it to null) when the dataset is no longer active in the CRR solution.
  --Parameters:
  --  var_project_id        (IN)  project that owns the dataset
  --  var_dataset_name      (IN)  dataset to alter
  --  var_active_flag       (IN)  true = set the reservation, false = clear it
  --  var_reservation_name  (IN)  value written to failover_reservation
  --  var_primary_region    (IN)  recorded as execution_region_id in captured error rows
  --  p_error_load_batch_id (IN)  batch id written with any captured error row
  --  p_out_status          (OUT) set to "FAILED:Execution Error" when the statement fails
  --Errors are inserted into the temp table dr_error_details, which the caller (crr_associate) creates.

  DECLARE sql_statmnt STRING;

  --Validation of the input arguments
  IF var_project_id is null or var_dataset_name is null or var_active_flag is null or var_reservation_name is null or p_error_load_batch_id is null THEN
    RAISE USING MESSAGE = 'One or more of the input argument is not fully populated';
  END IF;

  --Verify that the dataset is in active state for crr process.
  --Project and dataset are back-quoted separately so each is always read as a single identifier.
  IF var_active_flag = true THEN
    SET sql_statmnt="ALTER SCHEMA `"||var_project_id||"`.`"||var_dataset_name||"` SET OPTIONS(failover_reservation='"||var_reservation_name||"')";
  ELSE
    SET sql_statmnt="ALTER SCHEMA `"||var_project_id||"`.`"||var_dataset_name||"` SET OPTIONS(failover_reservation=null)";
  END IF;

  --Echo the generated statement into the job output before running it
  SELECT sql_statmnt;
  EXECUTE IMMEDIATE sql_statmnt;

EXCEPTION WHEN ERROR THEN
  BEGIN
    DECLARE last_job_id STRING;
    SET last_job_id = @@last_job_id;
    IF p_error_load_batch_id IS NULL THEN
      SET p_error_load_batch_id= (select GENERATE_UUID() );
    END IF;
    INSERT INTO dr_error_details
      (batch_id,project_id,execution_region_id,dataset_name,bq_job_id,crr_job_type,error_location,error_statement,error_message,metadata_created_time,metadata_comment )
    SELECT
      p_error_load_batch_id as batch_id,
      var_project_id as project_id,
      var_primary_region as execution_region_id,
      var_dataset_name as dataset_name,
      last_job_id as bq_job_id,
      'ASSOCIATION' as crr_job_type,
      'DR_ADMIN.dataset_associator' as error_location,
      REPLACE(@@error.statement_text,'\n','') as error_statement,
      REPLACE(@@error.message,'\n','') as error_message,
      CURRENT_TIMESTAMP as metadata_created_time,
      NULL as metadata_comment;
    SET p_out_status = "FAILED:Execution Error";
  END;
END;