def _dry_run()

in src/translation/dags/dml_validation_dag.py [0:0]


# Prefix that marks a SQL line comment.
_SQL_COMMENT_PREFIX = "--"
# Leading keywords of BTEQ block fragments that are skipped rather than
# dry-run on their own.
_BTEQ_SKIP_PATTERN = r"EXCEPTION WHEN ERROR|END"
# Leading keyword removed from a BTEQ statement before it is dry-run.
_BTEQ_BEGIN_PATTERN = r"BEGIN"


def _escape_bq_string(value):
    """Escape *value* for use inside a single-quoted BigQuery string literal."""
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace("'", "\\'")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def _split_sql_statements(content):
    """Split *content* on ``;`` and return the stripped statements.

    Only a blank trailing fragment (the text after the final ``;``) is
    dropped, so a last statement without a terminating ``;`` is kept.
    NOTE: the split is purely textual; a ``;`` inside a string literal or
    comment also splits the statement.
    """
    statements = content.split(";")
    if not statements[-1].strip():
        statements.pop()
    return [statement.strip() for statement in statements]


def _dry_run(ti, **kwargs) -> None:
    """Dry-run every SQL statement of the configured files and push a summary.

    Reads ``kwargs["dag_run"].conf``; when it contains a ``"config"`` entry and
    a non-empty ``"files"`` list, each file under ``DIRECTORY`` is split into
    statements, every statement is submitted through
    ``bq_client.query(stmt, job_config=job_config)`` and one ``INSERT``
    statement per result is accumulated for the results table.

    Statements made up solely of ``--`` comment lines are skipped in ``SQL``
    and ``BTEQ`` mode. In ``BTEQ`` mode fragments starting with
    ``EXCEPTION WHEN ERROR`` or ``END`` are skipped too, and a leading
    ``BEGIN`` keyword is removed before the dry run. The mode is compared
    case-insensitively.

    Args:
        ti: Task instance; ``xcom_push`` is called with the keys
            ``dry_run_results``, ``config`` and ``validation_mode``.
        **kwargs: Task context; only ``dag_run`` is read.

    Returns:
        None. When the configuration or the file list is missing or empty, an
        error is logged and nothing is pushed to XCom.
    """
    logging.info("Logging info for function _dry_run")

    conf = kwargs["dag_run"].conf
    if conf is None or "config" not in conf:
        logging.error("Configuration file is empty")
        return

    logging.info("Configuration file is not empty")
    # A missing "files" entry is handled like an empty list instead of
    # failing with KeyError.
    files = conf.get("files")
    if not files:
        logging.error(
            "SQL File list is empty and there are no SQL files to execute SQL query"
        )
        return

    config = conf["config"]
    source = config["source"]
    if source == "teradata":
        mode = config["migrationTask"]["translationConfigDetails"][
            "sourceDialect"
        ]["teradataDialect"]["mode"]
    else:
        mode = "sql"
    # Normalise once: the default above is lower case while the checks below
    # use upper-case mode names.
    mode = str(mode).upper()

    unique_id = config[CUSTOM_RUN_ID_KEY]
    translation_details = config["migrationTask"]["translationConfigDetails"]
    gcs_source_path = translation_details["gcsSourcePath"]
    gcs_target_path = translation_details["gcsTargetPath"]

    bteq_skip_re = re.compile(_BTEQ_SKIP_PATTERN, re.I)
    bteq_begin_re = re.compile(_BTEQ_BEGIN_PATTERN, re.I)

    # Values are escaped because they are embedded in quoted SQL literals.
    unique_id_sql = _escape_bq_string(unique_id)
    gcs_source_sql = _escape_bq_string(gcs_source_path)
    gcs_target_sql = _escape_bq_string(gcs_target_path)
    insert_prefix = (
        f"INSERT INTO `{PROJECT_ID}.{BQ_RESULT_DATASET_NAME}.{BQ_RESULT_TABLE_NAME}`"
        "(unique_id,file_name,status,error_details,execution_start_time,"
        "gcs_input_path,gcs_output_path,bq_table_name,statement_type) VALUES "
    )

    successful_query_count = 0
    failed_query_count = 0
    result_inserts = []

    for filename in files:
        path = os.path.join(DIRECTORY, filename)
        logging.info(path)
        filename_sql = _escape_bq_string(filename)

        with open(path, "r") as sql_file:
            content = sql_file.read()

        for stmt in _split_sql_statements(content):
            # A statement counts as fully commented only when it starts with
            # the comment prefix and every one of its lines does as well.
            fully_commented = False
            if stmt.startswith(_SQL_COMMENT_PREFIX):
                fully_commented = all(
                    line.startswith(_SQL_COMMENT_PREFIX)
                    for line in stmt.splitlines()
                )
                if not fully_commented:
                    logging.info(
                        "In commented line separated by ; there is uncommented SQL statement which needs to process"
                    )

            if mode == "SQL":
                if fully_commented:
                    logging.info("SQL mode, skip the statement and goto next statement")
                    continue
            elif mode == "BTEQ":
                logging.info("BTEQ mode")

                # Either condition is enough to skip: a statement cannot both
                # start with a comment and start with a block keyword.
                if fully_commented or bteq_skip_re.match(stmt):
                    logging.info(
                        "BTEQ mode, skip the statement and goto next statement"
                    )
                    continue

                begin_match = bteq_begin_re.match(stmt)
                if begin_match:
                    logging.info(
                        "Statement start with begin block hence remove BEGIN word"
                    )
                    # Drop only the leading keyword; later occurrences of
                    # "begin" inside the statement are left intact.
                    stmt = stmt[begin_match.end():]

            exec_time = datetime.datetime.now()

            response_json = bq_client.query(
                stmt, job_config=job_config
            )._properties
            # A dry run query completes immediately.
            logging.debug(f"This query response json - {response_json}")

            if response_json["status"]["state"] == "DONE":
                successful_query_count += 1
                statement_type = _escape_bq_string(
                    response_json["statistics"]["query"]["statementType"]
                )
                bq_table_name = _escape_bq_string(
                    response_json["configuration"]["query"]["destinationTable"][
                        "tableId"
                    ]
                )
                logging.info("Nothing went wrong in _dry_run")
                values = f"('{unique_id_sql}','{filename_sql}','success','','{exec_time}','{gcs_source_sql}','{gcs_target_sql}','{bq_table_name}','{statement_type}' )"
            else:
                failed_query_count += 1
                logging.info("Dry run fail")
                # NOTE(review): assumes a failed job exposes a top-level
                # "error" object with a "message" - confirm against the
                # response shape actually returned by bq_client.
                error_details = _escape_bq_string(
                    response_json["error"]["message"]
                )
                values = f"('{unique_id_sql}','{filename_sql}','fail','{error_details}','{exec_time}','{gcs_source_sql}','{gcs_target_sql}','', '')"

            result_inserts.append(insert_prefix + values + ";")

    aggregated_results = [
        {
            "unique_id": unique_id,
            "total_files": len(files),
            "total_queries": successful_query_count + failed_query_count,
            "successful_queries": successful_query_count,
            "failed_queries": failed_query_count,
        }
    ]

    results = {
        "unique_id": unique_id,
        "aggregated_results": aggregated_results,
        "queryString": "".join(result_inserts),
    }

    ti.xcom_push(key="dry_run_results", value=results)
    ti.xcom_push(key="config", value=config)
    ti.xcom_push(
        key="validation_mode",
        value=config["validation_config"].get("validation_mode"),
    )