def update_feature_pipeline()

in utilities/feature_store_helper.py [0:0]


    def update_feature_pipeline(self, s3_uri_prefix: str, fg_name: str, 
                                user_defined_script: str=None,
                                script_type: str='python',
                                enable: bool=True, 
                                instance_type: str='ml.m5.xlarge', instance_count: int=1,
                                max_processes: int=16, max_workers: int=4,
                                max_runtime_in_seconds: int=3600) -> None:
        """Updates an existing feature pipeline to use a new script.
        
        See schedule_feature_pipeline for details. Note that update_feature_pipeline takes
        advantage of the existing schedule. To change the schedule, you must remove and 
        re-create the feature pipeline.
        
        Args:
            s3_uri_prefix (str): Raw data source, can be prefix for multiple files, currently only CSV supported.
            fg_name (str): Name of the feature group this pipeline is associated with.
            user_defined_script (str): Optional, filename for processing script.
            script_type (str): Optional, 'python' or 'pyspark_sql' indicating type of script to be created.
            enable (bool): Optional, whether or not to immediately enable the pipeline.
            instance_type (str): Optional, instance type for the processing job.
            instance_count (int): Optional, number of instances to use for the processing job.
            max_processes (int): Optional, max number of processes to use for feature ingestion.
            max_workers (int): Optional, max number of workers to use in each process.
            max_runtime_in_seconds (int): Optional, max number of seconds permitted for this processing job to run, defaults to one hour.
        """

        self._create_script(self._pipeline_script_tmp_name, user_defined_script, script_type)

        # Create a new SageMaker Pipeline
        pipeline_name = f'sm-pipeline-{fg_name}'

        # 1/ Create a Processing Step to run the SageMaker Processing job
        processor = None
        if script_type == 'python':
            processor = SKLearnProcessor(framework_version='0.20.0',
                                             role=self._role,
                                             instance_type=instance_type,
                                             instance_count=instance_count,
                                             base_job_name=pipeline_name)
            proc_inputs = [ProcessingInput(s3_data_type='S3Prefix', 
                                source=s3_uri_prefix, 
                                s3_data_distribution_type='ShardedByS3Key', 
                                destination="/opt/ml/processing/input")]
            proc_outputs = []
            job_args = ['--num_processes', str(max_processes), 
                           '--num_workers', str(max_workers),
                           '--feature_group_name', fg_name,
                           '--region_name', self._region]
        else:
            processor = PySparkProcessor(framework_version='3.0',
                                role=self._role,
                                instance_type=instance_type,  
                                instance_count=instance_count,
                                base_job_name=pipeline_name,
                                env={'AWS_DEFAULT_REGION': self._region,
                                     'mode': 'python'},
                                max_runtime_in_seconds=max_runtime_in_seconds)
            proc_inputs = []
            job_args = ['--feature_group_name', fg_name,
                        '--region_name', self._region,
                        '--s3_uri_prefix', s3_uri_prefix]
            proc_outputs = [ProcessingOutput(output_name='output-1',
                                             source='/opt/ml/processing/spark-events/',
                                             destination=f's3://{default_bucket}/spark-logs',
                                             s3_upload_mode='Continuous')]

        step_process = ProcessingStep(
            name='FeaturePipelineStep',
            processor=processor,
            inputs=proc_inputs,
            outputs=proc_outputs,
            job_arguments=job_args,
            code=self._pipeline_script_tmp_name 
            #,spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs'
        )

        # 2/ Create a simple pipeline containing that one step
        pipeline = Pipeline(
            name=pipeline_name,
            parameters=[],
            steps=[step_process],
        )

        # 3/ Save the new pipeline
        pipeline.upsert(role_arn=self._role)

        # 4/ To ensure the pipeline gets triggered immediately, disable and re-enable after short wait
        if enable:
            self.disable_feature_pipeline(fg_name)
            time.sleep(3)
            self.enable_feature_pipeline(fg_name)
        return