{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Use AWS Glue DataBrew from within Jupyter notebooks to prepare data for ML models\n",
"\n",
"\n",
"---\n",
"\n",
"This notebook walks through the steps to configure and use the open source JupyterLab extension for AWS Glue DataBrew to prepare data for a sample anomaly detection model.\n",
"\n",
"This notebook uses the [electricity consumption dataset](https://archive.ics.uci.edu/ml/datasets/ElectricityLoadDiagrams20112014#). A subset of the original dataset, covering 4 customers, is the starting point. A series of DataBrew transformations is applied to the dataset to prepare it for a Random Cut Forest (RCF) anomaly detection model. An RCF model is then trained on the prepared dataset and deployed in SageMaker.\n",
"\n",
"\n",
"Please make sure the kernel is set to 'python3'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Install the packages needed to run this notebook"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install awswrangler\n",
"!pip install --upgrade sagemaker\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Import the packages "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import sagemaker as sm\n",
"from sagemaker import image_uris, serializers, deserializers\n",
"import awswrangler as wr\n",
"import matplotlib.pyplot as plt\n",
"import os\n",
"import pandas as pd\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### S3 bucket where the raw and transformed data will be stored and the role details"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"session = sm.Session()\n",
"# NOTE: 'data_bucket' should point to the bucket you are using for DataBrew and model training\n",
"data_bucket = session.default_bucket()\n",
"# data_bucket = '#input_s3_bucket#'  # uncomment and set this to use a bucket other than the default\n",
"role_arn=session.get_caller_identity_arn()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data Preparation using AWS Glue DataBrew"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Exploring the prepared data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Replace the placeholder prefix with the S3 prefix where the DataBrew output is stored\n",
"pc_processed_path = f's3://{data_bucket}/prefix_where_DataBrew_output_is_stored'\n",
"columns = ['timestamp', 'client_id', 'hourly_consumption']\n",
"pc_processed_df = wr.s3.read_csv(path=pc_processed_path)\n",
"pc_processed_df = pc_processed_df[columns]\n",
"# List the distinct customers present in the prepared data\n",
"pc_processed_df.client_id.unique()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Plotting the electricity consumption time series for each customer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"figure, axes = plt.subplots(3, 1)\n",
"figure.set_figheight(8)\n",
"figure.set_figwidth(15)\n",
"pc_processed_df[pc_processed_df['client_id']=='MT_012'].plot(ax=axes[0],title='MT_012') \n",
"pc_processed_df[pc_processed_df['client_id']=='MT_013'].plot(ax=axes[1],title='MT_013')\n",
"pc_processed_df[pc_processed_df['client_id']=='MT_132'].plot(ax=axes[2],title='MT_132')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Let's train our model with ***MT_132*** consumption data. Since RCF requires one time series and integer values, let's filter the data and convert the consumption values to the integer data type"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
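"# Keep only the MT_132 readings before 2014-11-01 and work on an explicit copy of the slice\n",
"train_df = pc_processed_df[(pc_processed_df['client_id'] == 'MT_132') & (pc_processed_df['timestamp'] < '2014-11-01')].copy()\n",
"# The model is trained on the consumption values only\n",
"train_df = train_df.drop(['timestamp', 'client_id'], axis=1)\n",
"train_df['hourly_consumption'] = train_df['hourly_consumption'].astype('int32')\n",
"train_df.head()"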
]
},
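{
"cell_type": "markdown",
"metadata": {},
"source": [
"The filter above compares the `timestamp` column with the string `'2014-11-01'`. The cell below is a minimal sketch on a small made-up frame (the `toy_df` rows are hypothetical, not taken from the dataset) that applies the same filter after parsing the timestamps to datetimes, so the comparison does not depend on the string format sorting chronologically. It also shows that `astype('int32')` truncates the fractional part rather than rounding."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Hypothetical sample rows that mimic the shape of the prepared DataBrew output\n",
"toy_df = pd.DataFrame({\n",
"    'timestamp': ['2014-10-31 23:00:00', '2014-11-01 00:00:00', '2014-10-30 12:00:00'],\n",
"    'client_id': ['MT_132', 'MT_132', 'MT_012'],\n",
"    'hourly_consumption': [12.7, 15.2, 9.9],\n",
"})\n",
"\n",
"# Parse the strings so the comparison is a true datetime comparison\n",
"toy_df['timestamp'] = pd.to_datetime(toy_df['timestamp'])\n",
"cutoff = pd.Timestamp('2014-11-01')\n",
"\n",
"# Same filter as above: one customer, readings strictly before the cutoff\n",
"toy_train = toy_df[(toy_df['client_id'] == 'MT_132') & (toy_df['timestamp'] < cutoff)].copy()\n",
"toy_train = toy_train.drop(['timestamp', 'client_id'], axis=1)\n",
"\n",
"# astype('int32') truncates toward zero, so 12.7 becomes 12\n",
"toy_train['hourly_consumption'] = toy_train['hourly_consumption'].astype('int32')\n",
"toy_train"
]
},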
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train RCF Model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# S3 locations for the training data and the model artifacts\n",
"s3_train_path = f's3://{data_bucket}/databrew_rcf/training/train.csv'\n",
"s3_model_path = f's3://{data_bucket}/databrew_rcf/model'\n",
"\n",
"# Write the training data as CSV without a header row or index column\n",
"wr.s3.to_csv(df=train_df, path=s3_train_path, header=False, index=False)\n",
"training_channel = sm.inputs.TrainingInput(s3_data=s3_train_path, content_type='text/csv;label_size=0', distribution='ShardedByS3Key')\n",
"channels = {'train': training_channel}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
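"# Look up the built-in Random Cut Forest container image for the current region\n",
"rcf_algo_uri = image_uris.retrieve('randomcutforest', session.boto_region_name)\n",
"rcf_estimator = sm.estimator.Estimator(rcf_algo_uri, role=role_arn, instance_count=1, instance_type='ml.m5.large', output_path=s3_model_path)\n",
"# feature_dim=1 because the training data has a single column (hourly_consumption)\n",
"rcf_estimator.set_hyperparameters(feature_dim=1)\n",
"rcf_estimator.fit(channels)"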
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy the trained model "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rcf_endpoint_name='databrew-rcf-demo-endpoint'\n",
"rcf_predictor=rcf_estimator.deploy(endpoint_name=rcf_endpoint_name,instance_type='ml.t2.medium',initial_instance_count=1,serializer=serializers.CSVSerializer(),deserializer=deserializers.JSONDeserializer())\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Predictions and Cleanup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from statistics import mean, stdev\n",
"\n",
"# Score the last 500 non-zero MT_012 readings from 2014-01-01 onward\n",
"test_df = pc_processed_df[(pc_processed_df['client_id'] == 'MT_012') & (pc_processed_df['timestamp'] >= '2014-01-01') & (pc_processed_df['hourly_consumption'] != 0)]\n",
"test_df = test_df.tail(500)\n",
"test_df_values = test_df['hourly_consumption'].astype('str').tolist()\n",
"response = rcf_predictor.predict(test_df_values)\n",
"# Collect the anomaly scores from the endpoint response\n",
"scores = [datum[\"score\"] for datum in response[\"scores\"]]\n",
"scores_mean = mean(scores)\n",
"scores_std = stdev(scores)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot the test data and the prediction scores, using mean + 2 * standard deviation as the anomaly threshold"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_df['hourly_consumption'].plot(figsize=(40,10))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plt.figure(figsize=(40,10))\n",
"plt.plot(scores)\n",
"plt.autoscale(tight=True)\n",
"plt.axhline(y=scores_mean+2*scores_std,color='red')\n",
"plt.show()"
]
},
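{
"cell_type": "markdown",
"metadata": {},
"source": [
"The red line in the plot above marks `scores_mean + 2 * scores_std`. As a minimal sketch, the same rule can be used to list which records exceed that threshold. The helper name `flag_anomalies` and the `sample_scores` values are hypothetical and only for illustration; in this notebook the helper would be called with the `scores` list returned by the endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from statistics import mean, stdev\n",
"\n",
"def flag_anomalies(score_list, num_std=2):\n",
"    '''Return (threshold, indices) for scores above mean + num_std * stdev.'''\n",
"    threshold = mean(score_list) + num_std * stdev(score_list)\n",
"    indices = [i for i, s in enumerate(score_list) if s > threshold]\n",
"    return threshold, indices\n",
"\n",
"# Hypothetical scores; pass the scores list from the prediction cell instead\n",
"sample_scores = [1.0, 1.1, 0.9, 1.0, 5.0, 1.0, 1.05, 0.95]\n",
"threshold, anomalous_idx = flag_anomalies(sample_scores)\n",
"print(anomalous_idx)  # [4]"
]
},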
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Clean up by deleting the endpoint"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"rcf_predictor.delete_endpoint()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}