repos/build_pipeline/infra/sm_pipeline_utils.py [11:90]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Module-wide logger; the ROOT logger is configured here (not __name__),
# so this raises the global log level to INFO for the whole process.
# NOTE(review): consider logging.getLogger(__name__) — confirm callers
# don't rely on root-level configuration.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_pipeline_props(file_path: Union[str, Path]) -> dict:
    """Load pipeline properties from a JSON file.

    Args:
        file_path: Path to the JSON properties file, as ``str`` or ``Path``.

    Returns:
        dict: The parsed pipeline properties.
    """
    # The signature accepts str, but the original called `.open()` directly,
    # which fails with AttributeError for a plain string — normalize first.
    path = Path(file_path)
    with path.open("r") as f:
        pipeline_properties = json.load(f)

    return pipeline_properties


def get_session(region: str, default_bucket: str) -> sagemaker.session.Session:
    """Create a SageMaker session bound to the given region.

    Args:
        region: AWS region used for the underlying boto3 session.
        default_bucket: S3 bucket used for storing pipeline artifacts.

    Returns:
        ``sagemaker.session.Session``: the configured session.

    Raises:
        Exception: re-raised from the SageMaker SDK if session creation
            fails (previously swallowed, then re-raised anyway by an
            unguarded second construction).
    """
    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")

    # The original built the session TWICE: the try/except discarded its
    # result and an unguarded second call produced the return value, so the
    # logged outcome could disagree with what was actually returned.
    # Build it once; log and re-raise on failure.
    try:
        session = sagemaker.session.Session(
            boto_session=boto_session,
            sagemaker_client=sagemaker_client,
            sagemaker_runtime_client=runtime_client,
            default_bucket=default_bucket,
        )
    except Exception:
        logger.exception("Failed to generate a SageMaker Session")
        raise
    logger.info("SageMaker Session created")
    return session


def generate_pipeline_definition(
    role: str,
    region: str,
    default_bucket: str,
    pipeline_name: str,
    pipeline_conf: dict,
    code_file_path: Union[str, Path],
) -> str:
    """Generates a SageMaker pipeline definition

    Args:
        role (str): ARN of the role assumed by the pipeline steps
        region (str): region
        default_bucket (str): deafult bucket to upload artifacts
        pipeline_name (str): name to give to the pipeline
        pipeline_conf (dict): configuration of the pipeline

    Returns:
        [str]: pipeline definition as a json object
    """
    if not isinstance(code_file_path, Path):
        code_file_path = Path(code_file_path)
    try:
        module = importlib.import_module(
            "." + code_file_path.stem, package=code_file_path.parent.as_posix()
        )
        logger.info("Loading the pipeline definition module")
    except:
        logger.exception("Failed to load the Pipeline definion module")
        return

    logger.info("Creating SageMaker Session")
    sm_session = get_session(region=region, default_bucket=default_bucket)
    logger.info(
        f"Creating SageMaker Pipeline definition. Artifacts to be uploaded in {default_bucket}"
    )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



repos/serving/infra/sm_pipeline_utils.py [11:90]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Module-wide logger; the ROOT logger is configured here (not __name__),
# so this raises the global log level to INFO for the whole process.
# NOTE(review): consider logging.getLogger(__name__) — confirm callers
# don't rely on root-level configuration.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_pipeline_props(file_path: Union[str, Path]) -> dict:
    """Load pipeline properties from a JSON file.

    Args:
        file_path: Path to the JSON properties file, as ``str`` or ``Path``.

    Returns:
        dict: The parsed pipeline properties.
    """
    # The signature accepts str, but the original called `.open()` directly,
    # which fails with AttributeError for a plain string — normalize first.
    path = Path(file_path)
    with path.open("r") as f:
        pipeline_properties = json.load(f)

    return pipeline_properties


def get_session(region: str, default_bucket: str) -> sagemaker.session.Session:
    """Create a SageMaker session bound to the given region.

    Args:
        region: AWS region used for the underlying boto3 session.
        default_bucket: S3 bucket used for storing pipeline artifacts.

    Returns:
        ``sagemaker.session.Session``: the configured session.

    Raises:
        Exception: re-raised from the SageMaker SDK if session creation
            fails (previously swallowed, then re-raised anyway by an
            unguarded second construction).
    """
    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")

    # The original built the session TWICE: the try/except discarded its
    # result and an unguarded second call produced the return value, so the
    # logged outcome could disagree with what was actually returned.
    # Build it once; log and re-raise on failure.
    try:
        session = sagemaker.session.Session(
            boto_session=boto_session,
            sagemaker_client=sagemaker_client,
            sagemaker_runtime_client=runtime_client,
            default_bucket=default_bucket,
        )
    except Exception:
        logger.exception("Failed to generate a SageMaker Session")
        raise
    logger.info("SageMaker Session created")
    return session


def generate_pipeline_definition(
    role: str,
    region: str,
    default_bucket: str,
    pipeline_name: str,
    pipeline_conf: dict,
    code_file_path: Union[str, Path],
) -> str:
    """Generates a SageMaker pipeline definition

    Args:
        role (str): ARN of the role assumed by the pipeline steps
        region (str): region
        default_bucket (str): deafult bucket to upload artifacts
        pipeline_name (str): name to give to the pipeline
        pipeline_conf (dict): configuration of the pipeline

    Returns:
        [str]: pipeline definition as a json object
    """
    if not isinstance(code_file_path, Path):
        code_file_path = Path(code_file_path)
    try:
        module = importlib.import_module(
            "." + code_file_path.stem, package=code_file_path.parent.as_posix()
        )
        logger.info("Loading the pipeline definition module")
    except:
        logger.exception("Failed to load the Pipeline definion module")
        return

    logger.info("Creating SageMaker Session")
    sm_session = get_session(region=region, default_bucket=default_bucket)
    logger.info(
        f"Creating SageMaker Pipeline definition. Artifacts to be uploaded in {default_bucket}"
    )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



