def ingest_with_dw()

in utilities/Utils.py [0:0]


def ingest_with_dw(new_file_to_ingest, feature_group_name, 
                   instance_count=1, instance_type='ml.m5.4xlarge', prefix='data_wrangler_flows',
                   bucket=None, iam_role=None, processing_job_name=None, ):
    sess = sagemaker.Session()
    if bucket is None:
        bucket = sess.default_bucket()
    if iam_role is None:
        iam_role = sagemaker.get_execution_role()
    if processing_job_name is None:
        curr_timestamp = int(datetime.now().timestamp())    
        processing_job_name = f'dw-ingest-{curr_timestamp}'

    if region == 'us-east-1':
        container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.3.1"
    elif region == 'us-east-2':
        container_uri = "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.3.0"
    processing_dir = "/opt/ml/processing"

    flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
    flow_name = f'flow-{flow_id}'
    flow_location = f'{prefix}/{flow_name}.flow'
    flow_uri = f's3://{bucket}/{flow_location}'

    flow = _update_flow(new_file_to_ingest, bucket, flow_location)
    processingResources = {
            'ClusterConfig': {
                'InstanceCount': instance_count,
                'InstanceType': instance_type,
                'VolumeSizeInGB': 30
            }
        }

    appSpecification = {'ImageUri': container_uri}

    sagemaker_client = boto3.client("sagemaker")
    sagemaker_client.create_processing_job(
            ProcessingInputs=_create_processing_inputs(processing_dir, flow, flow_uri),
            ProcessingOutputConfig={
                'Outputs': [
                    {
                        'OutputName': 'e6a71ea2-dd1e-477f-964a-03238f974a35.default',
                        'FeatureStoreOutput': {
                            'FeatureGroupName': feature_group_name
                        },
                        'AppManaged': True
                    }
                ],
            },
            ProcessingJobName=processing_job_name,
            ProcessingResources=processingResources,
            AppSpecification=appSpecification,
            RoleArn=iam_role
        )
    return processing_job_name