in client/migrationx-transformer/src/main/python/airflow_dag_parser/patch/task_instance_patch.py [0:0]
def get_template_context(self, session=None):
task = self.task
from airflow import macros
tables = None
if 'tables' in task.params:
tables = task.params['tables']
params = {}
run_id = ''
dag_run = DagRun(
dag_id=self.task.dag.dag_id,
run_id='dag_parser_' + datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
execution_date=datetime.now(),
start_date=datetime.now(),
state=State.RUNNING
)
ds = self.execution_date.strftime('%Y-%m-%d')
ts = self.execution_date.isoformat()
yesterday_ds = (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
# For manually triggered dagruns that aren't run on a schedule, next/previous
# schedule dates don't make sense, and should be set to execution date for
# consistency with how execution_date is set for manually triggered tasks, i.e.
# triggered_date == execution_date.
if dag_run and dag_run.external_trigger:
prev_execution_date = self.execution_date
next_execution_date = self.execution_date
else:
prev_execution_date = task.dag.previous_schedule(self.execution_date)
next_execution_date = task.dag.following_schedule(self.execution_date)
next_ds = None
next_ds_nodash = None
if next_execution_date:
next_ds = next_execution_date.strftime('%Y-%m-%d')
next_ds_nodash = next_ds.replace('-', '')
prev_ds = None
prev_ds_nodash = None
if prev_execution_date:
prev_ds = prev_execution_date.strftime('%Y-%m-%d')
prev_ds_nodash = prev_ds.replace('-', '')
ds_nodash = ds.replace('-', '')
ts_nodash = self.execution_date.strftime('%Y%m%dT%H%M%S')
ts_nodash_with_tz = ts.replace('-', '').replace(':', '')
yesterday_ds_nodash = yesterday_ds.replace('-', '')
tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash)
if task.params:
params.update(task.params)
if configuration.getboolean('core', 'dag_run_conf_overrides_params'):
self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
class VariableAccessor:
"""
Wrapper around Variable. This way you can get variables in templates by using
{var.value.your_variable_name}.
"""
def __init__(self):
self.var = None
def __getattr__(self, item):
self.var = Variable.get(item)
return self.var
def __repr__(self):
return str(self.var)
class VariableJsonAccessor:
"""
Wrapper around deserialized Variables. This way you can get variables
in templates by using {var.json.your_variable_name}.
"""
def __init__(self):
self.var = None
def __getattr__(self, item):
self.var = Variable.get(item, deserialize_json=True)
return self.var
def __repr__(self):
return str(self.var)
return {
'dag': task.dag,
'ds': ds,
'next_ds': next_ds,
'next_ds_nodash': next_ds_nodash,
'prev_ds': prev_ds,
'prev_ds_nodash': prev_ds_nodash,
'ds_nodash': ds_nodash,
'ts': ts,
'ts_nodash': ts_nodash,
'ts_nodash_with_tz': ts_nodash_with_tz,
'yesterday_ds': yesterday_ds,
'yesterday_ds_nodash': yesterday_ds_nodash,
'tomorrow_ds': tomorrow_ds,
'tomorrow_ds_nodash': tomorrow_ds_nodash,
'END_DATE': ds,
'end_date': ds,
'dag_run': dag_run,
'run_id': run_id,
'execution_date': self.execution_date,
'prev_execution_date': prev_execution_date,
'next_execution_date': next_execution_date,
'latest_date': ds,
'macros': macros,
'params': params,
'tables': tables,
'task': task,
'task_instance': self,
'ti': self,
'task_instance_key_str': ti_key_str,
'conf': configuration,
'test_mode': self.test_mode,
'var': {
'value': VariableAccessor(),
'json': VariableJsonAccessor()
},
'inlets': task.inlets,
'outlets': task.outlets,
}
pass