in utilities/feature_store_helper.py [0:0]
def ingest_with_dw(self, new_file_to_ingest, feature_group_name,
                   instance_count=1, instance_type='ml.m5.4xlarge', prefix='data_wrangler_flows',
                   bucket=None, iam_role=None, processing_job_name=None,
                   container_uri=None):
    """Start a SageMaker Processing job that runs a Data Wrangler flow into a feature group.

    A new ``.flow`` definition is produced for ``new_file_to_ingest`` via
    ``self._update_flow`` and a processing job is created with the Data Wrangler
    container image, configured with a Feature Store output targeting
    ``feature_group_name``. The job is only created here; this method does not
    wait for it to finish.

    Args:
        new_file_to_ingest: Passed to ``self._update_flow`` to build the flow
            (presumably the S3 URI of the data to ingest -- confirm with caller).
        feature_group_name: Feature group named in the job's Feature Store output.
        instance_count: Number of processing instances.
        instance_type: Processing instance type.
        prefix: S3 key prefix used to build the ``.flow`` file location.
        bucket: S3 bucket for the flow file; defaults to ``self._default_bucket``.
        iam_role: Role ARN for the job; defaults to ``self._role``.
        processing_job_name: Job name; defaults to ``dw-ingest-<unix timestamp>``.
        container_uri: Data Wrangler container image URI. When omitted, it is
            looked up from ``region`` (only us-east-1 and us-east-2 are known).

    Returns:
        The name of the created processing job.

    Raises:
        ValueError: If ``container_uri`` is omitted and ``region`` has no known
            Data Wrangler container image.
    """
    if bucket is None:
        bucket = self._default_bucket
    if iam_role is None:
        iam_role = self._role
    if processing_job_name is None:
        curr_timestamp = int(datetime.now().timestamp())
        processing_job_name = f'dw-ingest-{curr_timestamp}'

    if container_uri is None:
        # NOTE(review): ``region`` is not defined in this method; it is assumed
        # to be a module-level name in this file -- confirm.
        dw_container_uris = {
            'us-east-1': "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.3.1",
            'us-east-2': "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.3.0",
        }
        try:
            container_uri = dw_container_uris[region]
        except KeyError:
            # Fail before the flow is built instead of hitting an unbound
            # container URI after side effects have already happened.
            raise ValueError(
                f"No Data Wrangler container URI known for region {region!r}; "
                f"known regions: {sorted(dw_container_uris)}. "
                "Pass container_uri explicitly."
            ) from None

    processing_dir = "/opt/ml/processing"
    # Unique flow name: UTC day/time stamp plus a short random suffix.
    flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
    flow_name = f'flow-{flow_id}'
    flow_location = f'{prefix}/{flow_name}.flow'
    flow_uri = f's3://{bucket}/{flow_location}'
    flow = self._update_flow(new_file_to_ingest, bucket, flow_location)

    # Output name of the flow node whose result is sent to Feature Store.
    # NOTE(review): presumably this node id must match the flow produced by
    # _update_flow (its template) -- confirm before changing either.
    output_name = 'e6a71ea2-dd1e-477f-964a-03238f974a35.default'

    self._sm_client.create_processing_job(
        ProcessingInputs=_create_processing_inputs(processing_dir, flow, flow_uri),
        ProcessingOutputConfig={
            'Outputs': [
                {
                    'OutputName': output_name,
                    'FeatureStoreOutput': {
                        'FeatureGroupName': feature_group_name,
                    },
                    'AppManaged': True,
                },
            ],
        },
        ProcessingJobName=processing_job_name,
        ProcessingResources={
            'ClusterConfig': {
                'InstanceCount': instance_count,
                'InstanceType': instance_type,
                'VolumeSizeInGB': 30,
            },
        },
        AppSpecification={'ImageUri': container_uri},
        RoleArn=iam_role,
    )
    return processing_job_name