def start_pipeline_execution()

in CommonLayerCode/datalake-library/python/datalake_library/octagon/peh.py [0:0]


    def start_pipeline_execution(self, pipeline_name, dataset_date=None, comment=None):
        """Create a new pipeline execution history (PEH) record and register it.

        Args:
            pipeline_name: Name of the pipeline being executed. Must be non-empty.
            dataset_date: Optional dataset date; validated with ``validate_date``
                when given. Falls back to the local execution date when falsy.
            comment: Optional free-text comment; a default "has started
                execution" message is used when falsy.

        Returns:
            The new execution id (a UUID4 string), or ``None`` when
            ``check_pipeline`` reports the pipeline as missing or inactive.

        Raises:
            Whatever ``throw_none_or_empty`` / ``validate_date`` raise on bad
            input (NOTE(review): helpers defined elsewhere — confirm types).
        """
        self.logger.debug("peh start_pipeline_execution() called")

        throw_none_or_empty(pipeline_name, "Pipeline name is not specified")

        # Validate caller input before touching the pipeline registry.
        if dataset_date:
            validate_date(dataset_date)

        if not self.check_pipeline(pipeline_name):
            self.logger.error(f"Pipeline doesn't exist or inactive : {pipeline_name}")
            return None

        peh_id = str(uuid.uuid4())
        # datetime.utcnow() is deprecated (Python 3.12+); build an aware UTC time
        # and drop tzinfo so get_timestamp_iso sees the same naive UTC value.
        current_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        utc_time_iso = get_timestamp_iso(current_time)
        local_date_iso = get_local_date()

        item = {
            "id": peh_id,
            "version": 1,
            "pipeline": pipeline_name,
            "active": True,
            "execution_date": local_date_iso,
            # Default the dataset date to today's local date when not supplied.
            "dataset_date": dataset_date or local_date_iso,
            "status": PEH_STATUS_STARTED,
            "comment": comment or f"Pipeline: {pipeline_name} has started execution",
            "start_timestamp": utc_time_iso,
            "last_updated_timestamp": utc_time_iso,
            # Composite "<status>#<timestamp>" value, written as a single attribute.
            "status_last_updated_timestamp": PEH_STATUS_STARTED + "#" + utc_time_iso,
            "history": [{"status": PEH_STATUS_STARTED, "timestamp": utc_time_iso}],
        }

        # Only attach a TTL attribute when a positive TTL is configured.
        if self.peh_ttl > 0:
            item["ttl"] = get_ttl(self.peh_ttl)

        self.peh_table.put_item(Item=item)

        # Record the active execution on the client once the item is persisted.
        self.client.set_pipeline_execution(peh_id, pipeline_name)

        return peh_id