def example_run_training_pipeline()

in modules/pipeline/training_pipeline_run.py [0:0]


def example_run_training_pipeline(workflow_arn, region):
    """
    Execute the training workflow, which consists of five steps:

    1. Define job names for pre-processing, training, and evaluation
    2. Upload source code for pre-processing, training, and evaluation
    3. Define URLs for the input, output, and intermediary data
    4. Execute the workflow with populated parameters, and monitor the progress
    5. Inspect the evaluation result when the execution is completed

    Args:
        workflow_arn: Identifier handed to ``get_existing_training_pipeline``
            to look up the workflow to execute.
        region: Region name; it is interpolated into the sample input data
            bucket name and passed to the workflow as ``SMRegion``.

    Raises:
        RuntimeError: If the execution returns no output, or if its
            ``ProcessingOutputConfig`` has no output named ``"evaluation"``.
    """
    # Function-scope imports, grouped up front so a missing dependency fails
    # before any upload or workflow execution is started.
    import json
    import os
    import tempfile

    from sagemaker.s3 import S3Downloader

    training_pipeline = get_existing_training_pipeline(workflow_arn)

    # Step 1 - Generate unique names for the Pre-Processing Job, Training Job,
    # and Evaluation Job
    unique_id = uuid.uuid1().hex
    training_job_name = f"scikit-learn-training-{unique_id}"
    preprocessing_job_name = f"scikit-learn-sm-preprocessing-{unique_id}"
    evaluation_job_name = f"scikit-learn-sm-evaluation-{unique_id}"

    # Step 2 - Upload source code (pre-processing, evaluation, and train) to sagemaker
    # NOTE: these paths are relative to the current working directory.
    preprocessing_script_location = "../../src/mlmax/preprocessing.py"
    evaluation_script_location = "../../src/mlmax/evaluation.py"
    training_script_location = "../../src/mlmax/train.py"

    sagemaker_session = sagemaker.Session()
    bucket = sagemaker_session.default_bucket()
    s3_bucket_base_uri = f"s3://{bucket}"

    input_preprocessing_code = sagemaker_session.upload_data(
        preprocessing_script_location,
        bucket=bucket,
        key_prefix=f"{preprocessing_job_name}/source",
    )
    input_evaluation_code = sagemaker_session.upload_data(
        evaluation_script_location,
        bucket=bucket,
        key_prefix=f"{evaluation_job_name}/source",
    )

    training_source_prefix = f"{training_job_name}/source"
    sm_submit_dir_url = (
        f"{s3_bucket_base_uri}/{training_source_prefix}/sourcedir.tar.gz"
    )
    # Build the archive in a private temporary directory rather than a fixed
    # /tmp path: concurrent runs cannot clobber each other and the file is
    # removed afterwards. The file name must stay "sourcedir.tar.gz" so that
    # the uploaded object matches sm_submit_dir_url.
    with tempfile.TemporaryDirectory() as tmp_dir:
        archive_path = os.path.join(tmp_dir, "sourcedir.tar.gz")
        # The context manager closes the archive even if tar.add() raises.
        with tarfile.open(archive_path, "w:gz") as tar:
            # TODO need to add directory if source_dir is specified.
            tar.add(training_script_location, arcname="train.py")
        sagemaker_session.upload_data(
            archive_path,
            bucket=bucket,
            key_prefix=training_source_prefix,
        )

    # Step 3 - Define data URLs, preprocessed data URLs can be made
    # specifically to this training job
    input_data = (
        f"s3://sagemaker-sample-data-{region}/processing/census/census-income.csv"
    )
    output_data = f"{s3_bucket_base_uri}/{preprocessing_job_name}/output"
    preprocessed_training_data = f"{output_data}/train_data"
    preprocessed_test_data = f"{output_data}/test_data"
    # Same location as the pre-processing output prefix.
    preprocessed_model_url = output_data

    # Step 4 - Execute workflow
    print(f"Training Job Name is {training_job_name}")
    execution = training_pipeline.execute(
        inputs={
            "InputDataURL": input_data,
            # Each pre processing job (SageMaker processing job) requires a unique name,
            "PreprocessingJobName": preprocessing_job_name,
            "PreprocessingCodeURL": input_preprocessing_code,
            # Each Sagemaker Training job requires a unique name,
            "TrainingJobName": training_job_name,
            "SMSubmitDirURL": sm_submit_dir_url,
            "SMRegion": region,
            # Each SageMaker processing job requires a unique name,
            "EvaluationProcessingJobName": evaluation_job_name,
            "EvaluationCodeURL": input_evaluation_code,
            "EvaluationResultURL": (
                f"{s3_bucket_base_uri}/{training_job_name}/evaluation"
            ),
            "PreprocessedTrainDataURL": preprocessed_training_data,
            "PreprocessedTestDataURL": preprocessed_test_data,
            "PreprocessedModelURL": preprocessed_model_url,
            "SMOutputDataURL": f"{s3_bucket_base_uri}/",
            "SMDebugOutputURL": f"{s3_bucket_base_uri}/",
        }
    )
    # Wait for the execution once and keep its output for step 5.
    workflow_execution_output_json = execution.get_output(wait=True)
    execution.render_progress()

    # Step 5 - Inspect the output of the Workflow execution
    if workflow_execution_output_json is None:
        raise RuntimeError(
            "Workflow execution returned no output; "
            "cannot locate the evaluation result."
        )

    evaluation_output_config = workflow_execution_output_json["ProcessingOutputConfig"]
    for output in evaluation_output_config["Outputs"]:
        if output["OutputName"] == "evaluation":
            evaluation_s3_uri = "{}/{}".format(
                output["S3Output"]["S3Uri"], "evaluation.json"
            )
            break
    else:
        # Without this guard a missing output would surface later as an
        # UnboundLocalError on evaluation_s3_uri.
        raise RuntimeError(
            'Workflow execution output has no processing output named "evaluation".'
        )

    evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
    evaluation_output_dict = json.loads(evaluation_output)
    print(json.dumps(evaluation_output_dict, sort_keys=True, indent=4))