in src/ab/task/task.py [0:0]
def lazy_init(self):
"""
heavy weight init
"""
self.engine = Engine.get_instance(self.request.get('engine'))
self.algorithm = Algorithm.get_instance(self.request['algorithm'], self.engine._type)
# update spark app id as early as possible
if 'spark' in self.algorithm.params:
try:
self.spark = self.kwargs['spark'] = spark.get_or_create()
self.recorder.update_spark_app_id(self.spark.sparkContext.applicationId)
except AttributeError:
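# a failed Spark setup is tolerated only when the algorithm declares a default value for its 'spark' parameter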
if 'spark' not in self.algorithm.defaults_map:
raise
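# build the data source described by the request (used below for data, db and table-metadata access)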
self.data_source = DataSource.get_instance(**self.request)
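# expose the task id to algorithms that declare a 'task_id' parameter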
if 'task_id' in self.algorithm.params:
self.kwargs['task_id'] = self.id
# TODO: only set when the key does not already exist in kwargs
if self.data_source:
# read data
if 'data' in self.algorithm.params:
if getattr(self.data_source,"sql"):
# read from custom sql
d = self.engine.read_data_by_sql(self.data_source)
self.kwargs['sample_rate'] = 1
self.kwargs['sample_count'] = len(d)
self.kwargs['data'] = d
else:
# sample from table
self.kwargs['sample_rate'], self.kwargs['sample_count'], self.kwargs['data'] = \
self.engine.read_data(self.data_source)
# underlying db
if 'db' in self.algorithm.params:
self.kwargs['db'] = self.data_source.db
# get table meta from db
if 'table_info' in self.algorithm.params:
self.kwargs['table_info'] = self.data_source.get_table_info()
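# inject optional infrastructure clients requested via the algorithm's parameter names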
if 'cache_client' in self.algorithm.params:
self.kwargs['cache_client'] = cache_plugin.get_cache_client()
if 'dfs_client' in self.algorithm.params:
self.kwargs['dfs_client'] = dfs.get_instance()
if 'eureka_client' in self.algorithm.params:
self.kwargs['eureka_client'] = eureka.get_instance()
if 'recorder' in self.algorithm.params:
self.kwargs['recorder'] = self.recorder
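# run every registered fixture whose name matches an algorithm parameter and inject its result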
used_fixtures = set(self.algorithm.params) & fixture.fixtures.keys()
for f in used_fixtures:
ret = fixture.fixtures[f].run(self.request, self.kwargs)
if ret is not None:
if f in self.kwargs and not fixture.fixtures[f].overwrite:
raise AlgorithmException(data='fixture tries to overwrite param {f}'.format(f=f))
self.kwargs[f] = ret
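# Illustrative sketch only: judging from how the loop above uses
# fixture.fixtures[f].run(...) and .overwrite, a compatible fixture would look
# roughly like this (the class name, its body and the registration below are
# assumptions, not the project's actual fixture API):
#
#     class RandomSeedFixture:
#         overwrite = False
#
#         def run(self, request, kwargs):
#             # hypothetical fixture: inject a seed taken from the request
#             return request.get('seed', 42)
#
#     fixture.fixtures['random_seed'] = RandomSeedFixture()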