private KuduTableSink createTableSink()

in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java [135:158]


    /**
     * Builds a {@link KuduTableSink} for the given table from string-keyed connector properties.
     *
     * <p>The master addresses are looked up under {@code KUDU_MASTERS}. The remaining writer
     * options (operation timeout, flush interval, max buffer size, ignore-not-found and
     * ignore-duplicate) are forwarded to the writer config builder only when the corresponding
     * key is present in {@code props}; otherwise the builder is left untouched for that option.
     *
     * @param tableName name handed to {@code KuduTableUtils.createTableInfo}
     * @param schema    table schema; passed unchanged to the table info, and passed through
     *                  {@code KuduTableUtils.getSchemaWithSqlTimestamp} before reaching the sink
     * @param props     raw connector properties
     * @return a sink built from the writer config builder, the table info and the converted schema
     */
    private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
        DescriptorProperties descriptor = new DescriptorProperties();
        descriptor.putProperties(props);
        String masters = props.get(KUDU_MASTERS);
        // The sink receives the converted schema, while the table info is created from the
        // schema exactly as supplied by the caller.
        TableSchema sinkSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
        KuduTableInfo kuduTable = KuduTableUtils.createTableInfo(tableName, schema, props);

        KuduWriterConfig.Builder writerConfig = KuduWriterConfig.Builder.setMasters(masters);

        // Read every optional setting up front, before any of them is applied to the builder.
        Optional<Long> timeout = descriptor.getOptionalLong(KUDU_OPERATION_TIMEOUT);
        Optional<Integer> flushEvery = descriptor.getOptionalInt(KUDU_FLUSH_INTERVAL);
        Optional<Integer> maxBuffer = descriptor.getOptionalInt(KUDU_MAX_BUFFER_SIZE);
        Optional<Boolean> skipNotFound = descriptor.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND);
        Optional<Boolean> skipDuplicate = descriptor.getOptionalBoolean(KUDU_IGNORE_DUPLICATE);

        // Apply only the settings that were actually provided.
        timeout.ifPresent(writerConfig::setOperationTimeout);
        flushEvery.ifPresent(writerConfig::setFlushInterval);
        maxBuffer.ifPresent(writerConfig::setMaxBufferSize);
        skipNotFound.ifPresent(writerConfig::setIgnoreNotFound);
        skipDuplicate.ifPresent(writerConfig::setIgnoreDuplicate);

        return new KuduTableSink(writerConfig, kuduTable, sinkSchema);
    }