in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/ProcessTaxiStream.java [64:167]
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
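
    // use event time (timestamps carried by the events) unless --noeventtime is passed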
    if (!pt.has("noeventtime")) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    }
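
    // Kinesis consumer: read the required stream from the configured region with
    // auto-resolved credentials, polling each shard every 2 s for up to 10,000 records per call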
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION, pt.get("region", DEFAULT_REGION));
    kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");

    DataStream<Event> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig)
    );
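
    // assign timestamps and watermarks from the events themselves, then keep only
    // trip events with valid coordinates in the NYC area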
    DataStream<TripEvent> trips = kinesisStream
        .rebalance()
        .assignTimestampsAndWatermarks(new PunctuatedAssigner())
        .filter(event -> TripEvent.class.isAssignableFrom(event.getClass()))
        .map(event -> (TripEvent) event)
        .filter(GeoUtils::hasValidCoordinates)
        .filter(GeoUtils::nearNYC);
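
    // count pickups per geohash cell over 10-minute time windows and emit only
    // cells with at least MIN_PICKUP_COUNT pickups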
    DataStream<PickupCount> pickupCounts = trips
        .map(trip -> new Tuple1<>(GeoHash.geoHashStringWithCharacterPrecision(trip.pickupLat, trip.pickupLon, GEOHASH_PRECISION)))
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .apply((Tuple tuple, TimeWindow window, Iterable<Tuple1<String>> input, Collector<PickupCount> out) -> {
            long count = Iterables.size(input);
            String position = Iterables.get(input, 0).f0;

            out.collect(new PickupCount(position, count, window.maxTimestamp()));
        })
        .filter(geo -> geo.pickupCount >= MIN_PICKUP_COUNT);
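
    // for trips ending near JFK or LGA, compute total and average trip duration per
    // (pickup geohash, airport) pair over 10-minute time windows, skipping windows
    // that contain only a single trip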
    DataStream<TripDuration> tripDurations = trips
        .flatMap((TripEvent trip, Collector<Tuple3<String, String, Long>> out) -> {
            String pickupLocation = GeoHash.geoHashStringWithCharacterPrecision(trip.pickupLat, trip.pickupLon, GEOHASH_PRECISION);
            long tripDuration = new Duration(trip.pickupDatetime, trip.dropoffDatetime).getStandardMinutes();

            if (GeoUtils.nearJFK(trip.dropoffLat, trip.dropoffLon)) {
                out.collect(new Tuple3<>(pickupLocation, "JFK", tripDuration));
            } else if (GeoUtils.nearLGA(trip.dropoffLat, trip.dropoffLon)) {
                out.collect(new Tuple3<>(pickupLocation, "LGA", tripDuration));
            }
        })
        .keyBy(0, 1)
        .timeWindow(Time.minutes(10))
        .apply((Tuple tuple, TimeWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<TripDuration> out) -> {
            if (Iterables.size(input) > 1) {
                String location = Iterables.get(input, 0).f0;
                String airportCode = Iterables.get(input, 0).f1;

                long sumDuration = StreamSupport
                    .stream(input.spliterator(), false)
                    .mapToLong(trip -> trip.f2)
                    .sum();

                double avgDuration = (double) sumDuration / Iterables.size(input);

                out.collect(new TripDuration(location, airportCode, sumDuration, avgDuration, window.maxTimestamp()));
            }
        });
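
    // optionally checkpoint every 5 s with a RocksDB state backend, keeping state
    // at the URI passed via --checkpoint; externalized checkpoints are removed on cancellation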
if (pt.has("checkpoint")) {
env.enableCheckpointing(5_000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
env.setStateBackend(new RocksDBStateBackend(new URI(pt.getRequired("checkpoint"))));
LOG.info("writing checkpoints to {}", pt.get("checkpoint"));
}
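
    // optionally sink both result streams to the Elasticsearch endpoint given via --es-endpoint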
if (pt.has("es-endpoint")) {
final String indexName = pt.get("es-index", ES_DEFAULT_INDEX);
final ImmutableMap<String, String> config = ImmutableMap.<String, String>builder()
.put("es-endpoint", pt.getRequired("es-endpoint"))
.put("region", pt.get("region", DEFAULT_REGION))
.build();
pickupCounts.addSink(new ElasticsearchJestSink<>(config, indexName, "pickup_count"));
tripDurations.addSink(new ElasticsearchJestSink<>(config, indexName, "trip_duration"));
}
LOG.info("Starting to consume events from stream {}", pt.getRequired("stream"));
env.execute();
}