# 07_sparkml/logistic.py
def run_logistic(BUCKET):
    """Train, save, and evaluate a logistic regression on-time-arrival model.

    Reads flight records and the train/test day split from GCS, trains a
    LogisticRegressionWithLBFGS model on training-day flights (label: arrival
    delay < 15 minutes), saves the model back to GCS, then evaluates it on
    test-day flights via `eval_model` (defined elsewhere in this module).

    Args:
        BUCKET: name of the GCS bucket holding the `flights/` data tree;
            also receives the saved model under `flights/sparkmloutput/model`.
    """
    # Create spark context and session
    sc = SparkContext('local', 'logistic')
    spark = SparkSession \
        .builder \
        .appName("Logistic regression w/ Spark ML") \
        .getOrCreate()

    # Which days belong to the training set (header: FL_DATE, is_train_day)
    traindays = spark.read \
        .option("header", "true") \
        .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
    traindays.createOrReplaceTempView('traindays')

    # inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)  # 1/30th sample
    inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)  # FULL dataset
    flights = spark.read.json(inputs)
    # this view can now be queried ...
    flights.createOrReplaceTempView('flights')

    # Training examples: completed flights (not cancelled/diverted) on train days.
    # NOTE: the is_train_day literal below is string-replaced to build the
    # test query, so keep it exactly in sync with `testquery` construction.
    trainquery = """
    SELECT
      DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
    FROM flights f
    JOIN traindays t
    ON f.FL_DATE == t.FL_DATE
    WHERE
      t.is_train_day == 'True' AND
      f.CANCELLED == 'False' AND
      f.DIVERTED == 'False'
    """
    traindata = spark.sql(trainquery)

    def to_example(fields):
        # Label is 1.0 when the flight is "ontime" (arrival delay under 15 min);
        # features are departure delay, taxi-out time, and flight distance.
        return LabeledPoint(
            float(fields['ARR_DELAY'] < 15),  # ontime
            [
                fields['DEP_DELAY'],
                fields['TAXI_OUT'],
                fields['DISTANCE'],
            ])

    examples = traindata.rdd.map(to_example)
    lrmodel = LogisticRegressionWithLBFGS.train(examples, intercept=True)
    # Predict "ontime" only when the model is at least 70% confident
    lrmodel.setThreshold(0.7)

    # Save model
    MODEL_FILE = 'gs://{}/flights/sparkmloutput/model'.format(BUCKET)
    lrmodel.save(sc, MODEL_FILE)
    logging.info('Logistic regression model saved in {}'.format(MODEL_FILE))

    # Evaluate on held-out (non-train-day) flights
    testquery = trainquery.replace("t.is_train_day == 'True'",
                                   "t.is_train_day == 'False'")
    testdata = spark.sql(testquery)
    examples = testdata.rdd.map(to_example)

    lrmodel.clearThreshold()  # so it returns probabilities instead of 0/1
    labelpred = examples.map(lambda p: (p.label, lrmodel.predict(p.features)))
    logging.info('All flights: {}'.format(eval_model(labelpred)))

    # Keep only those examples near the 0.7 decision threshold
    labelpred = labelpred.filter(lambda data: data[1] > 0.65 and data[1] < 0.75)
    logging.info('Flights near decision threshold: {}'.format(eval_model(labelpred)))