public DynamicTableSink createDynamicTableSink()

in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java [160:182]


    /**
     * Creates the Kudu {@link DynamicTableSink} for the table described by {@code context}.
     *
     * <p>The master addresses and the table name are read from the table options. The
     * operation timeout, flush interval, maximum buffer size, ignore-not-found and
     * ignore-duplicate options are forwarded to the writer configuration builder only when
     * {@code getOptional} yields a value for them; otherwise the builder is not touched for
     * that setting.
     *
     * <p>The {@link KuduTableInfo} is built from the schema exactly as declared on the catalog
     * table, whereas the sink itself receives the schema returned by
     * {@code KuduTableUtils.getSchemaWithSqlTimestamp}.
     *
     * @param context factory context giving access to the catalog table and its options
     * @return a sink built from the writer configuration builder, the converted schema and
     *         the table info
     */
    public DynamicTableSink createDynamicTableSink(Context context) {
        final ReadableConfig options = getReadableConfig(context);

        // Connection settings: where the Kudu masters live and which table is written to.
        final String kuduMasters = options.get(KUDU_MASTERS);
        final String kuduTableName = options.get(KUDU_TABLE);

        // Writer tuning: each setter is invoked only if the corresponding option has a value.
        final KuduWriterConfig.Builder writerConfig =
                KuduWriterConfig.Builder.setMasters(kuduMasters);
        options.getOptional(KUDU_OPERATION_TIMEOUT).ifPresent(writerConfig::setOperationTimeout);
        options.getOptional(KUDU_FLUSH_INTERVAL).ifPresent(writerConfig::setFlushInterval);
        options.getOptional(KUDU_MAX_BUFFER_SIZE).ifPresent(writerConfig::setMaxBufferSize);
        options.getOptional(KUDU_IGNORE_NOT_FOUND).ifPresent(writerConfig::setIgnoreNotFound);
        options.getOptional(KUDU_IGNORE_DUPLICATE).ifPresent(writerConfig::setIgnoreDuplicate);

        // The declared schema feeds the table info; the converted schema is handed to the sink.
        final TableSchema declaredSchema = context.getCatalogTable().getSchema();
        final TableSchema sinkSchema = KuduTableUtils.getSchemaWithSqlTimestamp(declaredSchema);
        final KuduTableInfo kuduTableInfo = KuduTableUtils.createTableInfo(
                kuduTableName,
                declaredSchema,
                context.getCatalogTable().toProperties());

        return new KuduDynamicTableSink(writerConfig, sinkSchema, kuduTableInfo);
    }