in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java [135:158]
/**
 * Assembles a {@link KuduTableSink} for the given table from the supplied connector properties.
 *
 * <p>The writer configuration is seeded with the value stored under {@code KUDU_MASTERS}. The
 * operation timeout, flush interval, maximum buffer size, ignore-not-found and ignore-duplicate
 * settings are forwarded to the builder only when the corresponding key is present in
 * {@code props}; otherwise the builder is left untouched for that setting.
 *
 * <p>Note that the table info is derived from the {@code schema} exactly as passed in, while the
 * sink itself receives the schema returned by {@code KuduTableUtils.getSchemaWithSqlTimestamp}.
 *
 * @param tableName name handed to {@code KuduTableUtils.createTableInfo}
 * @param schema    table schema as declared by the caller
 * @param props     raw connector properties
 * @return a sink built from the writer config builder, the table info and the converted schema
 */
private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
    // Typed view over the raw string properties, used below for the optional settings.
    DescriptorProperties descriptorProps = new DescriptorProperties();
    descriptorProps.putProperties(props);

    String masters = props.get(KUDU_MASTERS);

    TableSchema sinkSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
    KuduTableInfo kuduTableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);

    KuduWriterConfig.Builder writerConfig = KuduWriterConfig.Builder.setMasters(masters);

    // Read every optional setting first, so all lookups complete before the builder is touched.
    Optional<Long> timeoutOpt = descriptorProps.getOptionalLong(KUDU_OPERATION_TIMEOUT);
    Optional<Integer> flushIntervalOpt = descriptorProps.getOptionalInt(KUDU_FLUSH_INTERVAL);
    Optional<Integer> maxBufferSizeOpt = descriptorProps.getOptionalInt(KUDU_MAX_BUFFER_SIZE);
    Optional<Boolean> ignoreNotFoundOpt = descriptorProps.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND);
    Optional<Boolean> ignoreDuplicateOpt = descriptorProps.getOptionalBoolean(KUDU_IGNORE_DUPLICATE);

    // Apply only the settings that were actually configured.
    timeoutOpt.ifPresent(writerConfig::setOperationTimeout);
    flushIntervalOpt.ifPresent(writerConfig::setFlushInterval);
    maxBufferSizeOpt.ifPresent(writerConfig::setMaxBufferSize);
    ignoreNotFoundOpt.ifPresent(writerConfig::setIgnoreNotFound);
    ignoreDuplicateOpt.ifPresent(writerConfig::setIgnoreDuplicate);

    return new KuduTableSink(writerConfig, kuduTableInfo, sinkSchema);
}