{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Run PySpark locally on SageMaker Studio\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"\n",
"This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n",
"\n",
"\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"This notebook shows you how to run PySpark code locally within a SageMaker Studio notebook. The dependencies are installed in the notebook, so you can run this notebook on any image/kernel, including BYO images. For this example, you can choose the Data Science image and Python 3 kernel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# import sagemaker SDK\n",
"import sagemaker\n",
"\n",
"print(sagemaker.__version__)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# setup - install JDK\n",
"# you only need to run this once per KernelApp\n",
"%conda install openjdk -y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# install PySpark\n",
"%pip install pyspark==3.1.1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# import PySpark and build Spark session\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = (\n",
" SparkSession.builder.appName(\"PySparkApp\")\n",
" .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:3.2.2\")\n",
" .config(\n",
" \"fs.s3a.aws.credentials.provider\",\n",
" \"com.amazonaws.auth.ContainerCredentialsProvider\",\n",
" )\n",
" .getOrCreate()\n",
")\n",
"\n",
"print(spark.version)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> 1. If you see an exception in running the cell above similar to this - `Exception: Java gateway process exited before sending the driver its port number`, restart your JupyterServer app to make sure you're on the latest version of Studio. \n",
"> 2. If you are running this notebook in a SageMaker Studio notebook, run the above cell as-is. If you are running on a SageMaker notebook instance, replace `com.amazonaws.auth.ContainerCredentialsProvider` with `com.amazonaws.auth.InstanceProfileCredentialsProvider`."
]
},
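{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, below is a minimal sketch of the equivalent session setup on a SageMaker notebook instance; it is identical to the cell above except for the credentials provider, so there is no need to run it in Studio.\n",
"\n",
"```python\n",
"from pyspark.sql import SparkSession\n",
"\n",
"# on a SageMaker notebook instance, S3 credentials come from the instance profile\n",
"spark = (\n",
"    SparkSession.builder.appName(\"PySparkApp\")\n",
"    .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:3.2.2\")\n",
"    .config(\n",
"        \"fs.s3a.aws.credentials.provider\",\n",
"        \"com.amazonaws.auth.InstanceProfileCredentialsProvider\",\n",
"    )\n",
"    .getOrCreate()\n",
")\n",
"```"
]
},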
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create and run user-defined functions\n",
"\n",
"Now that you have installed PySpark and initiated a Spark session, let's try out a couple of sample Pandas user defined functions (UDF)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.types import *\n",
"from pyspark.sql.functions import (\n",
" col,\n",
" count,\n",
" rand,\n",
" collect_list,\n",
" explode,\n",
" struct,\n",
" count,\n",
" lit,\n",
")\n",
"from pyspark.sql.functions import pandas_udf, PandasUDFType\n",
"\n",
"# generate random data\n",
"df = (\n",
" spark.range(0, 10 * 100 * 100)\n",
" .withColumn(\"id\", (col(\"id\") / 100).cast(\"integer\"))\n",
" .withColumn(\"v\", rand())\n",
")\n",
"df.cache()\n",
"df.count()\n",
"\n",
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sample pandas udf to return squared value\n",
"@pandas_udf(\"double\", PandasUDFType.SCALAR)\n",
"def pandas_squared(v):\n",
" return v * v\n",
"\n",
"\n",
"df.withColumn(\"v2\", pandas_squared(df.v))"
]
},
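{
"cell_type": "markdown",
"metadata": {},
"source": [
"The decorator above uses the Spark 2-style `PandasUDFType` API, which still works in PySpark 3.1 but is deprecated. The cell below is a minimal sketch of the same UDF written with the Python type hints introduced in Spark 3.0 (the `pandas_squared_v2` name is only for illustration)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the same scalar pandas UDF expressed with Spark 3 type hints (a sketch, equivalent to the cell above)\n",
"import pandas as pd\n",
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"\n",
"@pandas_udf(\"double\")\n",
"def pandas_squared_v2(v: pd.Series) -> pd.Series:\n",
"    return v * v\n",
"\n",
"\n",
"df.withColumn(\"v2\", pandas_squared_v2(df.v)).show(5)"
]
},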
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this next example, you'll run Ordinary least squares (OLS) linear regression by group using [statsmodels](https://www.statsmodels.org/stable/examples/notebooks/generated/ols.html)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df2 = (\n",
" df.withColumn(\"y\", rand())\n",
" .withColumn(\"x1\", rand())\n",
" .withColumn(\"x2\", rand())\n",
" .select(\"id\", \"y\", \"x1\", \"x2\")\n",
")\n",
"df2.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import statsmodels.api as sm\n",
"\n",
"group_column = \"id\"\n",
"y_column = \"y\"\n",
"x_columns = [\"x1\", \"x2\"]\n",
"schema = df2.select(group_column, *x_columns).schema"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sample UDF with input and output data frames\n",
"@pandas_udf(schema, PandasUDFType.GROUPED_MAP)\n",
"def ols(pdf):\n",
" group_key = pdf[group_column].iloc[0]\n",
" y = pdf[y_column]\n",
" X = pdf[x_columns]\n",
" X = sm.add_constant(X)\n",
" model = sm.OLS(y, X).fit()\n",
" return pd.DataFrame(\n",
" [[group_key] + [model.params[i] for i in x_columns]],\n",
" columns=[group_column] + x_columns,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# run ols grouped by the \"id\" group column\n",
"beta = df2.groupby(group_column).apply(ols)\n",
"beta.show()"
]
},
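{
"cell_type": "markdown",
"metadata": {},
"source": [
"`PandasUDFType.GROUPED_MAP` with `groupby(...).apply(...)` is likewise the Spark 2-style API and is deprecated in Spark 3. Below is a minimal sketch of the same grouped regression using `applyInPandas`, which takes a plain Python function and the output schema (the `ols_fn` name is only for illustration)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the same grouped OLS expressed with the Spark 3 applyInPandas API (a sketch)\n",
"def ols_fn(pdf: pd.DataFrame) -> pd.DataFrame:\n",
"    group_key = pdf[group_column].iloc[0]\n",
"    y = pdf[y_column]\n",
"    X = sm.add_constant(pdf[x_columns])\n",
"    model = sm.OLS(y, X).fit()\n",
"    return pd.DataFrame(\n",
"        [[group_key] + [model.params[i] for i in x_columns]],\n",
"        columns=[group_column] + x_columns,\n",
"    )\n",
"\n",
"\n",
"df2.groupby(group_column).applyInPandas(ols_fn, schema=schema).show(5)"
]
},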
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run Spark processing scripts locally\n",
"\n",
"You can run Spark processing scripts on your notebook like below. You'll read the sample `abalone` dataset from an S3 location and perform preprocessing on the dataset. You will - \n",
"1. Apply transforms on the data such as one-hot encoding, merge columns to a single vector\n",
"2. Create a preprocessing pipeline\n",
"3. Fit and transform the dataset\n",
"4. Split into a training and validation set\n",
"5. Save the files to local storage"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.types import (\n",
" DoubleType,\n",
" StringType,\n",
" StructField,\n",
" StructType,\n",
")\n",
"from pyspark.ml.feature import (\n",
" OneHotEncoder,\n",
" StringIndexer,\n",
" VectorAssembler,\n",
" VectorIndexer,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"schema = StructType(\n",
" [\n",
" StructField(\"sex\", StringType(), True),\n",
" StructField(\"length\", DoubleType(), True),\n",
" StructField(\"diameter\", DoubleType(), True),\n",
" StructField(\"height\", DoubleType(), True),\n",
" StructField(\"whole_weight\", DoubleType(), True),\n",
" StructField(\"shucked_weight\", DoubleType(), True),\n",
" StructField(\"viscera_weight\", DoubleType(), True),\n",
" StructField(\"shell_weight\", DoubleType(), True),\n",
" StructField(\"rings\", DoubleType(), True),\n",
" ]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_uri = \"s3a://sagemaker-sample-files/datasets/tabular/uci_abalone/abalone.csv\"\n",
"\n",
"abalone_df = spark.read.csv(data_uri, header=False, schema=schema)\n",
"abalone_df.show(2)"
]
},
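{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can quickly confirm that the schema was applied and check how many rows were loaded before building the preprocessing pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# verify the column types and row count of the raw dataset\n",
"abalone_df.printSchema()\n",
"print(\"row count:\", abalone_df.count())"
]
},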
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# StringIndexer on the sex column which has categorical value\n",
"sex_indexer = StringIndexer(inputCol=\"sex\", outputCol=\"indexed_sex\")\n",
"\n",
"# one-hot encoding on the string-indexed sex column (indexed_sex)\n",
"sex_encoder = OneHotEncoder(inputCol=\"indexed_sex\", outputCol=\"sex_vec\")\n",
"\n",
"# vector-assembler will bring all the features to a 1D vector to save easily into CSV format\n",
"assembler = VectorAssembler(\n",
" inputCols=[\n",
" \"sex_vec\",\n",
" \"length\",\n",
" \"diameter\",\n",
" \"height\",\n",
" \"whole_weight\",\n",
" \"shucked_weight\",\n",
" \"viscera_weight\",\n",
" \"shell_weight\",\n",
" ],\n",
" outputCol=\"features\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.ml import Pipeline\n",
"\n",
"pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])\n",
"model = pipeline.fit(abalone_df)\n",
"\n",
"# apply transforms to the data frame\n",
"transformed_df = model.transform(abalone_df)\n",
"transformed_df.show(2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# split into train and test set, and save to file\n",
"(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2])"
]
},
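{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you need the split to be reproducible across runs, `randomSplit` also accepts a seed; a minimal sketch of the same split with a fixed seed:\n",
"\n",
"```python\n",
"(train_df, validation_df) = transformed_df.randomSplit([0.8, 0.2], seed=42)\n",
"```"
]
},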
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# write features to csv\n",
"from pyspark.ml.functions import vector_to_array\n",
"\n",
"# extract only rings and features\n",
"train_df_final = train_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
" [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
")\n",
"\n",
"val_df_final = validation_df.withColumn(\"feature\", vector_to_array(\"features\")).select(\n",
" [\"rings\"] + [col(\"feature\")[i] for i in range(9)]\n",
")\n",
"\n",
"# write to csv\n",
"train_df_final.write.csv(\"train\")\n",
"val_df_final.write.csv(\"validation\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Print the first five rows of the preprocessed output file."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"\n",
"files = os.listdir(\"./train\")\n",
"file_name = [f for f in files if f.endswith(\".csv\")]\n",
"\n",
"print(\"Top 5 rows from the train file\")\n",
"pd.read_csv(f\"./train/{file_name[0]}\", header=None).head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the script as a SageMaker processing job\n",
"\n",
"Once experimentation is complete, you can run the script as a SageMaker processing job. SageMaker processing jobs let you perform data pre-processing, post-processing, feature engineering, and data validation on infrastructure fully managed by SageMaker. \n",
"\n",
"`./code/preprocess.py` script adds the preprocessing we've done above locally to a script that can be used to run a standalone processing job. Let's view the file contents below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!pygmentize ./code/preprocess.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll now use the `PySparkProcessor` class to define a Spark job and run it using SageMaker processing. For detailed reference, see [Data Processing with Spark](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#data-processing-with-spark)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"from time import strftime, gmtime\n",
"from sagemaker.session import Session\n",
"from sagemaker.spark.processing import PySparkProcessor\n",
"from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
"\n",
"sagemaker_logger = logging.getLogger(\"sagemaker\")\n",
"sagemaker_logger.setLevel(logging.INFO)\n",
"sagemaker_logger.addHandler(logging.StreamHandler())\n",
"\n",
"sagemaker_session = Session()\n",
"bucket = sagemaker_session.default_bucket()\n",
"role = sagemaker.get_execution_role()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# fetch the dataset from the SageMaker bucket\n",
"import boto3\n",
"\n",
"s3 = boto3.client(\"s3\")\n",
"s3.download_file(\n",
" f\"sagemaker-sample-files\", \"datasets/tabular/uci_abalone/abalone.csv\", \"abalone.csv\"\n",
")\n",
"\n",
"# upload the raw input dataset to a unique S3 location\n",
"timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n",
"prefix = \"sagemaker/local-pyspark/{}\".format(timestamp_prefix)\n",
"input_prefix_abalone = \"{}/abalone-preprocess/input\".format(prefix)\n",
"input_preprocessed_prefix_abalone = \"{}/abalone-preprocess/output\".format(prefix)\n",
"\n",
"sagemaker_session.upload_data(path=\"abalone.csv\", bucket=bucket, key_prefix=input_prefix_abalone)\n",
"\n",
"# run the processing job\n",
"spark_processor = PySparkProcessor(\n",
" base_job_name=\"local-pyspark\",\n",
" framework_version=\"3.1\",\n",
" role=role,\n",
" instance_count=2,\n",
" instance_type=\"ml.m5.xlarge\",\n",
" max_runtime_in_seconds=1200,\n",
" tags=[{\"Key\": \"tag-key\", \"Value\": \"tag-value\"}],\n",
")\n",
"\n",
"spark_processor.run(\n",
" submit_app=\"./code/preprocess.py\",\n",
" arguments=[\n",
" \"--s3_input_bucket\",\n",
" bucket,\n",
" \"--s3_input_key_prefix\",\n",
" input_prefix_abalone,\n",
" \"--s3_output_bucket\",\n",
" bucket,\n",
" \"--s3_output_key_prefix\",\n",
" input_preprocessed_prefix_abalone,\n",
" ],\n",
" spark_event_logs_s3_uri=\"s3://{}/{}/spark_event_logs\".format(bucket, prefix),\n",
" logs=False,\n",
")"
]
},
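{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the job was launched with `spark_event_logs_s3_uri`, the Spark event logs are persisted to S3. The SageMaker Python SDK can start a local Spark history server against those logs so you can browse the Spark UI; the sketch below assumes Docker is available in your environment (it may not be inside Studio).\n",
"\n",
"```python\n",
"# launch the Spark history server against the event logs written by the job above\n",
"spark_processor.start_history_server(\n",
"    spark_event_logs_s3_uri=\"s3://{}/{}/spark_event_logs\".format(bucket, prefix)\n",
")\n",
"\n",
"# ...browse the Spark UI, then shut the history server down\n",
"spark_processor.terminate_history_server()\n",
"```"
]
},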
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Inspect the first five rows of the preprocessed output file. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get output file name from S3 and print the first five records\n",
"train_output_key = \"\"\n",
"response = s3.list_objects_v2(Bucket=bucket, Prefix=f\"{input_preprocessed_prefix_abalone}/train\")\n",
"\n",
"for cont in response[\"Contents\"]:\n",
" if cont[\"Key\"].endswith(\".csv\"):\n",
" train_output_key = cont[\"Key\"]\n",
"\n",
"if train_output_key == \"\":\n",
" print(\"Preprocessing train file not found. Check to make sure the job ran successfully.\")\n",
"else:\n",
" print(\"Top 5 rows from s3://{}/{}/train/\".format(bucket, input_preprocessed_prefix_abalone))\n",
" s3.download_file(bucket, train_output_key, \"train_output.csv\")\n",
" print(pd.read_csv(\"train_output.csv\", header=None).head())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion and Cleanup\n",
"\n",
"In this notebook, we installed PySpark on Studio notebook and created a spark session to run PySpark code locally within Studio. You can use this as a starting point to prototype your Spark code on a smaller sample of your data before running the SageMaker processing job on your entire dataset. You can extend this example to preprocess your data for machine learning.\n",
"\n",
"To avoid incurring costs, remember to shut down the SageMaker Studio app, or stop the notebook instance as necessary."
]
},
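{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, stop the local Spark session created at the beginning of this notebook to release its resources in the kernel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# stop the local Spark session created earlier in this notebook\n",
"spark.stop()"
]
},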
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Notebook CI Test Results\n",
"\n",
"This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n"
]
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Data Science)",
"language": "python",
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}