in src/config_reader.py [0:0]
def main():
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.info("Starting config_reader...")
exceptions = []
if not sys.argv[1]:
exceptions.append(
ValueError("ERROR: No Source Project argument provided!"))
if not sys.argv[2]:
exceptions.append(
ValueError("ERROR: No Source Dataset argument provided!"))
if not sys.argv[3]:
exceptions.append(
ValueError("ERROR: No Target Dataset argument provided!"))
if not sys.argv[4]:
exceptions.append(ValueError("ERROR: No Test flag argument provided"))
if exceptions:
raise ExceptionGroup("⛔ Invalid configurations:", exceptions)
source_project = sys.argv[1]
generate_query_bq_client.project = source_project
source_dataset = f"{source_project}.{sys.argv[2]}"
target_dataset = f"{source_project}.{sys.argv[3]}"
gen_test = sys.argv[4]
if not sys.argv[5]:
logging.info("SQL Flavour not provided. Defaulting to ECC.")
sql_flavour = "ECC"
else:
sql_flavour = sys.argv[5]
os.makedirs(_GENERATED_DAG_DIR, exist_ok=True)
os.makedirs(_GENERATED_SQL_DIR, exist_ok=True)
try:
# Load config from environment variable
cortex_config = load_config_file_from_env()
# Gets allowTelemetry config and defaults to True
allow_telemetry = cortex_config.get("allowTelemetry", True)
except (FileNotFoundError, PermissionError):
# File used by telemetry only and the path assumes execution
# using Cloud Build.
#
# Access while executing locally cannot be assumed. If file is not
# available then continue execution with telemetry not allowed.
#
# File not accessible or found, setting allow_telemetry to false.
allow_telemetry = False
# Get location from config
bq_location = cortex_config["location"]
# Read settings from settings file.
with open(_CONFIG_FILE, encoding="utf-8") as settings_file:
t = jinja2.Template(settings_file.read(),
trim_blocks=True,
lstrip_blocks=True)
resolved_configs = t.render({"sql_flavour": sql_flavour})
try:
configs = yaml.load(resolved_configs, Loader=yaml.SafeLoader)
except Exception as e:
raise SystemExit(
f"⛔️ Error reading '{_CONFIG_FILE}' file."
) from e
table_configs = configs["data_to_replicate"]
# Let's make sure table settings are specified correctly in the settings
# files.
# NOTE: We are doing this separately, and ahead of actual processing to
# make sure we capture any settings error early.
error_message = validate_table_configs(table_configs)
if error_message:
exit_message = (f"⛔ Invalid configurations in '{_CONFIG_FILE}'!! "
f"Reason: {error_message} ⛔")
raise SystemExit(exit_message)
# Process each table entry in the settings to create CDC table/view.
# This is done in parallel using multiple threads.
pool = futures.ThreadPoolExecutor(10)
threads = []
for table_config in table_configs:
threads.append(
pool.submit(process_table, table_config, source_dataset,
target_dataset, gen_test, allow_telemetry, bq_location))
if len(threads) > 0:
logging.info("Waiting for all tasks to complete...")
futures.wait(threads)
# In order to capture error from any of the threads,
# we need to access the result. If any individual thread
# throws an exception, it will be caught with this call.
# Otherwise, system will always exit with SUCCESS.
for t in threads:
_ = t.result()
logging.info("✅ config_reader done. ✅")