in rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java [67:85]
public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
    // Wires the ride and fare sources through the enrichment operator into the sink on the
    // supplied environment, then runs the job and hands back what env.execute returns.

    // Rides: keep only events flagged as START, then key the remaining events by rideId.
    DataStream<TaxiRide> startEvents = env.addSource(rideSource).filter(ride -> ride.isStart);
    DataStream<TaxiRide> rides = startEvents.keyBy(ride -> ride.rideId);

    // Fares: every event from the fare source, keyed by rideId as well.
    DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);

    // Connect the two keyed streams, run them through the enrichment operator and
    // forward whatever it emits to the sink.
    rides.connect(fares)
            .flatMap(new EnrichmentFunction())
            .name("enrichment") // label shown for this operator in the web UI
            .uid("enrichment") // identifier for this operator's state
            .addSink(sink);

    // Launch the job under its display name and return the execution result.
    return env.execute("Join Rides with Fares");
}