def example_run_inference_pipeline()

in modules/pipeline/inference_pipeline_run.py


import uuid

import sagemaker


def example_run_inference_pipeline(workflow_arn, region):
    """
    execute the Workflow, which consists of four steps:

    1. Define job names for pre-processing, training, and evaluation
    2. Upload source code for pre-processing, training, and evaluation
    3. Define URLs for the input, output, and intermediary data
    4. Execute the workflow with populated parameters, and monitor the progress
    5. Inspect the evaluation result when the execution is completed
    """
    inference_pipeline = get_existing_inference_pipeline(workflow_arn)

    # Step 1 - Generate unique names for the Pre-Processing Job and the
    # Inference Job (Batch Transform)
    unique_id = uuid.uuid1().hex
    preprocessing_job_name = f"sklearn-sm-preprocessing-{unique_id}"
    inference_job_name = f"sklearn-sm-inference-{unique_id}"

    # Step 2 - Upload source code (pre-processing, inference) to S3
    PREPROCESSING_SCRIPT_LOCATION = "../../src/mlmax/preprocessing.py"
    INFERENCE_SCRIPT_LOCATION = "../../src/mlmax/inference.py"

    sagemaker_session = sagemaker.Session()
    s3_bucket_base_uri = f"s3://{sagemaker_session.default_bucket()}"
    # upload preprocessing script
    input_preprocessing_code = sagemaker_session.upload_data(
        PREPROCESSING_SCRIPT_LOCATION,
        bucket=sagemaker_session.default_bucket(),
        key_prefix=f"{preprocessing_job_name}/source",
    )
    print(f"Using preprocessing script from {input_preprocessing_code}")
    # upload inference script
    input_inference_code = sagemaker_session.upload_data(
        INFERENCE_SCRIPT_LOCATION,
        bucket=sagemaker_session.default_bucket(),
        key_prefix=f"{inference_job_name}/source",
    )

    # Step 3 - Get the latest preprocessing and ML models
    # TODO: allow user to pass this as optional input
    proc_model_s3, model_s3 = get_latest_models()
    print(f"Using proc_model_s3: {proc_model_s3}")
    print(f"Using model_s3: {model_s3}")

    # Step 4 - Define data URLs; the preprocessed data URLs are tied to
    # this specific pre-processing job

    input_data = (
        f"s3://sagemaker-sample-data-{region}/processing/census/census-income.csv"
    )

    output_data = f"{s3_bucket_base_uri}/{preprocessing_job_name}/output"
    preprocessed_training_data = f"{output_data}/train_data"
    preprocessed_test_data = f"{output_data}/test_data"

    # Step 5 - Execute workflow
    print(f"Preprocessing Job Name is {preprocessing_job_name}")
    print(f"Inference Job Name is {inference_job_name}")
    execution = inference_pipeline.execute(
        inputs={
            "InputDataURL": input_data,
            "PreprocessingJobName": preprocessing_job_name,
            "InferenceJobName": inference_job_name,
            "ProcModelS3": proc_model_s3,
            "PreprocessingCodeURL": input_preprocessing_code,
            "InferenceCodeURL": input_inference_code,
            "ModelS3": model_s3,
            "PreprocessedTrainDataURL": preprocessed_training_data,
            "PreprocessedTestDataURL": preprocessed_test_data,
            "OutputPathURL": f"{s3_bucket_base_uri}/{inference_job_name}/output",
        }
    )
    execution.get_output(wait=True)
    execution.render_progress()
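
The helpers `get_existing_inference_pipeline` and `get_latest_models` are defined elsewhere in the module. As a minimal sketch, assuming the AWS Step Functions Data Science SDK (`stepfunctions`), the first helper could simply attach to the already-deployed state machine by ARN; the actual implementation may differ:

from stepfunctions.workflow import Workflow


def get_existing_inference_pipeline(workflow_arn):
    # Attach to the existing Step Functions state machine (assumed helper sketch)
    return Workflow.attach(workflow_arn)

A hypothetical command-line entry point for running this example might look like the following; the argument names and the use of argparse are illustrative assumptions, not part of the module:

import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--workflow-arn", required=True,
                        help="ARN of the existing inference workflow (state machine)")
    parser.add_argument("--region", required=True,
                        help="AWS region hosting the sample input data")
    args = parser.parse_args()
    example_run_inference_pipeline(args.workflow_arn, args.region)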