def execute()

in rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala [44:68]


    /** Assembles the rides-and-fares pipeline and runs it.
      *
      * Two streams are created on the same execution environment:
      *  - rides read from `rideSource`, restricted to events whose `isStart`
      *    flag is set, keyed by `rideId`;
      *  - fares read from `fareSource`, keyed by `rideId`.
      *
      * The keyed streams are connected, passed through a new
      * `EnrichmentFunction` via `flatMap`, and the output is handed to `sink`.
      *
      * @return the result reported by `env.execute()` for the submitted job
      */
    def execute(): JobExecutionResult = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Ride side: only start events take part in the join, keyed by ride id.
      val startedRides = env.addSource(rideSource).filter(_.isStart)
      val ridesById    = startedRides.keyBy(_.rideId)

      // Fare side: keyed by the same ride id so both inputs share a key.
      val faresById = env.addSource(fareSource).keyBy(_.rideId)

      // Join the two keyed streams and ship whatever the enrichment emits.
      val enriched = ridesById.connect(faresById).flatMap(new EnrichmentFunction())
      enriched.addSink(sink)

      env.execute()
    }