in rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java [65:81]
public JobExecutionResult execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
env.addSource(rideSource).filter(ride -> ride.isStart).keyBy(ride -> ride.rideId);
// A stream of taxi fare events, also keyed by rideId.
DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare -> fare.rideId);
// Create the pipeline.
rides.connect(fares).flatMap(new EnrichmentFunction()).addSink(sink);
// Execute the pipeline and return the result.
return env.execute("Join Rides with Fares");
}