in src/translation/dags/dml_validation_dag.py [0:0]
def _dry_run(ti, **kwargs):
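    """Dry-run the translated SQL statements against BigQuery.

    Reads the file list and translation config from the DAG run conf, dry-runs
    each SQL statement, builds INSERT statements that record per-statement
    results, and pushes the results, config and validation mode to XCom.
    """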
logging.info("Logging info for function _dry_run")
if kwargs["dag_run"].conf is not None and "config" in kwargs["dag_run"].conf:
logging.info("Configuration file is not empty")
files = kwargs["dag_run"].conf["files"]
if len(files) == 0:
            logging.error(
                "SQL file list is empty; there are no SQL files to execute"
            )
else:
config = kwargs["dag_run"].conf["config"]
source = config["source"]
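            # Teradata configs carry an explicit dialect mode (SQL or BTEQ);
            # every other source is treated as plain SQL.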
if source == "teradata":
mode = config["migrationTask"]["translationConfigDetails"][
"sourceDialect"
]["teradataDialect"]["mode"]
else:
mode = "sql"
unique_id = config[CUSTOM_RUN_ID_KEY]
gcsSourcePath = config["migrationTask"]["translationConfigDetails"][
"gcsSourcePath"
]
gcsTargetPath = config["migrationTask"]["translationConfigDetails"][
"gcsTargetPath"
]
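            # Patterns for detecting commented-out lines and for stripping or
            # skipping BTEQ block keywords (BEGIN / EXCEPTION WHEN ERROR / END).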
            skip_comment_expr = ["--"]
            remove_expr_bteq = "BEGIN"
            skip_expr_bteq = ["EXCEPTION WHEN ERROR", "END"]
total_files_count = len(files)
failed_query_count = 0
successful_query_count = 0
queryString = ""
aggregated_results = []
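            # Dry-run every statement in every file; each outcome is recorded as an
            # INSERT row for the BigQuery results table.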
for filename in files:
f = os.path.join(DIRECTORY, filename)
logging.info(f)
                with open(f, "r") as sql_file:
                    content = sql_file.read()
statements = content.split(";")
                # Drop the trailing element produced by splitting on ";" (the text
                # after the final semicolon, typically just a newline).
                statements.pop()
for stmt in statements:
stmt = stmt.strip()
                    # Assume the statement contains executable (uncommented) SQL unless
                    # every one of its lines starts with a comment marker.
                    uncommentedStmtPresent = True
                    # Check whether the statement starts with a comment marker.
                    if re.match(
                        r"(?=(" + "|".join(skip_comment_expr) + r"))", stmt, re.I
                    ):
                        # The statement starts with a comment; treat it as fully commented
                        # until an uncommented line is found below.
                        uncommentedStmtPresent = False
for lineStmt in stmt.splitlines():
                            # If any line does not start with a comment marker, the statement
                            # still contains executable SQL: flag it and stop scanning.
                            if not re.match(
                                r"(?=(" + "|".join(skip_comment_expr) + r"))",
                                lineStmt,
                                re.I,
                            ):
uncommentedStmtPresent = True
                                logging.info(
                                    "Commented statement also contains uncommented SQL, which will be processed"
                                )
break
if mode == "SQL" and not uncommentedStmtPresent:
                        logging.info(
                            "SQL mode: statement is fully commented, skipping it"
                        )
continue
elif mode == "BTEQ":
logging.info("BTEQ mode")
                        if (
                            re.match(
                                r"(?=(" + "|".join(skip_expr_bteq) + r"))",
                                stmt,
                                re.I,
                            )
                            and not uncommentedStmtPresent
                        ):
                            logging.info(
                                "BTEQ mode: skipping this statement"
                            )
continue
                        if re.match(remove_expr_bteq, stmt, re.I):
                            logging.info(
                                "Statement starts with a BEGIN block; removing the BEGIN keyword"
                            )
stmt = re.split(remove_expr_bteq, stmt, flags=re.I)[1]
queryStr = ""
exec_time = datetime.datetime.now()
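                    # Submit the statement to BigQuery; job_config is assumed to be a
                    # QueryJobConfig with dry_run=True defined elsewhere in this module.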
response_json = bq_client.query(
stmt, job_config=job_config
)._properties
# A dry run query completes immediately.
logging.debug(f"This query response json - {response_json}")
if response_json["status"]["state"] == "DONE":
successful_query_count += 1
statement_type = response_json["statistics"]["query"][
"statementType"
]
bq_table_name = response_json["configuration"]["query"][
"destinationTable"
]["tableId"]
logging.info("Nothing went wrong in _dry_run")
queryStr = f"INSERT INTO `{PROJECT_ID}.{BQ_RESULT_DATASET_NAME}.{BQ_RESULT_TABLE_NAME}`(unique_id,file_name,status,error_details,execution_start_time,gcs_input_path,gcs_output_path,bq_table_name,statement_type) VALUES ('{unique_id}','{filename}','success','','{exec_time}','{gcsSourcePath}','{gcsTargetPath}','{bq_table_name}','{statement_type}' )"
else:
failed_query_count += 1
logging.info("Dry run fail")
error_details = response_json["error"]["message"]
queryStr = f"INSERT INTO `{PROJECT_ID}.{BQ_RESULT_DATASET_NAME}.{BQ_RESULT_TABLE_NAME}`(unique_id,file_name,status,error_details,execution_start_time,gcs_input_path,gcs_output_path,bq_table_name,statement_type) VALUES ('{unique_id}','{filename}','fail','{error_details}','{exec_time}','{gcsSourcePath}','{gcsTargetPath}','', '')"
queryString = queryString + queryStr + ";"
aggregated_results.append(
{
"unique_id": unique_id,
"total_files": total_files_count,
"total_queries": successful_query_count + failed_query_count,
"successful_queries": successful_query_count,
"failed_queries": failed_query_count,
}
)
results = {
"unique_id": unique_id,
"aggregated_results": aggregated_results,
"queryString": queryString,
}
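            # Expose the dry-run summary, the raw config and the requested
            # validation mode to downstream tasks via XCom.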
ti.xcom_push(key="dry_run_results", value=results)
ti.xcom_push(key="config", value=kwargs["dag_run"].conf["config"])
ti.xcom_push(
key="validation_mode",
value=config["validation_config"].get("validation_mode"),
)
else:
logging.error("Configuration file is empty")