public static void main()

in flink-taxi-stream-processor/src/main/java/com/amazonaws/flink/refarch/ProcessTaxiStream.java [64:167]


  /**
   * Entry point of the taxi stream processor. Reads events from a Kinesis stream, derives
   * per-geohash pickup counts and airport trip durations over 10-minute windows, and optionally
   * writes both result streams to Elasticsearch.
   *
   * <p>Arguments read via {@link ParameterTool#fromArgs}:
   * <ul>
   *   <li>{@code --stream} (required): name of the Kinesis stream to consume.
   *   <li>{@code --region}: AWS region; defaults to {@code DEFAULT_REGION}.
   *   <li>{@code --noeventtime}: if present, the time characteristic is not switched to event time.
   *   <li>{@code --checkpoint}: if present, enables checkpointing to a RocksDB state backend at
   *       this URI.
   *   <li>{@code --es-endpoint}: if present, adds Elasticsearch sinks for both result streams.
   *   <li>{@code --es-index}: Elasticsearch index name; defaults to {@code ES_DEFAULT_INDEX}.
   * </ul>
   *
   * @param args command-line arguments
   * @throws Exception if a required argument is missing, the job graph cannot be built, or the
   *     job fails
   */
  public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    if (!pt.has("noeventtime")) {
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    }

    DataStream<Event> kinesisStream = env.addSource(new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        buildKinesisConsumerConfig(pt)));

    DataStream<TripEvent> trips = extractTripsNearNyc(kinesisStream);
    DataStream<PickupCount> pickupCounts = countPickupsPerGeoHash(trips);
    DataStream<TripDuration> tripDurations = computeAirportTripDurations(trips);

    if (pt.has("checkpoint")) {
      configureCheckpointing(env, pt.getRequired("checkpoint"));
    }

    if (pt.has("es-endpoint")) {
      addElasticsearchSinks(pt, pickupCounts, tripDurations);
    }

    LOG.info("Starting to consume events from stream {}", pt.getRequired("stream"));

    env.execute();
  }

  /**
   * Builds the Kinesis consumer properties: the AWS region, the AUTO credentials provider, and
   * the GetRecords batch size and polling interval.
   */
  private static Properties buildKinesisConsumerConfig(ParameterTool pt) {
    Properties config = new Properties();
    config.setProperty(AWSConfigConstants.AWS_REGION, pt.get("region", DEFAULT_REGION));
    config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    // Up to 10,000 records per GetRecords call, with a 2000 ms interval between calls.
    config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
    config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
    return config;
  }

  /**
   * Rebalances the raw events and assigns timestamps and watermarks. Keeps only
   * {@link TripEvent}s whose coordinates are valid and near NYC.
   */
  private static DataStream<TripEvent> extractTripsNearNyc(DataStream<Event> events) {
    return events
        .rebalance()
        .assignTimestampsAndWatermarks(new PunctuatedAssigner())
        .filter(event -> event instanceof TripEvent)
        .map(event -> (TripEvent) event)
        .filter(GeoUtils::hasValidCoordinates)
        .filter(GeoUtils::nearNYC);
  }

  /**
   * Counts pickups per geohash cell over 10-minute windows. Emits only cells with at least
   * {@code MIN_PICKUP_COUNT} pickups.
   */
  private static DataStream<PickupCount> countPickupsPerGeoHash(DataStream<TripEvent> trips) {
    return trips
        .map(trip -> new Tuple1<>(
            GeoHash.geoHashStringWithCharacterPrecision(trip.pickupLat, trip.pickupLon, GEOHASH_PRECISION)))
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .apply((Tuple key, TimeWindow window, Iterable<Tuple1<String>> input, Collector<PickupCount> out) -> {
          long count = Iterables.size(input);
          // Every element is keyed on field 0, so all share the same geohash; read it from the first.
          String position = Iterables.get(input, 0).f0;

          out.collect(new PickupCount(position, count, window.maxTimestamp()));
        })
        .filter(geo -> geo.pickupCount >= MIN_PICKUP_COUNT);
  }

  /**
   * Handles trips that drop off near JFK or LGA. For each (pickup geohash, airport) pair and
   * 10-minute window, computes the total and average trip duration in minutes. A pair is not
   * emitted when its window holds only one trip.
   */
  private static DataStream<TripDuration> computeAirportTripDurations(DataStream<TripEvent> trips) {
    return trips
        .flatMap((TripEvent trip, Collector<Tuple3<String, String, Long>> out) -> {
          String pickupLocation =
              GeoHash.geoHashStringWithCharacterPrecision(trip.pickupLat, trip.pickupLon, GEOHASH_PRECISION);
          long tripDuration = new Duration(trip.pickupDatetime, trip.dropoffDatetime).getStandardMinutes();

          if (GeoUtils.nearJFK(trip.dropoffLat, trip.dropoffLon)) {
            out.collect(new Tuple3<>(pickupLocation, "JFK", tripDuration));
          } else if (GeoUtils.nearLGA(trip.dropoffLat, trip.dropoffLon)) {
            out.collect(new Tuple3<>(pickupLocation, "LGA", tripDuration));
          }
        })
        .keyBy(0, 1)
        .timeWindow(Time.minutes(10))
        .apply((Tuple key, TimeWindow window, Iterable<Tuple3<String, String, Long>> input,
                Collector<TripDuration> out) -> {
          // One pass computes both count and sum, instead of re-walking the window for each size() call.
          long tripCount = 0;
          long sumDuration = 0;
          for (Tuple3<String, String, Long> entry : input) {
            tripCount++;
            sumDuration += entry.f2;
          }

          // Only report pairs with more than one trip in the window.
          if (tripCount > 1) {
            // Elements are keyed on fields 0 and 1, so all share the same location and airport.
            Tuple3<String, String, Long> first = Iterables.get(input, 0);
            double avgDuration = (double) sumDuration / tripCount;

            out.collect(new TripDuration(first.f0, first.f1, sumDuration, avgDuration, window.maxTimestamp()));
          }
        });
  }

  /**
   * Enables checkpointing every 5000 ms with externalized checkpoints. Those checkpoints are
   * deleted on cancellation. State is stored in a RocksDB backend at {@code checkpointUri}.
   */
  private static void configureCheckpointing(StreamExecutionEnvironment env, String checkpointUri)
      throws java.io.IOException, java.net.URISyntaxException {
    env.enableCheckpointing(5_000);

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

    env.setStateBackend(new RocksDBStateBackend(new URI(checkpointUri)));

    LOG.info("writing checkpoints to {}", checkpointUri);
  }

  /**
   * Adds Elasticsearch sinks for pickup counts and trip durations. Both write to the configured
   * index, under document types {@code pickup_count} and {@code trip_duration}.
   */
  private static void addElasticsearchSinks(
      ParameterTool pt, DataStream<PickupCount> pickupCounts, DataStream<TripDuration> tripDurations) {
    final String indexName = pt.get("es-index", ES_DEFAULT_INDEX);

    final ImmutableMap<String, String> config = ImmutableMap.<String, String>builder()
        .put("es-endpoint", pt.getRequired("es-endpoint"))
        .put("region", pt.get("region", DEFAULT_REGION))
        .build();

    pickupCounts.addSink(new ElasticsearchJestSink<>(config, indexName, "pickup_count"));
    tripDurations.addSink(new ElasticsearchJestSink<>(config, indexName, "trip_duration"));
  }