in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java [51:86]
/**
 * Builds the Kinesis {@link DynamicTableSink} for the table described by {@code context}.
 *
 * <p>The steps run in a fixed order: the table options are validated (skipping the prefixes
 * reported by the options utility), the partitioner configuration is checked, and only then is
 * the validated sink configuration transferred onto the sink builder.
 *
 * @param context factory context of the table being created; it also supplies the class loader
 *     handed to the options utility
 * @return the sink produced by the fully configured builder
 */
public DynamicTableSink createDynamicTableSink(Context context) {
    final AsyncDynamicSinkContext sinkContext = new AsyncDynamicSinkContext(this, context);
    final KinesisStreamsConnectorOptionsUtils connectorOptions =
            new KinesisStreamsConnectorOptionsUtils(
                    sinkContext.getResolvedOptions(),
                    sinkContext.getTableOptions(),
                    (RowType) sinkContext.getPhysicalDataType().getLogicalType(),
                    sinkContext.getPartitionKeys(),
                    context.getClassLoader());

    // Check the declared table options, leaving out the prefixes that the options utility
    // reports as non-validated.
    sinkContext
            .getFactoryHelper()
            .validateExcept(connectorOptions.getNonValidatedPrefixes().toArray(new String[0]));

    // Check the option values themselves: the partitioner settings are validated against
    // whether the table is partitioned.
    validateKinesisPartitioner(sinkContext.getTableOptions(), sinkContext.isPartitioned());

    final Properties sinkConfig = connectorOptions.getValidatedSinkConfigurations();
    final KinesisDynamicSink.KinesisDynamicTableSinkBuilder sinkBuilder =
            new KinesisDynamicSink.KinesisDynamicTableSinkBuilder();

    // Each value is read right before its setter so the evaluation order stays the same as
    // in a single fluent chain.
    final String streamName = (String) sinkConfig.get(STREAM.key());
    sinkBuilder.setStream(streamName);

    final Properties clientProperties =
            (Properties) sinkConfig.get(KINESIS_CLIENT_PROPERTIES_KEY);
    sinkBuilder.setKinesisClientProperties(clientProperties);

    sinkBuilder.setEncodingFormat(sinkContext.getEncodingFormat());
    sinkBuilder.setConsumedDataType(sinkContext.getPhysicalDataType());

    final PartitionKeyGenerator<RowData> partitioner =
            (PartitionKeyGenerator<RowData>) sinkConfig.get(SINK_PARTITIONER.key());
    sinkBuilder.setPartitioner(partitioner);

    addAsyncOptionsToBuilder(sinkConfig, sinkBuilder);

    // The fail-on-error flag is only applied when the validated configuration carries it.
    final Optional<Boolean> failOnError =
            Optional.ofNullable((Boolean) sinkConfig.get(SINK_FAIL_ON_ERROR.key()));
    failOnError.ifPresent(sinkBuilder::setFailOnError);

    return sinkBuilder.build();
}