in source/lambda/data-preprocessing/index.py [0:0]
def run_preprocessing_job(input,
                          output,
                          timestamp,
                          s3_bucket=S3_BUCKET,
                          input_prefix=INPUT_PREFIX,
                          instance_type=INSTANCE_TYPE,
                          image_uri=IMAGE_URI
                          ):
    """Upload the preprocessing script to S3 and start a SageMaker Processing job.

    The script ``data-preprocessing/graph_data_preprocessor.py`` is uploaded
    under ``<input_prefix>/<timestamp>/`` in ``s3_bucket`` and mounted into the
    container as the ``code`` input; the container entrypoint runs it with
    ``python3``.

    Args:
        input: S3 URI passed as the ``S3Uri`` of the ``input1`` processing input.
        output: S3 URI passed as the ``S3Uri`` of the ``output1`` processing output.
        timestamp: String used both in the S3 key of the uploaded script and
            as the suffix of the processing job name.
        s3_bucket: Bucket the preprocessing script is uploaded to.
        input_prefix: Key prefix under which the script is uploaded.
        instance_type: Instance type for the single-instance processing cluster.
        image_uri: Despite the name, this is used as the ECR *repository name*;
            the full image URI is built from the caller's account id, the
            session region and the ``latest`` tag.

    Returns:
        The response returned by ``create_processing_job``.

    Note:
        ``s3_client``, ``get_full_s3_path`` and ``ROLE_ARN`` are not defined
        in this function; they are expected to exist at module level.
    """
    # S3 object keys and paths inside the (Linux) processing container always
    # use "/" as separator, so build them with posixpath rather than os.path,
    # whose separator depends on the host OS.
    import posixpath

    print("Creating SageMaker Processing job with inputs from {} and outputs to {}".format(input, output))

    sagemaker_client = boto3.client('sagemaker')

    # Resolve the ECR image URI for the current account and region.
    region = boto3.session.Session().region_name
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    ecr_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account_id, region, image_uri)

    # Upload the preprocessing script so the job can pull it in as an input.
    code_file = 'data-preprocessing/graph_data_preprocessor.py'
    code_file_s3_key = posixpath.join(input_prefix, timestamp, code_file)
    s3_client.upload_file(code_file, s3_bucket, code_file_s3_key)

    # The 'code' input below is mounted at this directory inside the container.
    code_local_dir = '/opt/ml/processing/input/code'
    entrypoint = ["python3", posixpath.join(code_local_dir, posixpath.basename(code_file))]

    app_spec = {
        'ImageUri': ecr_repository_uri,
        'ContainerEntrypoint': entrypoint,
        'ContainerArguments': [
            '--id-cols', 'card1,card2,card3,card4,card5,card6,ProductCD,addr1,addr2,P_emaildomain,R_emaildomain',
            '--cat-cols', 'M1,M2,M3,M4,M5,M6,M7,M8,M9',
        ],
    }

    processing_inputs = [
        {
            'InputName': 'input1',
            'S3Input': {
                'S3Uri': input,
                'LocalPath': '/opt/ml/processing/input',
                'S3DataType': 'S3Prefix',
                'S3InputMode': 'File',
            },
        },
        {
            'InputName': 'code',
            'S3Input': {
                'S3Uri': get_full_s3_path(s3_bucket, code_file_s3_key),
                'LocalPath': code_local_dir,
                'S3DataType': 'S3Prefix',
                'S3InputMode': 'File',
            },
        },
    ]

    processing_output = {
        'Outputs': [
            {
                'OutputName': 'output1',
                'S3Output': {
                    'S3Uri': output,
                    'LocalPath': '/opt/ml/processing/output',
                    'S3UploadMode': 'EndOfJob',
                },
            },
        ],
    }

    processing_job_name = "sagemaker-graph-fraud-data-processing-{}".format(timestamp)

    resources = {
        'ClusterConfig': {
            'InstanceCount': 1,
            'InstanceType': instance_type,
            'VolumeSizeInGB': 30,
        },
    }
    network_config = {'EnableNetworkIsolation': False}
    stopping_condition = {'MaxRuntimeInSeconds': 3600}

    response = sagemaker_client.create_processing_job(
        ProcessingInputs=processing_inputs,
        ProcessingOutputConfig=processing_output,
        ProcessingJobName=processing_job_name,
        ProcessingResources=resources,
        StoppingCondition=stopping_condition,
        AppSpecification=app_spec,
        NetworkConfig=network_config,
        RoleArn=ROLE_ARN,
    )
    return response