# utilities/feature_store_helper.py
def schedule_feature_pipeline(self, s3_uri_prefix: str, fg_name: str,
user_defined_script: str=None,
schedule: str='rate(1 day)',
script_type: str='python',
enable: bool=True,
instance_type: str='ml.m5.xlarge', instance_count: int=1,
max_processes: int=16, max_workers: int=4,
max_runtime_in_seconds: int=3600) -> None:
"""Creates a brand new feature pipeline for an existing feature group.
Creates a SageMaker Pipeline containing a single step representing a SageMaker Processing job.
Uses SKLearn processor or PySpark processor depending on the script type. Creates a new IAM policy
to allow Amazon EventBridge to start the SageMaker Pipeline. Creates an EventBridge rule based
on the schedule provided. Associates the schedule rule with the Pipeline. Pipeline will be started
each time the schedule is triggered. Pipeline triggering can be enabled or disabled.
For script_type='python', the user_defined_script must contain an apply_transforms function that
takes in a Pandas dataframe and returns a transformed Pandas dataframe containing the exact set of
columns needed for ingesting to the target feature group.
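
As an illustrative sketch only (the function body and column names below are hypothetical; the
only firm requirement is the apply_transforms signature), such a script could look like:

    import pandas as pd

    def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:
        # Derive and keep exactly the columns the target feature group expects
        df['total'] = df['price'] * df['quantity']
        return df[['record_id', 'event_time', 'total']]
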
For script_type='pyspark_sql', the user_defined_script must contain a transform_query function that
takes in a feature group name, and returns a SQL query that uses that feature group name in its FROM
clause to return the exact set of columns needed for ingesting to the target feature group.
The processing script will prepare a Spark dataframe with data from the data source, and will run
the SQL query that is returned by transform_query. The resulting Spark dataframe will be ingested.
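
Again as a hypothetical sketch (column names are illustrative; the only firm requirement is that
the returned SQL selects from the provided feature group name):

    def transform_query(fg_name: str) -> str:
        return f'SELECT record_id, event_time, price * quantity AS total FROM {fg_name}'
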
If no user_defined_script is provided, the feature pipeline will simply ingest the features provided
by the data source (Amazon S3 in this case). Otherwise, the transformations will be performed on the
input data, and the resulting features will be ingested.
Args:
s3_uri_prefix (str): S3 URI of the raw data source; can be a prefix covering multiple files; currently only CSV is supported.
fg_name (str): Name of the feature group this pipeline is associated with.
user_defined_script (str): Optional, filename for processing script.
schedule (str): Optional, rate or cron scheduling expression compatible with Amazon EventBridge, e.g. 'rate(1 day)' or 'cron(0 12 * * ? *)'.
script_type (str): Optional, 'python' or 'pyspark_sql' indicating type of script to be created.
enable (bool): Optional, whether or not to immediately enable the pipeline.
instance_type (str): Optional, instance type for the processing job.
instance_count (int): Optional, number of instances to use for the processing job.
max_processes (int): Optional, max number of processes to use for feature ingestion.
max_workers (int): Optional, max number of workers to use in each process.
max_runtime_in_seconds (int): Optional, max number of seconds permitted for this processing job to run, defaults to one hour.
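
Example:
    Illustrative call only; 'fs' is assumed to be an instance of this helper class, and the
    bucket, script, and feature group names are hypothetical:

        fs.schedule_feature_pipeline('s3://my-bucket/raw/orders/', 'orders-fg',
                                     user_defined_script='transform_orders.py',
                                     schedule='rate(1 day)',
                                     script_type='python')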
"""
self._create_script(self._pipeline_script_tmp_name, user_defined_script, script_type)
# Create a new SageMaker Pipeline
pipeline_name = f'sm-pipeline-{fg_name}'
# 1/ Create a Processing Step to run the SageMaker Processing job
processor = None
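# Python scripts run on an SKLearnProcessor with the raw CSV data copied (sharded by S3 key) into
# the processing container; pyspark_sql scripts run on a PySparkProcessor that reads the
# s3_uri_prefix directly and writes Spark event logs as a processing output.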
if script_type == 'python':
processor = SKLearnProcessor(framework_version='0.20.0',
role=self._role,
instance_type=instance_type,
instance_count=instance_count,
base_job_name=pipeline_name,
max_runtime_in_seconds=max_runtime_in_seconds)
proc_inputs = [ProcessingInput(s3_data_type='S3Prefix',
source=s3_uri_prefix,
s3_data_distribution_type='ShardedByS3Key',
destination="/opt/ml/processing/input")]
proc_outputs = []
job_args = ['--num_processes', str(max_processes),
'--num_workers', str(max_workers),
'--feature_group_name', fg_name,
'--region_name', sagemaker.Session().boto_region_name]
else:
processor = PySparkProcessor(framework_version='3.0',
role=self._role,
instance_type=instance_type,
instance_count=instance_count,
base_job_name=pipeline_name,
env={'AWS_DEFAULT_REGION': self._region,
'mode': 'python'},
max_runtime_in_seconds=max_runtime_in_seconds)
proc_inputs = []
job_args = ['--feature_group_name', fg_name,
'--region_name', self._region,
'--s3_uri_prefix', s3_uri_prefix]
proc_outputs = [ProcessingOutput(output_name='output-1',
source='/opt/ml/processing/spark-events/',
destination=f's3://{sagemaker.Session().default_bucket()}/spark-logs',
s3_upload_mode='Continuous')]
step_process = ProcessingStep(
name='FeaturePipelineStep',
processor=processor,
inputs=proc_inputs,
outputs=proc_outputs,
job_arguments=job_args,
code=self._pipeline_script_tmp_name
#,spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs'
)
# 2/ Create a simple pipeline containing that one step
pipeline = Pipeline(
name=pipeline_name,
parameters=[],
steps=[step_process],
)
# 3/ Save the new pipeline
pipeline.upsert(role_arn=self._role)
# 4/ Get the pipeline arn
pipeline_descr = pipeline.describe()
pipeline_arn = pipeline_descr['PipelineArn']
# Create an IAM role and policy that allow EventBridge to start the SageMaker Pipeline
timestamp = int(time.time())
iam_client = boto3.client('iam')
role_name = f'Amazon_EventBridge_Invoke_SageMaker_Pipeline_{timestamp}'
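# Scope the reusable start-pipeline policy template to this specific pipeline ARN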
FeatureStore._iam_pipeline_start_policy['Statement'][0]['Resource'] = pipeline_arn
event_bridge_role = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(FeatureStore._iam_trust_policy),
Description='Role to allow Amazon EventBridge to start a specific SageMaker Pipeline'
)
policy_res = iam_client.create_policy(
PolicyName=f'Amazon_EventBridge_Invoke_SageMaker_Pipeline_Policy_{timestamp}',
PolicyDocument=json.dumps(FeatureStore._iam_pipeline_start_policy)
)
policy_arn = policy_res['Policy']['Arn']
policy_attach_res = iam_client.attach_role_policy(
RoleName=event_bridge_role['Role']['RoleName'],
PolicyArn=policy_arn
)
# Create the new EventBridge rule to run the pipeline on a schedule
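# The rule is created in a DISABLED state; it is switched on below via enable_feature_pipeline() when enable=True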
events_client = boto3.client('events')
rule_name = f'scheduled-{pipeline_name}'
events_client.put_rule(
Name=rule_name,
ScheduleExpression=schedule,
State='DISABLED',
Description='Automated creation of scheduled pipeline',
RoleArn=self._role,
EventBusName='default'
)
# Add the SageMaker Pipeline as a target to the EventBridge scheduled rule
tmp_id = str(uuid.uuid4())
events_client.put_targets(
Rule=rule_name,
Targets=[{'Id': tmp_id,
'Arn': pipeline_arn,
'RoleArn': event_bridge_role['Role']['Arn']
}]
)
if enable:
self.enable_feature_pipeline(fg_name)
return