in rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala [44:68]
/**
 * Assembles the streaming pipeline and runs it.
 *
 * Two keyed streams are built on the same execution environment:
 * rides read from `rideSource` (keeping only those whose `isStart` is true)
 * and fares read from `fareSource`, both keyed by `rideId`. The streams are
 * connected, passed through an `EnrichmentFunction` via `flatMap`, and the
 * output is written to `sink`.
 *
 * @return the result returned by `env.execute()` for the submitted job
 */
def execute(): JobExecutionResult = {
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

  // Ride side: drop everything that is not a start event, then key by ride id.
  val startRidesById = streamEnv
    .addSource(rideSource)
    .filter(_.isStart)
    .keyBy(_.rideId)

  // Fare side: keyed by the same ride id so both inputs share a key.
  val faresById = streamEnv
    .addSource(fareSource)
    .keyBy(_.rideId)

  // Connect the two keyed streams and forward the flatMap output to the sink.
  val enriched = startRidesById
    .connect(faresById)
    .flatMap(new EnrichmentFunction())
  enriched.addSink(sink)

  streamEnv.execute()
}