in 06_dataproc/bayes_on_spark.py [0:0]
def run_bayes(BUCKET):
    """Build a Bayes on-time-arrival lookup table from the flights dataset.

    Reads time-zone-corrected flight records and the train-day list from
    ``gs://BUCKET/flights/``, bins training flights by distance quantile and
    departure delay, computes the fraction of on-time arrivals in each bin,
    and writes the per-distance-bin delay threshold whose on-time fraction is
    closest to 0.70 to ``gs://BUCKET/flights/bayes.csv``.

    Args:
        BUCKET: GCS bucket name (no ``gs://`` prefix) holding the flights data.
    """
    spark = SparkSession \
        .builder \
        .appName("Bayes classification using Spark") \
        .getOrCreate()

    # read flights data
    inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)  # FULL
    flights = spark.read.json(inputs)
    flights.createOrReplaceTempView('flights')

    # which days are training days?
    traindays = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
    traindays.createOrReplaceTempView('traindays')

    # create training dataset: training-day flights with a known dep_delay,
    # labeled on-time if the arrival delay is under 15 minutes
    statement = """
SELECT
  f.FL_DATE AS date,
  CAST(distance AS FLOAT) AS distance,
  dep_delay,
  IF(arr_delay < 15, 1, 0) AS ontime
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day AND
  f.dep_delay IS NOT NULL
ORDER BY
  f.dep_delay DESC
"""
    flights = spark.sql(statement)

    # distance quantile boundaries (every 20th percentile, 2% relative error);
    # the top boundary is opened up to +inf so the last bin catches everything
    distthresh = flights.approxQuantile('distance', list(np.arange(0, 1.0, 0.2)), 0.02)
    distthresh[-1] = float('inf')
    delaythresh = range(10, 20)
    logging.info("Computed distance thresholds: {}".format(distthresh))

    # bayes in each (distance, delay) bin; accumulate rows in a list and build
    # the DataFrame once -- DataFrame.append was removed in pandas 2.0
    rows = []
    for m in range(0, len(distthresh) - 1):
        for n in range(0, len(delaythresh) - 1):
            bdf = flights[(flights['distance'] >= distthresh[m])
                          & (flights['distance'] < distthresh[m + 1])
                          & (flights['dep_delay'] >= delaythresh[n])
                          & (flights['dep_delay'] < delaythresh[n + 1])]
            # one aggregation pass (one Spark job) instead of two collect()s
            num_ontime, num_total = bdf.agg(F.sum('ontime'), F.count('ontime')).collect()[0]
            if not num_total:
                # empty bin: sum is NULL and count is 0 -- dividing would crash
                logging.warning("No flights in bin m=%d n=%d; skipping", m, n)
                continue
            ontime_frac = num_ontime / num_total
            print(m, n, ontime_frac)
            rows.append({
                'dist_thresh': distthresh[m],
                'delay_thresh': delaythresh[n],
                'frac_ontime': ontime_frac
            })
    df = pd.DataFrame(rows, columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])

    # lookup table: per distance bin, keep the delay threshold whose on-time
    # fraction is closest to the 0.70 decision boundary
    df['score'] = abs(df['frac_ontime'] - 0.7)
    bayes = df.sort_values(['score']).groupby('dist_thresh').head(1).sort_values('dist_thresh')
    bayes.to_csv('gs://{}/flights/bayes.csv'.format(BUCKET), index=False)
    logging.info("Wrote lookup table: {}".format(bayes.head()))