# Build a geospatial pipeline with SageMaker Pipelines

---

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. 

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

---

## Introduction

The following notebook shows you how to build a geospatial data processing workflow with Amazon SageMaker Pipelines. SageMaker Pipelines is a purpose-built workflow orchestration service to automate all phases of machine learning from data pre-processing to model monitoring. With an intuitive UI and Python SDK you can manage repeatable end-to-end ML pipelines at scale. In this example, a workflow is created to query Sentinel-2 imagery based on a list of areas of interest (AOIs) and then generate a data cube for the boundaries provided by each AOI.

## Prerequisites

This notebook runs with the Geospatial 1.0 kernel with a `ml.geospatial.interactive` instance. Note that the following policies need to be attached to the execution role that you used to run this notebook:
- AmazonSageMakerFullAccess
- AmazonSageMakerGeospatialFullAccess

You can see the policies attached to the role in the IAM console under the Permissions tab. If required, add the policies using the 'Add Permissions' button.

In addition to these policies, ensure that the execution role's trust policy allows the SageMaker-GeoSpatial service to assume the role. This can be done by adding the following trust policy using the 'Trust relationships' tab:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "sagemaker.amazonaws.com",
                    "sagemaker-geospatial.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```
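As a quick sanity check, the trust policy can also be inspected programmatically. The following is a minimal sketch, assuming the policy document is available as a Python dictionary; the helper name `services_allowed_to_assume` and the example document are illustrative and not part of the notebook.

```python
def services_allowed_to_assume(trust_policy):
    """Return the service principals that an Allow statement lets call sts:AssumeRole."""
    allowed = set()
    for statement in trust_policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        if "sts:AssumeRole" not in actions:
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            continue  # e.g. a wildcard principal, which names no service
        services = principal.get("Service", [])
        if isinstance(services, str):
            services = [services]
        allowed.update(services)
    return allowed


REQUIRED_SERVICES = {"sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com"}

# Same document as shown above, written as a Python dictionary.
example_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}

missing = REQUIRED_SERVICES - services_allowed_to_assume(example_policy)
print("Missing service principals:", sorted(missing))
```

For a real role, the document could be fetched with the IAM `get_role` call in boto3 (the `AssumeRolePolicyDocument` field of the returned role) and passed to the same helper.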

### SageMaker Processing Service Quota

In this example, `ml.m5.xlarge` instances are used to execute the Processing Jobs within the pipeline. If you're running this example in an AWS-hosted event environment, the quota should already be raised and you can skip to the next step.

If you're running this example in your AWS account, you might need to request a service quota increase. You can learn more about requesting a quota increase [here](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html).

To request a service quota increase for this particular example, follow the steps below:

- From the AWS Console, navigate to `Service Quotas` (or follow this [link](https://us-west-2.console.aws.amazon.com/servicequotas/home?region=us-west-2))
- Click on `AWS Services` in the left nav menu
- Search for `Amazon SageMaker` in the `find services` box and click on the service item
- On the next page, search for `ml.m5.xlarge for processing job usage`
- Select the quota item and click on "Request quota increase" and enter an amount of 8 or higher
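
The current quota value can also be looked up from code before requesting an increase. The sketch below assumes a boto3 Service Quotas client is passed in and that the SageMaker service code is `sagemaker`; the helper name `find_quotas` is hypothetical.

```python
def find_quotas(client, service_code, name_fragment):
    """Return (quota name, value) pairs whose name contains name_fragment."""
    matches = []
    # list_service_quotas is paginated, so walk every page of results.
    paginator = client.get_paginator("list_service_quotas")
    for page in paginator.paginate(ServiceCode=service_code):
        for quota in page["Quotas"]:
            if name_fragment.lower() in quota["QuotaName"].lower():
                matches.append((quota["QuotaName"], quota["Value"]))
    return matches


# Example usage (requires AWS credentials, so it is left commented out):
# import boto3
# client = boto3.client("service-quotas", region_name="us-west-2")
# print(find_quotas(client, "sagemaker", "ml.m5.xlarge for processing job usage"))
```

If the reported value is below 8, the second Processing step in this notebook, which uses 8 instances, would likely fail to start until the quota is raised.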

## Create a Pipeline

To orchestrate your workflows with Amazon SageMaker Model Building Pipelines, you need to generate a directed acyclic graph (DAG) in the form of a JSON pipeline definition. You can generate the JSON pipeline definition using the SageMaker Python SDK. The following steps show how to generate a pipeline definition for a pipeline that uses SageMaker Processing jobs to query and preprocess Sentinel-2 data.

### Import SageMaker SDK and dependencies

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

sagemaker_session = sagemaker.Session()
execution_role = get_execution_role()

geospatial_image_uri = (
    "081189585635.dkr.ecr.us-west-2.amazonaws.com/sagemaker-geospatial-v1-0:latest"
)

### Define Parameters to Parametrize Pipeline Execution

You can define pipeline parameters to enable customization of pipeline executions and scheduling without needing to alter the pipeline definition itself. Parameters allow for flexible pipeline executions by setting customizable options.

The types of parameters supported are:

- `ParameterString`, representing the Python `str` type.
- `ParameterInteger`, representing the Python `int` type.
- `ParameterFloat`, representing the Python `float` type.

These parameters allow for a default value to be provided, which can be overridden during pipeline execution. The default value should match the parameter type.

The parameters included in this workflow are:

- `start_date`: The start date for the time range of interest in ISO representation
- `end_date`: The end date for the time range of interest in ISO representation
- `object_boundaries_s3_location`: The S3 bucket URI containing the AOI data (object boundaries)
- `object_boundaries_file_name`: The name of the file containing the AOI data (object boundaries), with "object_boundaries.json" as default setting
- `s3_bucket_output_data`: The S3 bucket name for the output data, with the SageMaker default bucket as default setting

In [None]:
param_start_date = ParameterString("start_date")
param_end_date = ParameterString("end_date")
param_object_boundaries_file_name = ParameterString(
    "object_boundaries_file_name", default_value="object_boundaries.json"
)
param_object_boundaries_s3_location = ParameterString("object_boundaries_s3_location")
param_s3_bucket_output_data = ParameterString(
    "s3_bucket_output_data", default_value=sagemaker_session.default_bucket()
)
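
To make the role of default values concrete, the following conceptual sketch shows how execution-time values could be combined with defaults. It is plain Python, not the SageMaker implementation; `REQUIRED`, `PIPELINE_DEFAULTS`, `resolve_parameters`, and the bucket name are illustrative assumptions.

```python
REQUIRED = object()  # sentinel for parameters that have no default value

# Mirrors the five parameters of this workflow; the bucket name is a placeholder.
PIPELINE_DEFAULTS = {
    "start_date": REQUIRED,
    "end_date": REQUIRED,
    "object_boundaries_file_name": "object_boundaries.json",
    "object_boundaries_s3_location": REQUIRED,
    "s3_bucket_output_data": "example-default-bucket",
}


def resolve_parameters(defaults, overrides):
    """Combine defaults with execution-time overrides and reject gaps."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown parameters: {sorted(unknown)}")
    resolved = {}
    for name, default in defaults.items():
        value = overrides.get(name, default)
        if value is REQUIRED:
            raise ValueError(f"Parameter '{name}' has no default and must be provided")
        resolved[name] = value
    return resolved


resolved = resolve_parameters(
    PIPELINE_DEFAULTS,
    {
        "start_date": "2017-07-01",
        "end_date": "2018-10-01",
        "object_boundaries_s3_location": "s3://example-bucket/input/aoi/",
    },
)
print(resolved["object_boundaries_file_name"])
```

In this workflow, that means three values have to be supplied for every execution, while the file name and output bucket only need to be set when they differ from the defaults.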

### Define a Processing Step for querying Sentinel-2 data

The following cell writes a file `fetch_aoi_meta_data.py`, which contains a script to collect Sentinel-2 metadata based on the AOIs provided in the object boundary file. You can update the script and rerun this cell to overwrite the file.

In [None]:
%%writefile fetch_aoi_meta_data.py

import os
import pickle
import sys
import subprocess
import json
import time
import geopandas
import shapely
import shapely.geometry
from shapely.ops import unary_union
import logging
from datetime import datetime, timedelta
import boto3


def get_logger(log_level):
    logger = logging.getLogger("processing")

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    console_handler.setLevel(log_level)

    logger.addHandler(console_handler)
    logger.setLevel(log_level)
    return logger


def parse_date(date_str):
    # string starts with '-', assumes delta in days (e.g. -1 for yesterday)
    if date_str.startswith("-"):
        days_delta = int(date_str) * -1
        target_date = datetime.today() - timedelta(days=days_delta)
        date_str = target_date.strftime("%Y-%m-%d")
    # convert to datetime to validate format
    date_time_obj = datetime.strptime(date_str, "%Y-%m-%d")
    return date_str


def get_date_range(args):
    start_date = parse_date(args[2].strip())
    end_date = parse_date(args[3].strip())
    return start_date, end_date


def get_s2_items(objects_gdf, start_date, end_date):
    session = boto3.Session()
    geospatial_client = session.client(service_name="sagemaker-geospatial", region_name="us-west-2")

    s2_items = {}

    for i, row in objects_gdf.iterrows():
        aoi_geometry = row["geometry"]
        if type(row["geometry"]) == shapely.geometry.multipolygon.MultiPolygon:
            aoi_geometry = unary_union(row["geometry"])

        bbox = aoi_geometry.bounds
        aoi_bbox = shapely.geometry.box(*bbox, ccw=True)

        search_params = {
            "Arn": "arn:aws:sagemaker-geospatial:us-west-2:378778860802:raster-data-collection/public/nmqj48dcu3g7ayw8",  # Sentinel-2 L2A data
            "RasterDataCollectionQuery": {
                "AreaOfInterest": {
                    "AreaOfInterestGeometry": {
                        "PolygonGeometry": {
                            "Coordinates": shapely.geometry.mapping(aoi_bbox)["coordinates"]
                        }
                    }
                },
                "TimeRangeFilter": {
                    "StartTime": f"{start_date}T00:00:00Z",
                    "EndTime": f"{end_date}T23:59:59Z",
                },
                "PropertyFilters": {
                    "Properties": [
                        {"Property": {"EoCloudCover": {"LowerBound": 0.0, "UpperBound": 50.0}}}
                    ],
                    "LogicalOperator": "AND",
                },
            },
        }

        next_token = True
        item_count = 0
        while next_token:
            search_result = geospatial_client.search_raster_data_collection(**search_params)
            for item in search_result["Items"]:
                if item["Id"] not in s2_items:
                    s2_items[item["Id"]] = {"objects": [], "data": item}
                s2_items[item["Id"]]["objects"].append(row["objectid"])
            item_count += len(search_result["Items"])
            next_token = search_result.get("NextToken")
            search_params["NextToken"] = next_token

        logger.debug("Found {} items for object {}".format(item_count, row["objectid"]))

    return s2_items


if __name__ == "__main__":
    logger = get_logger(logging.INFO)

    logger.info("Starting processing")
    logger.debug(f"Argument List: {str(sys.argv)}")

    object_boundaries_file_name = sys.argv[1]
    start_date, end_date = get_date_range(sys.argv)

    logger.info(f"Executing for date range: [{start_date}, {end_date}]")

    sys.stdout.flush()

    objects_gdf = geopandas.read_file(
        f"/opt/ml/processing/input/objects/{object_boundaries_file_name}"
    )
    s2_items = get_s2_items(objects_gdf, start_date, end_date)

    logger.info(
        "Found {} Sentinel-2 tiles from {} to {}".format(len(s2_items), start_date, end_date)
    )

    output_path = "/opt/ml/processing/output"

    for scene_id, item in s2_items.items():
        if "_0_L2A" not in scene_id:
            continue
        output_file_path = f"{output_path}/{scene_id}.json"
        item["objects"] = list(set(item["objects"]))
        with open(output_file_path, "w", encoding="utf8") as f:
            json.dump(item, f, default=str)
            logger.debug(f"Written output: {output_file_path}")

    logger.info("Written all outputs")
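
The script above pages through search results by passing the returned `NextToken` back into the next request. The pattern can be sketched in isolation as follows; `collect_items` and `fake_search` are hypothetical names, and `fake_search` only stands in for `geospatial_client.search_raster_data_collection` so the example runs without AWS access.

```python
def collect_items(search_fn, search_params):
    """Call search_fn repeatedly, following NextToken until no pages remain."""
    params = dict(search_params)  # copy so the caller's dictionary is not mutated
    items = []
    while True:
        page = search_fn(**params)
        items.extend(page.get("Items", []))
        next_token = page.get("NextToken")
        if not next_token:
            return items
        params["NextToken"] = next_token


def fake_search(**params):
    """Stand-in for a paginated API: two pages keyed by the token received."""
    pages = {
        None: {"Items": [{"Id": "scene-a"}, {"Id": "scene-b"}], "NextToken": "page-2"},
        "page-2": {"Items": [{"Id": "scene-c"}]},
    }
    return pages[params.get("NextToken")]


all_items = collect_items(fake_search, {"Arn": "example-arn"})
print([item["Id"] for item in all_items])
```

Copying the parameter dictionary keeps a leftover token from one AOI from leaking into the first request for the next AOI.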

This Processing step executes the script on the file containing the AOIs and the date range provided as pipeline parameters. The output will be written to S3 and passed to the next Processing step as an input.

In [None]:
processor = ScriptProcessor(
    command=["python3"],
    image_uri=geospatial_image_uri,
    role=execution_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

step_process_fetch_data = ProcessingStep(
    name="data-fetch",
    processor=processor,
    code="fetch_aoi_meta_data.py",
    inputs=[
        ProcessingInput(
            source=param_object_boundaries_s3_location,
            destination="/opt/ml/processing/input/objects/",
            s3_data_distribution_type="FullyReplicated",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="scene_metadata",
            source="/opt/ml/processing/output/",
            destination=Join(
                on="/",
                values=[
                    "s3:/",
                    param_s3_bucket_output_data,
                    "processing-geospatial-pipeline-example",
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "output/scene_metadata",
                ],
            ),
        )
    ],
    job_arguments=[param_object_boundaries_file_name, param_start_date, param_end_date],
)
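
The output destination is built with `Join` because the bucket parameter and the execution ID are only known when the pipeline runs. The leading `"s3:/"` element may look like a typo, but joining on `"/"` supplies the second slash. A plain-string sketch of the same concatenation, with placeholder bucket and execution ID values:

```python
def join_like(on, values):
    """Plain-string equivalent of a Join: concatenate values with a separator."""
    return on.join(values)


uri = join_like(
    "/",
    [
        "s3:/",  # one slash here; the separator adds the second
        "example-bucket",  # placeholder for s3_bucket_output_data
        "processing-geospatial-pipeline-example",
        "example-execution-id",  # placeholder for the pipeline execution ID
        "output/scene_metadata",
    ],
)
print(uri)
```

Because the execution ID is part of the path, each pipeline execution writes to its own prefix and does not overwrite the results of earlier runs.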

### Define Processing Step to generate data cubes

For the second Processing step, the next cell writes the file `generate_data_cube_clipped.py`. This script uses the outputs of the first Processing step to generate a data cube that includes the RGB and near-infrared (NIR) bands. In addition, the normalized difference vegetation index (NDVI) is computed and added to the data cube. For each AOI, a dedicated data cube is created, clipped to the boundaries of the corresponding object.

In [None]:
%%writefile generate_data_cube_clipped.py

import os
import pickle
import sys
import subprocess
import warnings
import json
import time
import geopandas
import pandas as pd
import numpy as np
import shapely
from shapely.geometry import shape
import xarray as xr
import rioxarray
from rioxarray.exceptions import NoDataInBounds
import gc
import logging
import datetime

MAX_CLOUD_COVER_AOI = 0.3

# Cloud mask values:
# 8 - Cloud medium probability
# 9 - Cloud high probability
# 10 - Thin cirrus
# For details see here: https://sentinels.copernicus.eu/web/sentinel/technical-guides/sentinel-2-msi/level-2a/algorithm-overview
SCL_CLOUD_MASK_CLASSES = [8, 9, 10]
SCL_MASK_CLOUD_FREE_CLASSES = [x for x in list(range(0, 12)) if x not in SCL_CLOUD_MASK_CLASSES]


def get_logger(log_level):
    logger = logging.getLogger("processing")

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    console_handler.setLevel(log_level)

    logger.addHandler(console_handler)
    logger.setLevel(log_level)
    return logger


def s2_scene_id_to_cog_path(scene_id):
    parts = scene_id.split("_")
    s2_qualifier = "{}/{}/{}/{}/{}/{}".format(
        parts[1][0:2],
        parts[1][2],
        parts[1][3:5],
        parts[2][0:4],
        str(int(parts[2][4:6])),
        "_".join(parts),
    )
    return f"https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs/{s2_qualifier}"


def scene_id_to_datetime(scene_id):
    dt = pd.to_datetime(scene_id.split("_")[-3])
    return dt


def get_aoi_cloud_free_ratio(scl_raster, aoi):
    kwargs = {"nodata": np.nan}
    scl_raster = scl_raster.rio.reproject("EPSG:4326", **kwargs)
    # clip to AOI
    scl_raster_clipped = scl_raster.rio.clip(geometries=aoi)
    # get cloud-free ratio
    scl_mask_pixel_count = scl_raster_clipped.SCL.data.size - np.count_nonzero(
        np.isnan(scl_raster_clipped.SCL.data)
    )  # get size of SCL mask in num pixels (excl. any nans)
    scl_cloud_free_pixel_count = np.isin(
        scl_raster_clipped.SCL.data, SCL_MASK_CLOUD_FREE_CLASSES
    ).sum()  # count pixels that are non-cloud class
    cloud_free_ratio = scl_cloud_free_pixel_count / scl_mask_pixel_count

    return cloud_free_ratio


if __name__ == "__main__":
    logger = get_logger(logging.INFO)

    logger.info("Starting processing")
    logger.debug(f"Argument List: {str(sys.argv)}")

    sys.stdout.flush()

    object_boundaries_file_name = sys.argv[1]
    objects_gdf = geopandas.read_file(
        f"/opt/ml/processing/input/objects/{object_boundaries_file_name}"
    )

    scene_meta_data_path = "/opt/ml/processing/input/scene_meta_data/"
    scene_meta_items = []
    for current_path, sub_dirs, files in os.walk(scene_meta_data_path):
        for file in files:
            if file.endswith(".json"):
                full_file_path = os.path.join(current_path, file)
                with open(full_file_path, "r") as f:
                    scene_meta_items.append(json.load(f))

    item_count_total = len(scene_meta_items)
    item_count_current = 0
    elapsed_time_batch = 0
    logger.info("Received {} scenes to process".format(item_count_total))

    for scene_meta_item in scene_meta_items:
        if item_count_current > 0 and item_count_current % 5 == 0:
            logger.info(
                "Processed {}/{} scenes ({}s per scene)".format(
                    item_count_current,
                    item_count_total,
                    round(elapsed_time_batch / item_count_current, 2),
                )
            )
        item_count_current += 1

        item = scene_meta_item["data"]
        logger.debug("Processing scene: {}".format(item["Id"]))

        start = time.time()

        s2_scene_id = item["Id"]
        s2_cog_prefix = s2_scene_id_to_cog_path(s2_scene_id)
        date = scene_id_to_datetime(s2_scene_id)

        band_ids = [
            "B02",
            "B03",
            "B04",
            "B08",
        ]

        bands = []
        for band_id in band_ids:
            band_data = rioxarray.open_rasterio(
                f"{s2_cog_prefix}/{band_id}.tif", masked=True, band_as_variable=True
            )
            band_data = band_data.rename(name_dict={"band_1": band_id})
            bands.append(band_data)

        scl_mask = rioxarray.open_rasterio(
            f"{s2_cog_prefix}/SCL.tif", masked=True, band_as_variable=True
        )
        scl_mask = scl_mask.rename(name_dict={"band_1": "SCL"})
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            scl_mask = scl_mask.interp(x=bands[0]["x"], y=bands[0]["y"])

        bands.append(scl_mask)
        s2_cube = xr.merge(objects=bands)
        del bands
        gc.collect()

        # assign time dimension
        s2_cube = s2_cube.assign_coords(time=date)  # call this 'time'
        # reproject to EPSG:4326
        kwargs = {"nodata": np.nan}
        s2_cube = s2_cube.rio.reproject("EPSG:4326", **kwargs)

        # loop over objects that intersect with s2 item
        intersect_gdf = objects_gdf.loc[objects_gdf["objectid"].isin(scene_meta_item["objects"])]
        for index, row in intersect_gdf.iterrows():
            object_id = row["objectid"]

            # check cloud-free ratio
            geometries = [row["geometry"]]
            cloud_free_ratio = get_aoi_cloud_free_ratio(scl_mask, geometries)
            if (1 - float(cloud_free_ratio)) > MAX_CLOUD_COVER_AOI:
                logger.debug(
                    f"AOI cloud cover ratio too high ({round(1-cloud_free_ratio,3)}), skipping object {object_id} in scene {s2_scene_id}..."
                )
                del cloud_free_ratio
            else:
                logger.debug(
                    f"AOI cloud cover ratio below threshold ({round(1-cloud_free_ratio,3)}), processing object {object_id} in scene {s2_scene_id}..."
                )
                try:
                    clipped = s2_cube.rio.clip(geometries=geometries)
                except NoDataInBounds as e:
                    logger.warning(
                        "Skipping {} in {}: no data in bounds".format(object_id, s2_scene_id)
                    )
                    continue

                clipped_cloud_free = clipped.where(clipped.SCL.isin(SCL_MASK_CLOUD_FREE_CLASSES))
                # calculate index and add back to the original data cube
                clipped["NDVI"] = (clipped_cloud_free.B08 - clipped_cloud_free.B04) / (
                    clipped_cloud_free.B08 + clipped_cloud_free.B04
                )

                file_name = f"{object_id}-{s2_scene_id}.nc"
                output_file_path = f"/opt/ml/processing/output/{file_name}"

                clipped.to_netcdf(output_file_path)

                logger.debug(f"Written output: {output_file_path}")

                del clipped
                del geometries
                del cloud_free_ratio
                gc.collect()

        # explicit dereference to keep memory usage low
        del s2_cube
        del scl_mask
        sys.stdout.flush()
        gc.collect()

        elapsed_time = time.time() - start
        elapsed_time_batch += elapsed_time

        logger.debug("Processed scene {}: {}s".format(s2_scene_id, elapsed_time))
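
The script locates each band by deriving a path from the scene ID, in the same way as `s2_scene_id_to_cog_path`. The following sketch spells out that mapping for a single band; the helper name `cog_band_url` and the scene ID are made up for illustration.

```python
COG_BASE_URL = "https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-s2-l2a-cogs"


def cog_band_url(scene_id, band_id):
    """Build the URL of one band file from a scene ID such as S2B_10TGS_20230105_0_L2A."""
    _platform, tile, acquisition_date = scene_id.split("_")[:3]
    utm_zone, latitude_band, grid_square = tile[0:2], tile[2], tile[3:5]
    year = acquisition_date[0:4]
    month = str(int(acquisition_date[4:6]))  # month without a leading zero
    return (
        f"{COG_BASE_URL}/{utm_zone}/{latitude_band}/{grid_square}"
        f"/{year}/{month}/{scene_id}/{band_id}.tif"
    )


print(cog_band_url("S2B_10TGS_20230105_0_L2A", "B04"))
```

The tile identifier is split into UTM zone, latitude band, and grid square, and the month is written without a leading zero, which is why the script converts it through `int`.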

The second Processing step depends on the first step and uses its output as an input. It performs the computations and writes the preprocessed data to S3.

In [None]:
processor = ScriptProcessor(
    command=["python3"],
    image_uri=geospatial_image_uri,
    role=execution_role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
)

step_process_gen_data_cube = ProcessingStep(
    name="generate-data-cube-clip",
    processor=processor,
    code="generate_data_cube_clipped.py",
    inputs=[
        ProcessingInput(
            source=param_object_boundaries_s3_location,
            destination="/opt/ml/processing/input/objects/",
            s3_data_distribution_type="FullyReplicated",
        ),
        ProcessingInput(
            source=Join(
                on="/",
                values=[
                    "s3:/",
                    param_s3_bucket_output_data,
                    "processing-geospatial-pipeline-example",
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "output/scene_metadata",
                ],
            ),
            destination="/opt/ml/processing/input/scene_meta_data/",
            s3_data_distribution_type="ShardedByS3Key",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="data_cube_clipped",
            source="/opt/ml/processing/output/",
            destination=Join(
                on="/",
                values=[
                    "s3:/",
                    param_s3_bucket_output_data,
                    "processing-geospatial-pipeline-example",
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "output/processed",
                ],
            ),
        )
    ],
    job_arguments=[param_object_boundaries_file_name],
    depends_on=[step_process_fetch_data],
)
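
The two inputs use different distribution types: with `FullyReplicated`, every instance receives a full copy of the AOI file, while with `ShardedByS3Key`, each of the 8 instances receives only a subset of the scene metadata files. The sketch below illustrates the idea with a round-robin split. This is an assumption made for illustration; the actual assignment of keys to instances is decided by the service.

```python
def shard_keys(keys, instance_count):
    """Assign keys to instances round-robin (illustrative, not the service's algorithm)."""
    shards = [[] for _ in range(instance_count)]
    for index, key in enumerate(sorted(keys)):
        shards[index % instance_count].append(key)
    return shards


# 20 hypothetical scene metadata files spread over 8 instances.
example_keys = [f"scene-{number:02d}.json" for number in range(20)]
shards = shard_keys(example_keys, instance_count=8)
print([len(shard) for shard in shards])
```

Since each instance only sees its own share of the scene files, the work of building data cubes is divided across the instances without any coordination inside the script.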

### Define the Pipeline layout

In this section, the previously created steps and parameters will be combined into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. The name of a pipeline must be unique within an `(account, region)` pair.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_steps = [step_process_fetch_data, step_process_gen_data_cube]

pipeline_parameters = [
    param_start_date,
    param_end_date,
    param_object_boundaries_file_name,
    param_object_boundaries_s3_location,
    param_s3_bucket_output_data,
]

pipeline = Pipeline(
    name="processing-geospatial-pipeline",
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [None]:
import json


definition = json.loads(pipeline.definition())
definition
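
The full definition can be long. A compact summary of parameters and step dependencies is often enough to confirm the layout. The following sketch assumes the definition has top-level `Parameters` and `Steps` lists; the `sample_definition` shown here is a hand-written, abbreviated stand-in rather than real output.

```python
def summarize_definition(definition):
    """Return ({parameter: default}, [(step name, type, depends on)])."""
    parameters = {
        parameter["Name"]: parameter.get("DefaultValue")
        for parameter in definition.get("Parameters", [])
    }
    steps = [
        (step["Name"], step.get("Type"), step.get("DependsOn", []))
        for step in definition.get("Steps", [])
    ]
    return parameters, steps


sample_definition = {
    "Parameters": [
        {"Name": "start_date", "Type": "String"},
        {
            "Name": "object_boundaries_file_name",
            "Type": "String",
            "DefaultValue": "object_boundaries.json",
        },
    ],
    "Steps": [
        {"Name": "data-fetch", "Type": "Processing"},
        {"Name": "generate-data-cube-clip", "Type": "Processing", "DependsOn": ["data-fetch"]},
    ],
}

parameters, steps = summarize_definition(sample_definition)
for name, step_type, depends_on in steps:
    print(name, step_type, depends_on)
```

Calling `summarize_definition(definition)` on the dictionary loaded in the cell above should list both steps, with the second depending on `data-fetch`.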

### Upsert Pipeline to persist definition

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=execution_role)

After the pipeline has been created, you are able to inspect the created pipeline.

For this, you can navigate to the SageMaker Studio Resources tab in the left menu and click on `Pipelines`.

You should be able to see the "processing-geospatial-pipeline" in the list. You can click on it and then navigate to the `Graph` tab to see a visual representation of the created pipeline.

![SageMaker Pipeline](images/processing-geospatial-pipeline.png)

## Execute the Pipeline

After the pipeline has been created, a pipeline execution can be triggered via the Pipelines UI, the Python SDK, or the service API via boto3.

Before an execution is submitted, we'll upload an example set of AOI data to S3 which serves as an input for the created pipeline.

In [None]:
import geopandas

gdf = geopandas.read_file("data/object_boundaries.json")
gdf

In [None]:
import boto3
import json

bucket_name = sagemaker_session.default_bucket()
file_name_object_boundaries = "object_boundaries.json"
bucket_prefix_input_object_boundaries = "processing-geospatial-pipeline-example/input/aoi"

# upload crop field boundaries to S3
s3 = boto3.resource("s3")
s3object = s3.Object(
    bucket_name, f"{bucket_prefix_input_object_boundaries}/{file_name_object_boundaries}"
)
with open(f"data/{file_name_object_boundaries}", "rb") as boundaries_file:
    response = s3object.put(Body=boundaries_file)

### (Optional) Visualize the AOI input data

The following cells will create an interactive map with the Amazon SageMaker geospatial Map SDK. The input data in the geopandas dataframe will be visualized in the embedded map.

In [None]:
import boto3
import sagemaker_geospatial_map

session = boto3.Session()
geospatial_client = session.client(service_name="sagemaker-geospatial")

Map = sagemaker_geospatial_map.create_map({"is_raster": True})
Map.set_sagemaker_geospatial_client(geospatial_client)

In [None]:
Map.render()

In [None]:
dataset = Map.add_dataset(
    {"data": gdf, "label": "Object boundaries (AOIs)"}, auto_create_layers=True
)

### Start a pipeline execution

After the input data has been uploaded to S3, an execution of the Pipeline can be triggered by providing the mandatory Pipeline parameters and invoking the `pipeline.start` function.

In [None]:
pipeline_execution_parameters = {
    "start_date": "2017-07-01",
    "end_date": "2018-10-01",
    "object_boundaries_s3_location": f"s3://{bucket_name}/{bucket_prefix_input_object_boundaries}/",
}

execution = pipeline.start(parameters=pipeline_execution_parameters)
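
The `start_date` and `end_date` values are interpreted by `parse_date` in `fetch_aoi_meta_data.py`, which accepts either a `YYYY-MM-DD` string or a negative number of days relative to today (for example `-1` for yesterday). The sketch below reproduces that logic in a standalone form; the name `resolve_date` and the `today` argument are additions made here so the behavior can be checked with a fixed date.

```python
from datetime import date, datetime, timedelta


def resolve_date(date_str, today=None):
    """Resolve a YYYY-MM-DD string or a negative day offset to YYYY-MM-DD."""
    today = today or date.today()
    if date_str.startswith("-"):
        days_back = -int(date_str)  # "-7" means seven days before today
        return (today - timedelta(days=days_back)).isoformat()
    # Raises ValueError if the string is not in YYYY-MM-DD format.
    datetime.strptime(date_str, "%Y-%m-%d")
    return date_str


print(resolve_date("2017-07-01"))
print(resolve_date("-7", today=date(2024, 3, 10)))
```

Relative offsets can be useful for scheduled executions, where a fixed parameter set such as `start_date="-30"` and `end_date="-1"` describes a rolling window.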

After the pipeline execution has been started, you can also see it in the Pipelines UI.

Navigate back to the "processing-geospatial-pipeline" in the Pipelines UI and you can see the execution in the `Executions` tab. You can double-click it to see the details of this execution.

![Pipeline Execution](images/pipeline-execution.png)

### (Optional) Alternative way to execute the created Pipeline

Apart from the SageMaker Python SDK, you can also execute a Pipeline by using the boto3 library. Uncomment the code in the cell below (remove the enclosing triple quotes) to trigger a Pipeline execution via boto3.

In [None]:
"""
import boto3
import time

session = boto3.Session()
sagemaker_client = session.client(service_name="sagemaker")

pipeline_execution_parameters = {
    "start_date": "2017-07-01",
    "end_date": "2018-10-01",
    "object_boundaries_s3_location": f"s3://{bucket_name}/{bucket_prefix_input_object_boundaries}/",
}

# transform the parameter dictionary into a list format
pipeline_execution_parameters_list = []
for key, value in pipeline_execution_parameters.items():
    pipeline_execution_parameters_list.append({"Name": key, "Value": value})

response = sagemaker_client.start_pipeline_execution(
    PipelineName="processing-geospatial-pipeline",
    PipelineExecutionDisplayName=f"execution-{int(time.time())}",
    PipelineParameters=pipeline_execution_parameters_list,
)
"""

### Pipeline Operations: Examining and Waiting for Pipeline Execution

After the execution has been started, you can examine the state of execution with the following command.

In [None]:
execution.describe()

By default, the Pipeline execution does not block the notebook execution. If you want to block the notebook until the Pipeline execution has finished, you can use the `execution.wait()` function. Execute the next cell to be able to follow the further instructions in this notebook.

In [None]:
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [None]:
execution.list_steps()

### Inspecting Pipeline execution results

After the Pipeline execution has finished, we can access the metadata of the underlying Processing jobs and identify the S3 output path of the job.

In [None]:
execution.list_steps()[1]["Metadata"]["ProcessingJob"]["Arn"]

In [None]:
import boto3

session = boto3.Session()
sagemaker_client = session.client(service_name="sagemaker")

output_job_name = execution.list_steps()[0]["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
job_descriptor = sagemaker_client.describe_processing_job(ProcessingJobName=output_job_name)

s3_output_uri = job_descriptor["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
s3_output_uri
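
The cells above pick a step by its position in the list returned by `execution.list_steps()`. Looking the step up by name is less sensitive to ordering. A minimal sketch, assuming each entry carries `StepName` and `Metadata.ProcessingJob.Arn` fields; the helper name and the sample ARNs are placeholders.

```python
def processing_job_name(steps, step_name):
    """Return the Processing job name for the pipeline step called step_name."""
    for step in steps:
        if step.get("StepName") == step_name:
            arn = step["Metadata"]["ProcessingJob"]["Arn"]
            return arn.split("/")[-1]  # the job name follows the last slash
    raise KeyError(f"No step named '{step_name}' in this execution")


# Hand-written sample in the shape of a list_steps() result.
sample_steps = [
    {
        "StepName": "generate-data-cube-clip",
        "Metadata": {
            "ProcessingJob": {
                "Arn": "arn:aws:sagemaker:us-west-2:111122223333:processing-job/example-cube-job"
            }
        },
    },
    {
        "StepName": "data-fetch",
        "Metadata": {
            "ProcessingJob": {
                "Arn": "arn:aws:sagemaker:us-west-2:111122223333:processing-job/example-fetch-job"
            }
        },
    },
]

print(processing_job_name(sample_steps, "generate-data-cube-clip"))
```

With a real execution, `processing_job_name(execution.list_steps(), "generate-data-cube-clip")` could replace the index-based lookup.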

We'll load the generated data cubes from S3 into memory for a specific AOI (the AOI of object 1), and will inspect the generated data cube.

In [None]:
output_bucket_name = s3_output_uri.split("/")[2]
output_bucket_prefix = "/".join(s3_output_uri.split("/")[3:])

s3_bucket = session.resource("s3").Bucket(output_bucket_name)

raster_keys = []
for s3_object in s3_bucket.objects.filter(Prefix=output_bucket_prefix).all():
    filename = str(s3_object.key).split("/")[-1]
    if filename.startswith("1-"):
        raster_keys.append(str(s3_object.key))
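
The bucket name and prefix are extracted above by splitting the URI on `/`. As an alternative sketch, the standard library's `urlparse` performs the same split and makes it easy to reject strings that are not S3 URIs; `split_s3_uri` is a hypothetical helper and the URI is a placeholder.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split s3://bucket/prefix into (bucket, prefix)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_uri(
    "s3://example-bucket/processing-geospatial-pipeline-example/example-id/output/processed"
)
print(bucket)
print(prefix)
```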

In [None]:
!pip install -q --root-user-action=ignore s3fs

In [None]:
import xarray as xr
import s3fs

fs = s3fs.S3FileSystem(anon=False)

rasters = []
for raster_key in raster_keys:
    s3_path = f"s3://{output_bucket_name}/{raster_key}"
    with fs.open(s3_path) as file_obj:
        try:
            with xr.open_dataset(file_obj, engine="h5netcdf", decode_coords="all") as raster:
                rasters.append(raster.load())
        except ValueError:
            print("Error while loading {}. Skipping.".format(raster_key))

data_cube = xr.concat(objs=rasters, coords="minimal", dim="time", join="outer")
data_cube = data_cube.sortby("time")

In [None]:
data_cube

In [None]:
data_cube.NDVI.sel(time=data_cube.time.min()).plot(cmap="RdYlGn", vmin=0, vmax=1)

In [None]:
data_cube.NDVI.sel(time=data_cube.time.max()).plot(cmap="RdYlGn", vmin=0, vmax=1)

In [None]:
import matplotlib.pyplot as plt

# plot ndvi timeseries for midpoint of field
midpoint_x = data_cube["x"][round((len(data_cube["x"])) / 2)]
midpoint_y = data_cube["y"][round((len(data_cube["y"])) / 2)]

plt.plot(data_cube.time, data_cube.NDVI.sel(x=midpoint_x, y=midpoint_y), "-o")
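
For reference, the NDVI values plotted above are computed in `generate_data_cube_clipped.py` from the NIR band (B08) and the red band (B04) as (NIR - Red) / (NIR + Red). A scalar sketch of the same formula, using made-up band values:

```python
def ndvi(nir, red):
    """Normalized difference vegetation index for one pixel."""
    total = nir + red
    if total == 0:
        return float("nan")  # undefined when both bands are zero
    return (nir - red) / total


print(ndvi(3000, 1000))  # NIR much higher than red, typical for vegetation
print(ndvi(1000, 3000))  # red higher than NIR
```

NDVI ranges from -1 to 1. The plots above set `vmin=0` and `vmax=1`, so any negative values are drawn with the lowest color of the colormap.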

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/us-east-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/us-east-2/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/us-west-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ca-central-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/sa-east-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/eu-west-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/eu-west-2/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/eu-west-3/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/eu-central-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/eu-north-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ap-southeast-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ap-southeast-2/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ap-northeast-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ap-northeast-2/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://prod.us-west-2.tcx-beacon.docs.aws.dev/sagemaker-nb/ap-south-1/sagemaker-geospatial|geospatial-processing-pipeline|geospatial_pipeline_processing.ipynb)
