public static void main()

in KeyspacesSink/src/main/java/com/amazonaws/services/kinesisanalytics/TurbineSpeedAggregator.java [35:78]


    /**
     * Entry point: builds the wind-turbine aggregation pipeline and runs it.
     *
     * <p>Input strings from {@code createSourceFromStaticConfig} are mapped with
     * {@code WindTurbineInputMap}, keyed by {@code turbineId}, reduced with
     * {@code AggregateReducer} over 5-minute tumbling processing-time windows, and then
     * mapped with {@code AggregateMap}. The resulting records are written through a
     * Cassandra sink that connects on port 9142 with SSL and SigV4 authentication, using
     * {@code sensor_data} as the default keyspace.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from pipeline construction or from {@code env.execute}
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = createSourceFromStaticConfig(env);

        // One aggregated record per turbineId per 5-minute processing-time window.
        DataStream<TurbineAggregatedRecord> result = input
                .map(new WindTurbineInputMap())
                .keyBy(t -> t.turbineId)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                .reduce(new AggregateReducer())
                .map(new AggregateMap());

        // Queries issued through the cluster built below use LOCAL_QUORUM consistency.
        QueryOptionsSerializable queryOptions = new QueryOptionsSerializable();
        queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

        CassandraSink.addSink(result)
                .setClusterBuilder(
                        new ClusterBuilder() {

                            private static final long serialVersionUID = 2793938419775311824L;

                            @Override
                            public Cluster buildCluster(Cluster.Builder builder) {
                                // Derive the endpoint from the same region that is used for
                                // SigV4 signing and for the local DC below, so the three
                                // settings cannot drift apart.
                                // NOTE(review): assumes the "cassandra.<region>.amazonaws.com"
                                // endpoint pattern; confirm for partitions that use a different
                                // DNS suffix (e.g. China regions).
                                final String contactPoint = "cassandra." + region + ".amazonaws.com";

                                return builder
                                        .addContactPoint(contactPoint)
                                        .withPort(9142)
                                        .withSSL()
                                        .withAuthProvider(new SigV4AuthProvider(region))
                                        .withLoadBalancingPolicy(
                                                DCAwareRoundRobinPolicy
                                                        .builder()
                                                        .withLocalDc(region)
                                                        .build())
                                        .withQueryOptions(queryOptions)
                                        .build();
                            }
                        })
                // The mapper is configured with saveNullFields(true).
                .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
                .setDefaultKeyspace("sensor_data")
                .build();

        // Nothing runs until execute() is called; the calls above only define the job.
        env.execute("Wind Turbine Data Aggregator");
    }