in src/raw_dag_generator/generate_dags.py [0:0]
def process_table(bq_client, table_config, raw_dataset, raw_project):
    """Generate the Airflow DAG file for one SFDC table and ensure its raw BQ table exists.

    Renders the SFDC-to-raw DAG template into a per-table Python file, then
    creates the raw BigQuery table from the CSV schema mapping if it is missing.

    Args:
        bq_client: BigQuery client used to check for and create the raw table.
        table_config: Mapping with "api_name", "base_table" and
            "load_frequency" keys describing one source table.
        raw_dataset: Name of the target RAW dataset.
        raw_project: GCP project id that holds the RAW dataset.
    """
    api_name = table_config["api_name"]
    base_table = table_config["base_table"].lower()
    logging.info(" Generating files for '%s'", base_table)

    python_template_file = Path(_TEMPLATE_DIR, "airflow_dag_sfdc_to_raw.py")
    # Dots in a table name would break the generated module filename,
    # so flatten them to underscores.
    output_dag_py_file = Path(
        _GENERATED_DAG_DIR,
        f"{raw_project}_{raw_dataset}_sfdc_extract_to_raw_"
        f"{base_table.replace('.', '_')}.py")

    today = datetime.datetime.now()
    load_frequency = table_config["load_frequency"]
    subs = {
        "project_id": raw_project,
        "raw_dataset": raw_dataset,
        "base_table": base_table,
        "api_name": api_name,
        "load_frequency": load_frequency,
        # Date components at generation time, substituted into the template
        # (presumably the DAG start date — TODO confirm against the template).
        "year": today.year,
        "month": today.month,
        "day": today.day,
    }
    generate_file_from_template(python_template_file, output_dag_py_file,
                                **subs)
    logging.info(" Generated dag python file")

    raw_table = f"{raw_project}.{raw_dataset}.{base_table}"
    if not table_exists(bq_client, raw_table):
        # Fixed grammar in the log message ("doesn't exists" -> "doesn't exist").
        logging.info(
            "Raw table %s doesn't exist. "
            "Creating one according to the schema mapping.", raw_table)
        schema_file = Path(_THIS_DIR,
                           f"../table_schema/{base_table}.csv").absolute()
        schema_list = []
        has_recordstamp = False
        with open(
            schema_file,
            encoding="utf-8",
            newline="",
        ) as csv_file:
            for row in csv.DictReader(csv_file, delimiter=","):
                source_name = row["SourceField"]
                target_name = row["TargetField"]
                if "recordstamp" in (source_name.lower(),
                                     target_name.lower()):
                    has_recordstamp = True
                schema_list.append((source_name, row["DataType"]))
        # If we handle raw tables, we need Recordstamp field.
        if not has_recordstamp:
            schema_list.append(("Recordstamp", "TIMESTAMP"))
        create_table(bq_client, raw_table, schema_list)