CustomKeystore/src/main/java/com/amazonaws/services/kinesisanalytics/KDAFlinkStreamingJob.java [33:55]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	/**
	 * Builds the Kafka sink from the {@code KafkaSink} property group of the application's
	 * runtime properties.
	 *
	 * <p>The group's {@code topic} entry is used as the producer's default topic, and the whole
	 * group is handed to the producer as its configuration. Records are serialized with
	 * {@link SimpleStringSchema} and the producer is created with
	 * {@code FlinkKafkaProducer.Semantic.EXACTLY_ONCE}.
	 *
	 * @return a {@code FlinkKafkaProducer} for {@code String} records
	 * @throws IOException presumably when the runtime application properties cannot be read;
	 *         propagated from {@code KinesisAnalyticsRuntime.getApplicationProperties()}
	 * @throws IllegalStateException if the {@code KafkaSink} group or its {@code topic} entry is missing
	 */
	private static FlinkKafkaProducer<String> createKafkaSinkFromApplicationProperties() throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

		// Resolve the property group once and fail with a descriptive message instead of a
		// bare NullPointerException when the application is misconfigured.
		Properties sinkProperties = applicationProperties.get("KafkaSink");
		if (sinkProperties == null) {
			throw new IllegalStateException("Missing application property group 'KafkaSink'");
		}

		String topic = sinkProperties.getProperty("topic");
		if (topic == null) {
			throw new IllegalStateException("Missing 'topic' in application property group 'KafkaSink'");
		}

		// Parameterized (non-raw) schema types keep the producer construction free of unchecked conversions.
		KeyedSerializationSchema<String> keyedSerializationSchema =
				new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());

		// Configure FlinkProducer for exactly-once semantics
		return new FlinkKafkaProducer<>(
				topic,
				keyedSerializationSchema,
				sinkProperties,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> input = createKafkaSourceFromApplicationProperties(env);

		// Add sink
		input.addSink(createKafkaSinkFromApplicationProperties());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



KafkaConnectors/src/main/java/com/amazonaws/services/kinesisanalytics/KafkaGettingStartedJob.java [25:47]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	/**
	 * Creates the Kafka producer sink described by the {@code KafkaSink} group of the
	 * application's runtime properties.
	 *
	 * <p>The {@code topic} entry of that group names the default topic; the group itself is
	 * passed through as the producer configuration. String records are serialized via
	 * {@link SimpleStringSchema}, and the producer is built with
	 * {@code FlinkKafkaProducer.Semantic.EXACTLY_ONCE}.
	 *
	 * @return a {@code FlinkKafkaProducer} for {@code String} records
	 * @throws IOException presumably when the runtime application properties cannot be read;
	 *         propagated from {@code KinesisAnalyticsRuntime.getApplicationProperties()}
	 * @throws IllegalStateException if the {@code KafkaSink} group or its {@code topic} entry is missing
	 */
	private static FlinkKafkaProducer<String> createKafkaSinkFromApplicationProperties() throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

		// Fetch the group a single time; a missing group or topic is a configuration error
		// and is reported explicitly rather than as an anonymous NullPointerException.
		Properties kafkaSinkProperties = applicationProperties.get("KafkaSink");
		if (kafkaSinkProperties == null) {
			throw new IllegalStateException("Missing application property group 'KafkaSink'");
		}

		String topic = kafkaSinkProperties.getProperty("topic");
		if (topic == null) {
			throw new IllegalStateException("Missing 'topic' in application property group 'KafkaSink'");
		}

		// Use parameterized schema types so the producer is built without raw-type warnings.
		KeyedSerializationSchema<String> keyedSerializationSchema =
				new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());

		// Configure FlinkProducer for exactly-once semantics
		return new FlinkKafkaProducer<>(
				topic,
				keyedSerializationSchema,
				kafkaSinkProperties,
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> input = createKafkaSourceFromApplicationProperties(env);

		// Add sink
		input.addSink(createKafkaSinkFromApplicationProperties());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



