public DynamicTableSink createDynamicTableSink()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java [51:86]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Builds the Kinesis table sink for the given table context: the table options are
        // validated first, then the validated configuration is copied onto the sink builder.
        AsyncDynamicSinkContext sinkContext = new AsyncDynamicSinkContext(this, context);
        KinesisStreamsConnectorOptionsUtils kinesisOptions =
                new KinesisStreamsConnectorOptionsUtils(
                        sinkContext.getResolvedOptions(),
                        sinkContext.getTableOptions(),
                        (RowType) sinkContext.getPhysicalDataType().getLogicalType(),
                        sinkContext.getPartitionKeys(),
                        context.getClassLoader());

        // Check the data types of the table options; the prefixes reported by the options
        // utils are passed to validateExcept as the exclusions.
        sinkContext
                .getFactoryHelper()
                .validateExcept(kinesisOptions.getNonValidatedPrefixes().toArray(new String[0]));

        // Check the option values themselves before asking for the validated configuration.
        validateKinesisPartitioner(sinkContext.getTableOptions(), sinkContext.isPartitioned());
        Properties sinkConfig = kinesisOptions.getValidatedSinkConfigurations();

        // The validated configuration stores heterogeneous values, hence the casts below.
        String streamName = (String) sinkConfig.get(STREAM.key());
        Properties clientProperties = (Properties) sinkConfig.get(KINESIS_CLIENT_PROPERTIES_KEY);
        PartitionKeyGenerator<RowData> partitioner =
                (PartitionKeyGenerator<RowData>) sinkConfig.get(SINK_PARTITIONER.key());

        KinesisDynamicSink.KinesisDynamicTableSinkBuilder sinkBuilder =
                new KinesisDynamicSink.KinesisDynamicTableSinkBuilder();
        sinkBuilder
                .setStream(streamName)
                .setKinesisClientProperties(clientProperties)
                .setEncodingFormat(sinkContext.getEncodingFormat())
                .setConsumedDataType(sinkContext.getPhysicalDataType())
                .setPartitioner(partitioner);
        addAsyncOptionsToBuilder(sinkConfig, sinkBuilder);

        // The fail-on-error flag is only applied when the configuration carries a value for it.
        Boolean failOnError = (Boolean) sinkConfig.get(SINK_FAIL_ON_ERROR.key());
        if (failOnError != null) {
            sinkBuilder.setFailOnError(failOnError);
        }
        return sinkBuilder.build();
    }