in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java [95:116]
/**
 * Creates the Kudu {@link DynamicTableSink} for the table described by {@code context}.
 *
 * <p>The Kudu table name is read from the {@code TABLE_NAME} option; when that option is not
 * set, the object name of the catalog identifier is used instead. All writer settings
 * (masters, operation timeout, flush mode, flush interval, buffer size and the
 * ignore-not-found / ignore-duplicate flags) are read from the validated table options.
 *
 * @param context factory context supplying the catalog table, its identifier and its options
 * @return a sink configured from the table's options and resolved schema
 * @throws IllegalArgumentException if the configured flush interval, expressed in
 *     milliseconds, does not fit into an {@code int}
 */
public DynamicTableSink createDynamicTableSink(Context context) {
    final ReadableConfig config = getValidatedConfig(context);

    // An explicitly configured table name wins; otherwise fall back to the catalog object name.
    final String tableName =
            config.getOptional(TABLE_NAME)
                    .orElse(context.getObjectIdentifier().getObjectName());

    final ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    final KuduTableInfo tableInfo =
            KuduTableUtils.createTableInfo(
                    tableName, schema, context.getCatalogTable().toProperties());

    // The writer config takes the flush interval as an int. A plain (int) cast would
    // silently wrap out-of-range values into an unrelated (possibly negative) interval,
    // so reject them explicitly instead.
    final long flushIntervalMillis = config.get(FLUSH_INTERVAL).toMillis();
    if (flushIntervalMillis < Integer.MIN_VALUE || flushIntervalMillis > Integer.MAX_VALUE) {
        throw new IllegalArgumentException(
                "Kudu sink flush interval of "
                        + flushIntervalMillis
                        + " ms is out of range; it must fit into a 32-bit integer.");
    }

    final KuduWriterConfig.Builder configBuilder =
            KuduWriterConfig.Builder.setMasters(config.get(MASTERS))
                    .setOperationTimeout(config.get(OPERATION_TIMEOUT).toMillis())
                    .setConsistency(config.get(FLUSH_MODE))
                    .setFlushInterval((int) flushIntervalMillis)
                    .setMaxBufferSize(config.get(MAX_BUFFER_SIZE))
                    .setIgnoreNotFound(config.get(IGNORE_NOT_FOUND))
                    .setIgnoreDuplicate(config.get(IGNORE_DUPLICATE));

    return new KuduDynamicTableSink(configBuilder, tableInfo, schema);
}