in tasks.py [0:0]
def apply_config_parameters_to_all_tables(c, env_name="prod"):
"""
Applies parameters from a configuration file and table schemas to .sqlx table template files,
generating new .sql files for each template.
Args:
c (object): An object providing context for task execution (e.g., task runner).
env_name (str, optional): Name of the environment to use for configuration. Defaults to "prod".
Raises:
FileNotFoundError: If specified configuration or schema files are not found.
ValueError: If configuration or schema files are invalid or missing required keys.
TemplateError: If an error occurs during template rendering.
JSONDecodeError: If a schema file cannot be parsed as JSON.
Example usage:
apply_config_parameters_to_all_tables(my_task_context, env_name="dev")
"""
import json
# Load configuration file according to environment name
current_path = Path(__file__).parent.resolve()
conf = yaml.safe_load(Path.joinpath(current_path,"config", "{}.yaml".format(env_name)).read_text())
# Fetch all query configs <key,value> to be applied
query_dict = conf['bigquery']['table']
# Locate file path for all templates to be used
template_path = Path.joinpath(current_path,"sql","table")
templateLoader = FileSystemLoader(searchpath=template_path)
templateEnv = Environment(loader=templateLoader)
# Locate file path for all table schemas to be used
schema_path = Path.joinpath(current_path,"sql","schema","table")
# Open the file in read-only mode
for template_file in template_path.iterdir():
if template_file.is_file() and template_file.resolve().suffix == '.sqlx':
for schema_file in schema_path.iterdir():
if schema_file.is_file() and schema_file.resolve().suffix == '.json' and template_file.resolve().stem == schema_file.resolve().stem:
with schema_file.open("r", encoding="utf-8") as f:
table_schema = str(f.read())
table_schema_dict = json.loads(table_schema)
columns_str = ""
for row in table_schema_dict:
columns_str+=(row['name'] + ' ' + row['type'] + ' ' + """OPTIONS (description = '""" + row['description'] + """'), """)
config_dict = {"columns": columns_str, **query_dict[template_file.stem]}
template = templateEnv.get_template(template_file.name)
new_sql = template.render(config_dict)
rendered_sql_file = Path.joinpath(template_path, template_file.resolve().stem+".sql")
with rendered_sql_file.open("w+", encoding ="utf-8") as f:
f.write(new_sql)
print("New SQL file rendered at {}".format(rendered_sql_file))