in rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala [44:68]
/** Assembles the rides-and-fares pipeline and runs it.
  *
  * Ride events that are start events and fare events are each keyed by
  * `rideId`, connected, and passed through an [[EnrichmentFunction]]; whatever
  * that function emits is written to `sink`.
  *
  * @return the result returned by the execution environment once the job has run
  */
def execute(): JobExecutionResult = {
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

  // Only rides flagged as start events are kept, partitioned by ride id.
  val startedRidesById = streamEnv.addSource(rideSource).filter(_.isStart).keyBy(_.rideId)

  // Fares are partitioned by the same ride id so both inputs share a key.
  val faresById = streamEnv.addSource(fareSource).keyBy(_.rideId)

  // Join the two keyed streams through the enrichment function, then emit.
  val enriched = startedRidesById.connect(faresById).flatMap(new EnrichmentFunction())
  enriched.addSink(sink)

  streamEnv.execute()
}