In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Housing prices prediction from real estate assessments (Decision Tree Regression)

<table align="left">

<a href="https://github.com/GoogleCloudPlatform/ai-ml-recipes/blob/main/notebooks/regression/decision_tree_regression/housing_prices_prediction.ipynb">
<img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
View on GitHub
</a>
</td>
<td>
<a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/ai-ml-recipes/main/notebooks/regression/decision_tree_regression/housing_prices_prediction.ipynb">
<img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
Open in Vertex AI Workbench
</a>
</td>
</table>

## Overview

This notebook shows how to predict housing prices based on location and other characteristics using Decision Tree Regression.

#### **Steps**
Using Spark, 
1) It reads the table [State of Connecticut - Real Estate Sales](https://catalog.data.gov/dataset/real-estate-sales-2001-2018) from [gs://dataproc-metastore-public-binaries/real_estate_sales](https://console.cloud.google.com/storage/browser/dataproc-metastore-public-binaries/real_estate_sales).  
2) It parses process the dataset to choose features and train the ML model (fits the decision tree regression model) to predict a target value.  
    **Features**: list_year, property_type, residential_type, longitude, latitude  
    **Target**: assessed_value  
3) It evaluates and plot the results.

#### **Details of the dataset**
- The dataset contains listing of real estate sales with a sales price of $2,000 or greater that occur between October 1 and September 30 of each year (2001 to 2020). 
- The dataset contains data for some towns from the State of Connecticut, like:
  - Danbury, New Milford, New Haven, Norwalk, Hartford, East Haven, Montville, Bridgeport, Southington, Vernon, Wolcott, ...
- For each sale record, the file includes information such as town, property address, location, date of sale, property type (residential, apartment, commercial, industrial or vacant land), sales price, and property assessment.

### Setup

#### Identity and Access Management (IAM)

Make sure the service account running this notebook has the required permissions:

- **Run the notebook**
  - AI Platform Notebooks Service Agent
  - Notebooks Admin
  - Vertex AI Administrator
- **Read files from bucket**
  - Storage Object Viewer
- **Run Dataproc jobs**
  - Dataproc Service Agent
  - Dataproc Worker

In [None]:
#### Import dependencies
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import round, desc, corr, col

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import Bucketizer, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("Housing prices prediction with Decision Tree Regression") \
    .enableHiveSupport() \
    .getOrCreate()

In [None]:
raw_dataset = spark.read.parquet("gs://dataproc-metastore-public-binaries/real_estate_sales/")

### Pre-process dataset / filter values

In [None]:
#### Filters
filters = [
"residential_type IS NOT NULL AND residential_type != 'Unknown'",
"property_type IS NOT NULL AND property_type != 'Unknown'",
"assessed_value IS NOT NULL",
"longitude IS NOT NULL",
"latitude IS NOT NULL",
"sale_amount IS NOT NULL"
]
filters = " AND ".join(filters)
filtered_dataset = raw_dataset.filter(filters)

In [None]:
# Towns
filtered_dataset.groupBy("town").count().orderBy(desc("count")).limit(100).show(10,100)
print(f'Number of distinct towns: {filtered_dataset.select("town").distinct().count()}')
print(f'Number of towns: {filtered_dataset.select("town").count()}')

### Select town

In [None]:
SELECTED_TOWN = "Waterbury"
pre_processed_dataset = filtered_dataset.filter(f"town == '{SELECTED_TOWN}'")
pre_processed_dataset.show(5,20)

|serial_number|list_year|date_recorded|     town|         address|assessed_value|sale_amount|sales_ratio|property_type|residential_type|non_use_code|assessor_remarks|opm_remarks|longitude|latitude|
|-------------|---------|-------------|---------|----------------|--------------|-----------|-----------|-------------|----------------|------------|----------------|-----------|---------|--------|
|       201374|     2020|   2021-04-06|Waterbury| 88 ELLSMERE AVE|       62560.0|   210000.0|     0.2979|  Residential|   Single Family|     Unknown|         Unknown|    Unknown|-72.99548|41.54391|
|       200990|     2020|   2021-01-27|Waterbury| 25 MACARTHUR DR|       50110.0|   105000.0|     0.4772|  Residential|   Single Family|     Unknown|         Unknown|    Unknown|-73.03644|41.57748|
|       201065|     2020|   2021-02-09|Waterbury|   38 KELLOGG ST|       34240.0|    97900.0|     0.3497|  Residential|   Single Family|     Unknown|         Unknown|    Unknown|-73.04534|41.56237|
|       201242|     2020|   2021-03-10|Waterbury|200 SOUTHWEST RD|      164390.0|   310000.0|     0.5302|  Residential|   Single Family|     Unknown|         Unknown|    Unknown|-73.07834|41.53038|
|       200407|     2020|   2020-11-12|Waterbury|  30 KENMORE AVE|      108210.0|   225000.0|     0.4809|  Residential|   Single Family|     Unknown|         Unknown|    Unknown|-73.08336|41.57112|

### Exploratory Data Analysis

In [None]:
# Count, Mean, Min, Max of numeric columns
numeric_columns = ["list_year","assessed_value","sale_amount"]
pre_processed_dataset.select(numeric_columns).describe().select('summary', *[round(c, 2).alias(c) for c in numeric_columns]).show()

|summary|list_year|assessed_value|sale_amount|
|-------|---------|--------------|-----------|
|  count|   3836.0|        3836.0|     3836.0|
|   mean|  2014.19|      86038.21|  120607.07|
| stddev|     4.89|      61597.34|   125087.8|
|    min|   2006.0|        3500.0|     2000.0|
|    max|   2020.0|     1889750.0|  5091000.0|

