in quests/data-science-on-gcp-edition1_tf2/08_dataflow/chapter8/src/main/java/com/google/cloud/training/flights/CreateTrainingDataset8.java [71:198]
/**
 * Builds and runs the pipeline that turns flight events into training CSV rows.
 *
 * <p>Stages, in the order they are wired below:
 * <ol>
 * <li>Three hard-coded CSV event lines are parsed into {@code Flight} objects, each emitted with
 * its own event timestamp.</li>
 * <li>Flights are kept only when their FL_DATE is a key of the train-days side input and they are
 * neither cancelled nor diverted.</li>
 * <li>The mean of (DEP_DELAY + TAXI_OUT) is computed per "ORIGIN:departureHour" from "wheelsoff"
 * events on the collection as it is before the sliding window is applied; the mean ARR_DELAY is
 * computed per DEST from "arrived" events inside one-hour windows that slide every five
 * minutes.</li>
 * <li>Both means are attached to a copy of every windowed flight (0 when no mean is available for
 * the key).</li>
 * <li>The departure-delay means and the training rows for "arrived" flights are written as
 * unsharded text outputs with a ".csv" suffix.</li>
 * </ol>
 *
 * @param args command-line arguments, parsed into {@code MyOptions}
 */
public static void main(String[] args) {
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
  options.setStreaming(true);
  Pipeline pipeline = Pipeline.create(options);

  // traindays.csv is loaded as a map-shaped side input; only its keys are consulted here.
  PCollectionView<Map<String, String>> traindays =
      getTrainDays(pipeline, options.getTraindayCsvPath());

  // In-memory sample events; the file-based text read is not used in this version.
  String[] events = {
      "2015-09-21,AA,19805,AA,1572,13303,1330303,32467,MIA,12889,1288903,32211,LAS,2015-09-20T22:50:00,2015-09-20T04:03:00,313.00,19.00,2015-09-20T04:22:00,,,2015-09-21T04:08:00,,,0.00,,,2174.00,25.79527778,-80.29000000,-14400.0,36.08000000,-115.15222222,-25200.0,wheelsoff,2015-09-20T04:22:00",
      "2015-09-21,AA,19805,AA,2495,11298,1129804,30194,DFW,12892,1289203,32575,LAX,2015-09-21T01:25:00,2015-09-20T06:04:00,279.00,15.00,2015-09-20T06:19:00,,,2015-09-21T04:55:00,,,0.00,,,1235.00,32.89722222,-97.03777778,-18000.0,33.94250000,-118.40805556,-25200.0,wheelsoff,2015-09-20T06:19:00",
      "2015-09-21,AA,19805,AA,2495,11298,1129804,30194,DFW,12892,1289203,32575,LAX,2015-09-21T01:25:00,2015-09-20T06:04:00,279.00,15.00,2015-09-20T06:19:00,2015-09-20T09:10:00,5.00,2015-09-21T04:55:00,2015-09-20T09:15:00,260.00,0.00,,0.00,1235.00,32.89722222,-97.03777778,-18000.0,33.94250000,-118.40805556,-25200.0,arrived,2015-09-20T09:15:00" };

  // Parses one CSV line; lines for which Flight.fromCsv returns null are dropped.
  DoFn<String, Flight> parseFlight = new DoFn<String, Flight>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight parsed = Flight.fromCsv(ctx.element());
      if (parsed == null) {
        return;
      }
      ctx.outputWithTimestamp(parsed, parsed.getEventTimestamp());
    }
  };

  // Passes a flight through only when its FL_DATE is a key of the train-days map.
  DoFn<Flight, Flight> keepTrainDays = new DoFn<Flight, Flight>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight flight = ctx.element();
      String flightDate = flight.getField(Flight.INPUTCOLS.FL_DATE);
      if (ctx.sideInput(traindays).containsKey(flightDate)) {
        ctx.output(flight);
      }
    }
  };

  // Drops cancelled or diverted flights.
  DoFn<Flight, Flight> keepGoodFlights = new DoFn<Flight, Flight>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight flight = ctx.element();
      if (!flight.isNotCancelled() || !flight.isNotDiverted()) {
        return;
      }
      ctx.output(flight);
    }
  };

  PCollection<Flight> goodFlights = pipeline //
      .apply("ReadLines", Create.of(Arrays.asList(events))) //
      .apply("ParseFlights", ParDo.of(parseFlight)) //
      .apply("TrainOnly", ParDo.of(keepTrainDays).withSideInputs(traindays)) //
      .apply("GoodFlights", ParDo.of(keepGoodFlights));

  // One-hour windows that start every five minutes.
  SlidingWindows hourEveryFiveMinutes = SlidingWindows //
      .of(Duration.standardHours(1)) //
      .every(Duration.standardMinutes(5));
  PCollection<Flight> lastHourFlights = goodFlights.apply(Window.into(hourEveryFiveMinutes));

  // "wheelsoff" events -> ("ORIGIN:departureHour", DEP_DELAY + TAXI_OUT).
  DoFn<Flight, KV<String, Double>> toDepDelay = new DoFn<Flight, KV<String, Double>>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight flight = ctx.element();
      if (!flight.getField(Flight.INPUTCOLS.EVENT).equals("wheelsoff")) {
        return;
      }
      String airportHour =
          flight.getField(Flight.INPUTCOLS.ORIGIN) + ":" + flight.getDepartureHour();
      double totalDelay = flight.getFieldAsFloat(Flight.INPUTCOLS.DEP_DELAY)
          + flight.getFieldAsFloat(Flight.INPUTCOLS.TAXI_OUT);
      ctx.output(KV.of(airportHour, totalDelay));
    }
  };

  // Departure averages are taken from goodFlights, i.e. before the sliding window is applied.
  PCollection<KV<String, Double>> depDelays = goodFlights //
      .apply("airport:hour->depdelay", ParDo.of(toDepDelay)) //
      .apply("avgDepDelay", Mean.perKey());

  // "arrived" events -> (DEST, ARR_DELAY).
  DoFn<Flight, KV<String, Double>> toArrDelay = new DoFn<Flight, KV<String, Double>>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight flight = ctx.element();
      if (!flight.getField(Flight.INPUTCOLS.EVENT).equals("arrived")) {
        return;
      }
      String destination = flight.getField(Flight.INPUTCOLS.DEST);
      double delay = flight.getFieldAsFloat(Flight.INPUTCOLS.ARR_DELAY);
      ctx.output(KV.of(destination, delay));
    }
  };

  // Arrival averages are taken per sliding window.
  PCollection<KV<String, Double>> arrDelays = lastHourFlights //
      .apply("airport->arrdelay", ParDo.of(toArrDelay)) //
      .apply("avgArrDelay", Mean.perKey());

  PCollectionView<Map<String, Double>> avgDepDelay =
      depDelays.apply("depdelay->map", View.asMap());
  PCollectionView<Map<String, Double>> avgArrDelay =
      arrDelays.apply("arrdelay->map", View.asMap());

  // Copies each flight and fills in the two averages; a missing key yields 0.
  DoFn<Flight, Flight> addDelayInfo = new DoFn<Flight, Flight>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight enriched = ctx.element().newCopy();
      String depKey =
          enriched.getField(Flight.INPUTCOLS.ORIGIN) + ":" + enriched.getDepartureHour();
      Double depDelay = ctx.sideInput(avgDepDelay).get(depKey);
      String arrKey = enriched.getField(Flight.INPUTCOLS.DEST);
      Double arrDelay = ctx.sideInput(avgArrDelay).get(arrKey);
      enriched.avgDepartureDelay = (depDelay != null) ? depDelay.floatValue() : 0f;
      enriched.avgArrivalDelay = (arrDelay != null) ? arrDelay.floatValue() : 0f;
      ctx.output(enriched);
    }
  };

  PCollection<Flight> flightsWithDelays = lastHourFlights //
      .apply("AddDelayInfo", ParDo.of(addDelayInfo).withSideInputs(avgDepDelay, avgArrDelay));

  // Formats each (key, average) pair as "key,average".
  DoFn<KV<String, Double>, String> depDelayToCsv = new DoFn<KV<String, Double>, String>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      KV<String, Double> average = ctx.element();
      ctx.output(average.getKey() + "," + average.getValue());
    }
  };

  depDelays //
      .apply("DepDelayToCsv", ParDo.of(depDelayToCsv)) //
      .apply("WriteDepDelays",
          TextIO.write().to(options.getOutput() + "delays8").withSuffix(".csv").withoutSharding());

  // Only "arrived" events are turned into training rows.
  DoFn<Flight, String> toTrainingCsv = new DoFn<Flight, String>() {
    @ProcessElement
    public void processElement(ProcessContext ctx) throws Exception {
      Flight flight = ctx.element();
      if (!flight.getField(Flight.INPUTCOLS.EVENT).equals("arrived")) {
        return;
      }
      ctx.output(flight.toTrainingCsv());
    }
  };

  flightsWithDelays //
      .apply("ToCsv", ParDo.of(toTrainingCsv)) //
      .apply("WriteFlights",
          TextIO.write().to(options.getOutput() + "flights8").withSuffix(".csv").withoutSharding());

  pipeline.run();
}