in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java [160:182]
public DynamicTableSink createDynamicTableSink(Context context) {
ReadableConfig config = getReadableConfig(context);
String masterAddresses = config.get(KUDU_MASTERS);
String tableName = config.get(KUDU_TABLE);
Optional<Long> operationTimeout = config.getOptional(KUDU_OPERATION_TIMEOUT);
Optional<Integer> flushInterval = config.getOptional(KUDU_FLUSH_INTERVAL);
Optional<Integer> bufferSize = config.getOptional(KUDU_MAX_BUFFER_SIZE);
Optional<Boolean> ignoreNotFound = config.getOptional(KUDU_IGNORE_NOT_FOUND);
Optional<Boolean> ignoreDuplicate = config.getOptional(KUDU_IGNORE_DUPLICATE);
TableSchema schema = context.getCatalogTable().getSchema();
TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema,
context.getCatalogTable().toProperties());
KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
.setMasters(masterAddresses);
operationTimeout.ifPresent(configBuilder::setOperationTimeout);
flushInterval.ifPresent(configBuilder::setFlushInterval);
bufferSize.ifPresent(configBuilder::setMaxBufferSize);
ignoreNotFound.ifPresent(configBuilder::setIgnoreNotFound);
ignoreDuplicate.ifPresent(configBuilder::setIgnoreDuplicate);
return new KuduDynamicTableSink(configBuilder, physicalSchema, tableInfo);
}