public static FlinkKinesisProducer createKinesisSink()

in demo-apps/demo-kinesis-driver/src/main/java/com/amazonaws/services/kinesisanalytics/DemoKinesisDriver.java [43:67]


	public static FlinkKinesisProducer<EmployeeInfo> createKinesisSink(StreamExecutionEnvironment env, ParameterTool parameter) throws Exception {

		String awsRegion = parameter.get("AWS_REGION", "us-east-2");

		Properties producerConfig = new Properties();

		// Required configs
		producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion);

		FlinkKinesisProducer<EmployeeInfo> kinesis =
				new FlinkKinesisProducer<EmployeeInfo>(new AvroSerializationFn(), producerConfig);

		kinesis.setFailOnError(true);
		kinesis.setDefaultStream(parameter.get("KINESIS_STREAM", "AmazonKinesisStream1"));

		kinesis.setCustomPartitioner(new KinesisPartitioner<EmployeeInfo>() {
			@Override
			public String getPartitionId(EmployeeInfo s) {
				// we dont' care about shard affinity in this app
				return UUID.randomUUID().toString();
			}
		});

		return kinesis;
	}