in CommonLayerCode/datalake-library/python/datalake_library/octagon/peh.py [0:0]
def start_pipeline_execution(self, pipeline_name, dataset_date=None, comment=None):
    """Record the start of a new execution of ``pipeline_name``.

    Writes a new pipeline-execution item to ``self.peh_table`` with status
    ``PEH_STATUS_STARTED``. It then registers the new execution id with
    ``self.client.set_pipeline_execution``.

    :param pipeline_name: Pipeline name. It is passed to ``throw_none_or_empty``,
        which presumably raises when the name is None/empty.
    :param dataset_date: Optional dataset date. When truthy it is passed to
        ``validate_date`` (presumably raises on a bad value) and stored as-is.
        Otherwise the value of ``get_local_date()`` is stored.
    :param comment: Optional comment. When falsy, a default
        "Pipeline: <name> has started execution" message is stored.
    :return: The new execution id (``str`` of a UUID4). Returns ``None`` when
        ``self.check_pipeline`` reports the pipeline as missing or inactive.
    """
    self.logger.debug("peh start_pipeline_execution() called")
    throw_none_or_empty(pipeline_name, "Pipeline name is not specified")
    if dataset_date:
        validate_date(dataset_date)

    # Guard clause: refuse to start executions for unknown/inactive pipelines.
    if not self.check_pipeline(pipeline_name):
        self.logger.error(f"Pipeline doesn't exist or inactive : {pipeline_name}")
        return None

    execution_id = str(uuid.uuid4())
    # NOTE(review): utcnow() returns a naive datetime and is deprecated since
    # Python 3.12. Moving to an aware datetime may change get_timestamp_iso's
    # output format, so confirm before switching.
    started_at = get_timestamp_iso(datetime.datetime.utcnow())
    today_local = get_local_date()

    record = {
        "id": execution_id,
        "version": 1,
        "pipeline": pipeline_name,
        "active": True,
        "execution_date": today_local,
        "dataset_date": dataset_date or today_local,
        "status": PEH_STATUS_STARTED,
        "comment": comment or f"Pipeline: {pipeline_name} has started execution",
        "start_timestamp": started_at,
        "last_updated_timestamp": started_at,
        # Composite "<status>#<timestamp>" value.
        "status_last_updated_timestamp": PEH_STATUS_STARTED + "#" + started_at,
        "history": [{"status": PEH_STATUS_STARTED, "timestamp": started_at}],
    }
    # A TTL attribute is only attached when a positive TTL is configured.
    if self.peh_ttl > 0:
        record["ttl"] = get_ttl(self.peh_ttl)

    self.peh_table.put_item(Item=record)
    self.client.set_pipeline_execution(execution_id, pipeline_name)
    return execution_id