in utilities/feature_store_helper.py [0:0]
def update_feature_pipeline(self, s3_uri_prefix: str, fg_name: str,
                            user_defined_script: str=None,
                            script_type: str='python',
                            enable: bool=True,
                            instance_type: str='ml.m5.xlarge', instance_count: int=1,
                            max_processes: int=16, max_workers: int=4,
                            max_runtime_in_seconds: int=3600) -> None:
"""Updates an existing feature pipeline to use a new script.
See schedule_feature_pipeline for details. Note that update_feature_pipeline takes
advantage of the existing schedule. To change the schedule, you must remove and
re-create the feature pipeline.
Args:
s3_uri_prefix (str): Raw data source, can be prefix for multiple files, currently only CSV supported.
fg_name (str): Name of the feature group this pipeline is associated with.
user_defined_script (str): Optional, filename for processing script.
script_type (str): Optional, 'python' or 'pyspark_sql' indicating type of script to be created.
enable (bool): Optional, whether or not to immediately enable the pipeline.
instance_type (str): Optional, instance type for the processing job.
instance_count (int): Optional, number of instances to use for the processing job.
max_processes (int): Optional, max number of processes to use for feature ingestion.
max_workers (int): Optional, max number of workers to use in each process.
max_runtime_in_seconds (int): Optional, max number of seconds permitted for this processing job to run, defaults to one hour.
"""
    # Write the processing script to the temporary script location
    # (based on the user-defined script, if one was provided)
    self._create_script(self._pipeline_script_tmp_name, user_defined_script, script_type)

    # Create a new SageMaker Pipeline
    pipeline_name = f'sm-pipeline-{fg_name}'

    # 1/ Create a Processing Step to run the SageMaker Processing job
    processor = None
    if script_type == 'python':
        # Plain-Python scripts run in the SKLearn processing container
        processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=self._role,
                                     instance_type=instance_type,
                                     instance_count=instance_count,
                                     base_job_name=pipeline_name,
                                     # honor the documented runtime cap on this path as well
                                     max_runtime_in_seconds=max_runtime_in_seconds)
        proc_inputs = [ProcessingInput(s3_data_type='S3Prefix',
                                       source=s3_uri_prefix,
                                       s3_data_distribution_type='ShardedByS3Key',
                                       destination="/opt/ml/processing/input")]
        proc_outputs = []
        job_args = ['--num_processes', str(max_processes),
                    '--num_workers', str(max_workers),
                    '--feature_group_name', fg_name,
                    '--region_name', self._region]
    else:
        # 'pyspark_sql' scripts run as a Spark job in the PySpark processing container
        processor = PySparkProcessor(framework_version='3.0',
                                     role=self._role,
                                     instance_type=instance_type,
                                     instance_count=instance_count,
                                     base_job_name=pipeline_name,
                                     env={'AWS_DEFAULT_REGION': self._region,
                                          'mode': 'python'},
                                     max_runtime_in_seconds=max_runtime_in_seconds)
        # Spark reads the raw data directly from S3, so no ProcessingInput is needed;
        # the S3 prefix is passed to the script as a job argument instead.
        proc_inputs = []
        job_args = ['--feature_group_name', fg_name,
                    '--region_name', self._region,
                    '--s3_uri_prefix', s3_uri_prefix]
        # Stream the Spark event logs to S3 for debugging.
        # NOTE: default_bucket is assumed to be resolved elsewhere in this module.
        proc_outputs = [ProcessingOutput(output_name='output-1',
                                         source='/opt/ml/processing/spark-events/',
                                         destination=f's3://{default_bucket}/spark-logs',
                                         s3_upload_mode='Continuous')]
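    # Both branches converge on a single ProcessingStep; the temporary script is
    # passed as the step's code, and the SageMaker SDK uploads it to S3 when the
    # pipeline definition is built.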
    step_process = ProcessingStep(
        name='FeaturePipelineStep',
        processor=processor,
        inputs=proc_inputs,
        outputs=proc_outputs,
        job_arguments=job_args,
        code=self._pipeline_script_tmp_name
        #,spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs'
    )
    # 2/ Create a simple pipeline containing that one step
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[],
        steps=[step_process],
    )

    # 3/ Save the new pipeline
    pipeline.upsert(role_arn=self._role)
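    # upsert() creates the pipeline if it does not exist yet and updates the existing
    # definition otherwise, so the pipeline name (and the schedule attached to it) is
    # preserved.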
    # 4/ To ensure the pipeline gets triggered immediately, disable and re-enable it after a short wait
    if enable:
        self.disable_feature_pipeline(fg_name)
        time.sleep(3)
        self.enable_feature_pipeline(fg_name)
    return
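
# Example usage (a minimal sketch; `helper` is assumed to be an instance of the class
# that defines this method, and the bucket, feature group, and script names below are
# hypothetical placeholders):
#
#   helper.update_feature_pipeline('s3://my-bucket/raw/transactions/',
#                                  fg_name='transactions-fg',
#                                  user_defined_script='./transactions_transform.py',
#                                  script_type='pyspark_sql',
#                                  instance_type='ml.m5.2xlarge',
#                                  instance_count=2)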