in demo-apps/demo-kinesis-driver/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKinesisDriver.java [43:67]
/**
 * Builds the Kinesis sink that {@link EmployeeInfo} records are written to.
 *
 * <p>Configuration is read from {@code parameter}:
 * <ul>
 *   <li>{@code AWS_REGION} - region handed to the producer; falls back to {@code us-east-2}.</li>
 *   <li>{@code KINESIS_STREAM} - default target stream; falls back to
 *       {@code AmazonKinesisStream1}.</li>
 * </ul>
 *
 * <p>The returned producer has fail-on-error switched on and uses a partitioner that
 * returns a freshly generated random UUID string for every record.
 *
 * @param env       execution environment; not used by this method
 * @param parameter job parameters supplying the region and stream name
 * @return a configured producer, constructed with an {@code AvroSerializationFn}
 * @throws Exception declared by the signature; nothing in this body throws it explicitly
 */
public static FlinkKinesisProducer<EmployeeInfo> createKinesisSink(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {
    final String region = parameter.get("AWS_REGION", "us-east-2");

    // The region is the only property set on the producer configuration here.
    final Properties sinkProperties = new Properties();
    sinkProperties.setProperty(AWSConfigConstants.AWS_REGION, region);

    final FlinkKinesisProducer<EmployeeInfo> sink =
            new FlinkKinesisProducer<>(new AvroSerializationFn(), sinkProperties);

    final String streamName = parameter.get("KINESIS_STREAM", "AmazonKinesisStream1");

    // Shard affinity is irrelevant for this app, so every record gets a random partition id.
    final KinesisPartitioner<EmployeeInfo> randomPartitioner = new KinesisPartitioner<EmployeeInfo>() {
        @Override
        public String getPartitionId(EmployeeInfo employee) {
            return UUID.randomUUID().toString();
        }
    };

    sink.setDefaultStream(streamName);
    sink.setCustomPartitioner(randomPartitioner);
    sink.setFailOnError(true);
    return sink;
}