def __init__()

in dag_utils/tools.py [0:0]


    def __init__(self,
                 env_vars={},
                 dbt_vars=None,
                 doc_dirs=[],
                 capture_docs=True,
                 **kwargs):

        # Set DBT_VARS environment variable if necessary
        if dbt_vars:
            env_vars['DBT_VARS'] = json.dumps(dbt_vars)

        # Disable colours on output -- Airflow does not render it
        env_vars.setdefault('DBT_USE_COLORS', 'false')

        # Disable anonymous usage stats
        env_vars.setdefault('DBT_SEND_ANONYMOUS_USAGE_STATS', 'false')

        # Enable JSON logging (if desired)
        # env_vars.setdefault('DBT_LOG_FORMAT', 'json')

        # Add the general DBT environment variables
        env_vars.update({
            'DBT_ENV_CUSTOM_ENV_PROJECT_ID':
                '{{ var.value.PROJECT_ID }}',
            'DBT_ENV_CUSTOM_ENV_REGION':
                '{{ var.value.REGION }}',
            'DBT_ENV_CUSTOM_ENV_BQ_LOCATION':
                '{{ var.value.BQ_LOCATION }}',
            'DBT_ENV_CUSTOM_ENV_GCS_DOCS_BUCKET':
                '{{ var.value.GCS_DOCS_BUCKET }}',
        })

        # Add generic Airflow environment variables
        env_vars.update({
            'DBT_ENV_CUSTOM_ENV_AIRFLOW_BASE_URL':
                os.getenv('AIRFLOW__WEBSERVER__BASE_URL'),
            'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_TASK_ID':
                '{{ task.task_id }}',
            'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_DAG_ID':
                '{{ dag_run.dag_id }}',
            'DBT_ENV_CUSTOM_ENV_AIRFLOW_CTX_EXECUTION_DATE':
                '{{ execution_date | ts }}',
        })

        if capture_docs:
            doc_dirs = doc_dirs + [
                '/dbt/target',
                '/dbt/logs',
            ]

        super().__init__(
            env_vars=env_vars,
            doc_dirs=doc_dirs,
            **kwargs)