def run_preprocessing_job()

in source/lambda/data-preprocessing/index.py [0:0]


def run_preprocessing_job(input,
                          output,
                          timestamp,
                          s3_bucket=S3_BUCKET,
                          input_prefix=INPUT_PREFIX,
                          instance_type=INSTANCE_TYPE,
                          image_uri=IMAGE_URI
                          ):
    """Upload the preprocessing script to S3 and start a SageMaker Processing job.

    The local script ``data-preprocessing/graph_data_preprocessor.py`` is
    uploaded to ``s3_bucket`` under ``<input_prefix>/<timestamp>/`` using the
    module-level ``s3_client``, then mounted into the processing container and
    run with ``python3``.

    Args:
        input: Value passed as the ``S3Uri`` of the data input; its contents
            are made available at ``/opt/ml/processing/input``. (The name
            shadows the builtin but is kept so keyword callers keep working.)
        output: Value passed as the ``S3Uri`` that
            ``/opt/ml/processing/output`` is uploaded to at the end of the job.
        timestamp: String used in the uploaded script's S3 key and as the
            suffix of the processing job name.
        s3_bucket: Bucket the preprocessing script is uploaded to.
        input_prefix: Key prefix under which the script is uploaded.
        instance_type: Instance type for the single processing instance.
        image_uri: Despite the name, this is used as the ECR *repository name*;
            the full image URI is built from the caller's account id, the
            session region and the ``latest`` tag.

    Returns:
        The response returned by ``create_processing_job``.
    """
    # S3 keys and paths inside the (Linux) processing container always use
    # forward slashes, so build them with posixpath instead of the
    # host-dependent os.path.
    import posixpath

    print("Creating SageMaker Processing job with inputs from {} and outputs to {}".format(input, output))

    sagemaker_client = boto3.client('sagemaker')

    # Resolve the image in the caller's own account and the session's region.
    region = boto3.session.Session().region_name
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    ecr_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account_id, region, image_uri)

    # Upload the script; the timestamp in the key keeps each run's copy separate.
    code_file = 'data-preprocessing/graph_data_preprocessor.py'
    code_file_s3_key = posixpath.join(input_prefix, timestamp, code_file)
    s3_client.upload_file(code_file, s3_bucket, code_file_s3_key)

    # The script is mounted under .../input/code by the 'code' input below.
    code_mount_path = '/opt/ml/processing/input/code'
    entrypoint = ["python3", posixpath.join(code_mount_path, posixpath.basename(code_file))]

    app_spec = {
        'ImageUri': ecr_repository_uri,
        'ContainerEntrypoint': entrypoint,
        'ContainerArguments': ['--id-cols', 'card1,card2,card3,card4,card5,card6,ProductCD,addr1,addr2,P_emaildomain,R_emaildomain',
                               '--cat-cols', 'M1,M2,M3,M4,M5,M6,M7,M8,M9'],
    }

    processing_inputs = [
        {
            'InputName': 'input1',
            'S3Input': {
                'S3Uri': input,
                'LocalPath': '/opt/ml/processing/input',
                'S3DataType': 'S3Prefix',
                'S3InputMode': 'File',
            }
        },
        {
            'InputName': 'code',
            'S3Input': {
                # get_full_s3_path is defined elsewhere in this module;
                # presumably it returns the s3:// URI for bucket + key -- confirm.
                'S3Uri': get_full_s3_path(s3_bucket, code_file_s3_key),
                'LocalPath': code_mount_path,
                'S3DataType': 'S3Prefix',
                'S3InputMode': 'File',
            }
        },
    ]
    processing_output = {
        'Outputs': [
            {
                'OutputName': 'output1',
                'S3Output': {
                    'S3Uri': output,
                    'LocalPath': '/opt/ml/processing/output',
                    'S3UploadMode': 'EndOfJob',
                },
            }
        ]
    }

    processing_job_name = "sagemaker-graph-fraud-data-processing-{}".format(timestamp)
    resources = {
        'ClusterConfig': {
            'InstanceCount': 1,
            'InstanceType': instance_type,
            'VolumeSizeInGB': 30
        }
    }

    network_config = {'EnableNetworkIsolation': False}
    # Cap the job's runtime at one hour.
    stopping_condition = {'MaxRuntimeInSeconds': 3600}

    response = sagemaker_client.create_processing_job(ProcessingInputs=processing_inputs,
                                                      ProcessingOutputConfig=processing_output,
                                                      ProcessingJobName=processing_job_name,
                                                      ProcessingResources=resources,
                                                      StoppingCondition=stopping_condition,
                                                      AppSpecification=app_spec,
                                                      NetworkConfig=network_config,
                                                      RoleArn=ROLE_ARN)
    return response