in 07_sparkml/experiment.py [0:0]
import logging

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

NUM_PARTITIONS = 1000  # not defined in this excerpt; assumed value, tune to cluster size


def run_experiment(BUCKET, SCALE_AND_CLIP, WITH_TIME, WITH_ORIGIN):
    # Create Spark context and session; getOrCreate() reuses the context created above
    sc = SparkContext('local', 'experimentation')
    spark = SparkSession \
        .builder \
        .appName("Logistic regression w/ Spark ML") \
        .getOrCreate()

    # read the list of training days (CSV with FL_DATE and is_train_day columns)
    traindays = spark.read \
        .option("header", "true") \
        .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
    traindays.createOrReplaceTempView('traindays')

    # read dataset
    # inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)  # 1/30th
    inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)  # FULL
    flights = spark.read.json(inputs)

    # this view can now be queried
    flights.createOrReplaceTempView('flights')

    # separate training and validation data
    from pyspark.sql.functions import rand
    SEED = 13
    traindays = traindays.withColumn("holdout", rand(SEED) > 0.8)  # ~80% of days are for training
    traindays.createOrReplaceTempView('traindays')
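    # Note: the holdout draw is per *day*, not per flight, so all flights on a
    # given date land on the same side of the split; with SEED fixed, the split
    # is reproducible across runs.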
    # logistic regression: build the training set from non-cancelled,
    # non-diverted flights on training days that are not held out
    trainquery = """
    SELECT
        ORIGIN, DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE, DEP_TIME, DEP_AIRPORT_TZOFFSET
    FROM flights f
    JOIN traindays t
    ON f.FL_DATE == t.FL_DATE
    WHERE
        t.is_train_day == 'True' AND
        t.holdout == False AND
        f.CANCELLED == 'False' AND
        f.DIVERTED == 'False'
    """
    traindata = spark.sql(trainquery).repartition(NUM_PARTITIONS)
    def to_example(fields):
        # raw features: departure delay (min), distance (miles), taxi-out time (min)
        features = [
            fields['DEP_DELAY'],
            fields['DISTANCE'],
            fields['TAXI_OUT'],
        ]
        if SCALE_AND_CLIP:
            # center/scale each feature to roughly [-1, 1] and clip outliers
            def clip(x):
                if x < -1:
                    return -1
                if x > 1:
                    return 1
                return x
            features = [
                clip(float(fields['DEP_DELAY']) / 30),
                clip((float(fields['DISTANCE']) / 1000) - 1),
                clip((float(fields['TAXI_OUT']) / 10) - 1),
            ]
        if WITH_TIME:
            # get_category (sketched below) one-hot encodes the local departure
            # hour, so extend() receives a list rather than a bare scalar
            features.extend(get_category(
                get_local_hour(fields['DEP_TIME'], fields['DEP_AIRPORT_TZOFFSET'])))
        if WITH_ORIGIN:
            features.extend(fields['origin_onehot'])
        return LabeledPoint(
            float(fields['ARR_DELAY'] < 15),  # on-time
            features)
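    # Worked example (hypothetical values): a flight with DEP_DELAY=15,
    # DISTANCE=500 and TAXI_OUT=10 yields, with SCALE_AND_CLIP=True, the
    # feature vector [0.5, -0.5, 0.0] before any time/origin features.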
    def add_origin(df, trained_model=None):
        from pyspark.ml.feature import OneHotEncoder, StringIndexer
        if not trained_model:
            indexer = StringIndexer(inputCol='ORIGIN', outputCol='origin_index')
            trained_model = indexer.fit(df)
        indexed = trained_model.transform(df)
        encoder = OneHotEncoder(inputCol='origin_index', outputCol='origin_onehot')
        return trained_model, encoder.fit(indexed).transform(indexed)
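    # The fitted StringIndexerModel is returned so the same ORIGIN-to-index
    # mapping can be reused on the evaluation data; otherwise the one-hot
    # columns would not line up between the training and evaluation sets.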
    if WITH_ORIGIN:
        index_model, traindata = add_origin(traindata)
    examples = traindata.rdd.map(to_example)
    lrmodel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
    lrmodel.clearThreshold()  # return probabilities

    # save model
    MODEL_FILE = 'gs://{}/flights/sparkmloutput/model'.format(BUCKET)
    lrmodel.save(sc, MODEL_FILE)
    logging.info("Saved trained model to {}".format(MODEL_FILE))
    # evaluate model on the held-out data
    evalquery = trainquery.replace("t.holdout == False", "t.holdout == True")
    evaldata = spark.sql(evalquery).repartition(NUM_PARTITIONS)
    if WITH_ORIGIN:
        # add_origin() returns (model, dataframe); reuse the index model
        # fitted on the training data and keep only the dataframe
        _, evaldata = add_origin(evaldata, index_model)
    examples = evaldata.rdd.map(to_example)
    labelpred = examples.map(lambda p: (p.label, lrmodel.predict(p.features)))
    # 'eval' is the module-level evaluation helper (sketched below), not the builtin
    logging.info(eval(labelpred))
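
# The helpers and entry point below are not part of this excerpt; they are
# minimal sketches of what the calls above assume. Names, formats, and values
# (timestamp format, 24-hour buckets, the 0.70 threshold, the bucket name)
# are assumptions, not the module's confirmed implementation.

def get_local_hour(timestamp, correction):
    # Sketch: parse the UTC departure timestamp string, apply the airport's
    # timezone offset (assumed to be in seconds), and return the local hour.
    import datetime
    t = datetime.datetime.strptime(timestamp.replace('T', ' '), '%Y-%m-%d %H:%M:%S')
    return (t + datetime.timedelta(seconds=correction)).hour


def get_category(hour, num_categories=24):
    # Sketch: one-hot encode the hour of day so it can extend the feature list.
    onehot = [0.0] * num_categories
    onehot[hour % num_categories] = 1.0
    return onehot


def eval(labelpred):
    # Sketch of the evaluation helper assumed above; it deliberately shadows
    # the builtin eval, following the call site. Given an RDD of
    # (label, probability) pairs, it counts how often a 0.70 probability
    # threshold makes the right on-time/late decision on each side.
    cancel = labelpred.filter(lambda data: data[1] < 0.7)
    nocancel = labelpred.filter(lambda data: data[1] >= 0.7)
    corr_cancel = cancel.filter(lambda data: data[0] == float(data[1] >= 0.7)).count()
    corr_nocancel = nocancel.filter(lambda data: data[0] == float(data[1] >= 0.7)).count()
    return {
        'total_cancel': cancel.count(),
        'correct_cancel': corr_cancel,
        'total_noncancel': nocancel.count(),
        'correct_noncancel': corr_nocancel,
    }


if __name__ == '__main__':
    # Hypothetical usage: train on the full dataset with scaled/clipped
    # features plus time-of-day and origin features ('my-bucket' is a placeholder).
    logging.basicConfig(level=logging.INFO)
    run_experiment('my-bucket', SCALE_AND_CLIP=True, WITH_TIME=True, WITH_ORIGIN=True)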