in dag_utils/tools.py [0:0]
def __init__(self,
             env_vars=None,
             dbt_vars=None,
             doc_dirs=None,
             capture_docs=True,
             **kwargs):
    # Copy mutable arguments so defaults are not shared between instances
    # and the caller's dict/list are never modified in place
    env_vars = dict(env_vars or {})
    doc_dirs = list(doc_dirs or [])
    # Set DBT_VARS environment variable if necessary
    if dbt_vars:
        env_vars['DBT_VARS'] = json.dumps(dbt_vars)
    # Disable colours on output -- Airflow does not render them
    env_vars.setdefault('DBT_USE_COLORS', 'false')
    # Disable anonymous usage stats
    env_vars.setdefault('DBT_SEND_ANONYMOUS_USAGE_STATS', 'false')
    # Enable JSON logging (if desired)
    # env_vars.setdefault('DBT_LOG_FORMAT', 'json')
    # Add the general dbt environment variables, templated from Airflow Variables
    env_vars.update({
        'DBT_ENV_CUSTOM_ENV_PROJECT_ID':
            '{{ var.value.PROJECT_ID }}',
        'DBT_ENV_CUSTOM_ENV_REGION':
            '{{ var.value.REGION }}',
        'DBT_ENV_CUSTOM_ENV_BQ_LOCATION':
            '{{ var.value.BQ_LOCATION }}',
        'DBT_ENV_CUSTOM_ENV_GCS_DOCS_BUCKET':
            '{{ var.value.GCS_DOCS_BUCKET }}',
    })
    # Add generic Airflow context environment variables
    env_vars.update({
        'DBT_ENV_CUSTOM_ENV_AIRFLOW_BASE_URL':
            os.getenv('AIRFLOW__WEBSERVER__BASE_URL'),
        'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_TASK_ID':
            '{{ task.task_id }}',
        'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_DAG_ID':
            '{{ dag_run.dag_id }}',
        'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_EXECUTION_DATE':
            '{{ execution_date | ts }}',
    })
    # Capture dbt artifacts and logs alongside any user-supplied doc directories
    if capture_docs:
        doc_dirs = doc_dirs + [
            '/dbt/target',
            '/dbt/logs',
        ]
    super().__init__(
        env_vars=env_vars,
        doc_dirs=doc_dirs,
        **kwargs)
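
A minimal usage sketch of how such an operator might be wired into a DAG. The class name `DbtTaskOperator`, the DAG id, schedule, and the extra paths below are hypothetical (the real subclass name is defined elsewhere in dag_utils/tools.py), and module-level `import json` and `import os` are assumed to exist in that file.

    from datetime import datetime

    from airflow import DAG

    from dag_utils.tools import DbtTaskOperator  # hypothetical class name

    with DAG(dag_id='dbt_daily',
             start_date=datetime(2024, 1, 1),
             schedule_interval='@daily',
             catchup=False) as dag:
        run_models = DbtTaskOperator(
            task_id='dbt_run',
            dbt_vars={'run_date': '{{ ds }}'},      # serialised into the DBT_VARS env var
            env_vars={'DBT_PROFILES_DIR': '/dbt'},  # merged with the defaults set in __init__
            doc_dirs=['/dbt/extra_docs'],           # '/dbt/target' and '/dbt/logs' appended when capture_docs=True
        )

Because `env_vars` and `doc_dirs` are copied inside `__init__`, the dictionaries and lists passed here are left untouched, and each task instance gets its own environment mapping.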