In [None]:
# Towns
pre_processed_dataset.groupBy("town").count().orderBy(desc("count")).limit(100).show(10,100)
print(f'Number of distinct towns: {pre_processed_dataset.select("town").distinct().count()}')
print(f'Number of towns: {pre_processed_dataset.select("town").count()}')

In [None]:
# Property Type
pre_processed_dataset.groupBy("property_type").count().orderBy(desc("count")).limit(12).show(12,100)
print(f'Number of property type: {pre_processed_dataset.select("property_type").count()}')
print(f'Number of distinct property type: {pre_processed_dataset.select("property_type").distinct().count()}')

In [None]:
# Residential Type
pre_processed_dataset.groupBy("residential_type").count().orderBy(desc("count")).limit(12).show(12,100)
print(f'Number of residential type: {pre_processed_dataset.select("residential_type").count()}')
print(f'Number of distinct residential type: {pre_processed_dataset.select("residential_type").distinct().count()}')

In [None]:
# Years
pre_processed_dataset.groupBy("list_year").count().orderBy(desc("count")).limit(10).show(10,100)

### Process dataset to create features

In [None]:
target_column = 'assessed_value'
columns = [target_column,'list_year','property_type','residential_type', 'longitude', 'latitude']
categorical_columns = ['list_year','property_type','residential_type']
feature_columns = ['indexed_list_year','indexed_property_type','indexed_residential_type', 'longitude', 'latitude']

#### Select only relevant columns
sub_dataset = pre_processed_dataset.select(*columns)

In [None]:
sub_dataset.printSchema()

In [None]:
sub_dataset.show(5,20)

|assessed_value|list_year|property_type|residential_type|longitude|latitude|
|--------------|---------|-------------|----------------|---------|--------|
|       62560.0|     2020|  Residential|   Single Family|-72.99548|41.54391|
|       50110.0|     2020|  Residential|   Single Family|-73.03644|41.57748|
|       34240.0|     2020|  Residential|   Single Family|-73.04534|41.56237|
|      164390.0|     2020|  Residential|   Single Family|-73.07834|41.53038|
|      108210.0|     2020|  Residential|   Single Family|-73.08336|41.57112|
|       49840.0|     2020|  Residential|           Condo|-72.97195|41.55385|

In [None]:
#### Index categorical columns
indexers = [StringIndexer(inputCol=column, outputCol="indexed_" + column).fit(sub_dataset) for column in categorical_columns]
pipeline = Pipeline(stages=indexers)
indexed_dataset = pipeline.fit(sub_dataset).transform(sub_dataset)
indexed_dataset = indexed_dataset.drop(*categorical_columns)
indexed_dataset.show(5,20)

|assessed_value|longitude|latitude|indexed_list_year|indexed_property_type|indexed_residential_type|
|--------------|---------|--------|-----------------|---------------------|------------------------|
|       62560.0|-72.99548|41.54391|              0.0|                  1.0|                     0.0|
|       50110.0|-73.03644|41.57748|              0.0|                  1.0|                     0.0|
|       34240.0|-73.04534|41.56237|              0.0|                  1.0|                     0.0|
|      164390.0|-73.07834|41.53038|              0.0|                  1.0|                     0.0|
|      108210.0|-73.08336|41.57112|              0.0|                  1.0|                     0.0|

### Transform features to LIBSVM format

In [None]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
dataset = assembler.transform(indexed_dataset)

dataset = dataset.select(target_column,'features')
dataset = dataset.withColumnRenamed(target_column,'label')

dataset.show(5,100)

|   label|                                          features|
|--------|--------------------------------------------------|
| 62560.0| [0.0,1.0,0.0,-72.9954833984375,41.54391098022461]|
| 50110.0|[0.0,1.0,0.0,-73.03643798828125,41.57748031616211]|
| 34240.0|[0.0,1.0,0.0,-73.04534149169922,41.56237030029297]|
|164390.0|[0.0,1.0,0.0,-73.07833862304688,41.53038024902344]|
|108210.0|[0.0,1.0,0.0,-73.08335876464844,41.57112121582031]|

### Train/Fit the model

In [None]:
(trainingData, testData) = dataset.randomSplit([0.8, 0.2])

dt = DecisionTreeRegressor(featuresCol="features", maxDepth = 10, maxBins = 12)

pipeline = Pipeline(stages=[dt])

# Train model
model = pipeline.fit(trainingData)

### Evaluate the model

In [None]:
# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(10)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

|prediction|  label|            features|
|----------|-------|--------------------|
|   28001.6|21700.0|[1.0,2.0,1.0,-72....|
|   27911.1|21820.0|[0.0,1.0,1.0,-72....|
|   27911.1|22640.0|[0.0,1.0,1.0,-73....|
|   97573.3|22850.0|[4.0,2.0,1.0,-72....|
|   27911.1|24170.0|[0.0,1.0,1.0,-73....|

In [None]:
#### Count, Mean, Min, Max of predictions
predictions.select(["prediction", "label"]).describe().select('summary', *[round(c, 2).alias(c) for c in ["prediction", "label"]]).show()

In [None]:
#### Model results
treeModel = model.stages[0]
print(treeModel)
print(treeModel.featureImportances)

In [None]:
#### Plot predictions against target
x = range(0, predictions.count())
y_pred=predictions.select("prediction").collect()
y_target=predictions.select("label").collect()
 
plt.plot(x, y_target, label="label")
plt.plot(x, y_pred, label="prediction")
plt.title("Test and predicted data")

plt.xlabel('x axis')
plt.ylabel('y axis')

plt.legend(loc='best',fancybox=True, shadow=False)
plt.show() 