in long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java [62:84]
public JobExecutionResult execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start the data generator
DataStream<TaxiRide> rides = env.addSource(source);
// the WatermarkStrategy specifies how to extract timestamps and generate watermarks
WatermarkStrategy<TaxiRide> watermarkStrategy =
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
.withTimestampAssigner(
(ride, streamRecordTimestamp) -> ride.getEventTimeMillis());
// create the pipeline
rides.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(ride -> ride.rideId)
.process(new AlertFunction())
.addSink(sink);
// execute the pipeline and return the result
return env.execute("Long Taxi Rides");
}