in KeyspacesSink/src/main/java/com/amazonaws/services/kinesisanalytics/TurbineSpeedAggregator.java [35:78]
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
DataStream<TurbineAggregatedRecord> result = input
.map(new WindTurbineInputMap())
.keyBy(t -> t.turbineId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.reduce(new AggregateReducer())
.map(new AggregateMap());
QueryOptionsSerializable queryOptions = new QueryOptionsSerializable();
queryOptions.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
CassandraSink.addSink(result)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra.us-east-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withAuthProvider(new SigV4AuthProvider(region))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(region)
.build())
.withQueryOptions(queryOptions)
.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.setDefaultKeyspace("sensor_data")
.build();
env.execute("Wind Turbine Data Aggregator");
}