public DynamicTableSink createDynamicTableSink()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java [95:116]


    public DynamicTableSink createDynamicTableSink(Context context) {
        final ReadableConfig config = getValidatedConfig(context);

        final String tableName =
                config.getOptional(TABLE_NAME)
                        .orElse(context.getObjectIdentifier().getObjectName());
        final ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        final KuduTableInfo tableInfo =
                KuduTableUtils.createTableInfo(
                        tableName, schema, context.getCatalogTable().toProperties());

        final KuduWriterConfig.Builder configBuilder =
                KuduWriterConfig.Builder.setMasters(config.get(MASTERS))
                        .setOperationTimeout(config.get(OPERATION_TIMEOUT).toMillis())
                        .setConsistency(config.get(FLUSH_MODE))
                        .setFlushInterval((int) config.get(FLUSH_INTERVAL).toMillis())
                        .setMaxBufferSize(config.get(MAX_BUFFER_SIZE))
                        .setIgnoreNotFound(config.get(IGNORE_NOT_FOUND))
                        .setIgnoreDuplicate(config.get(IGNORE_DUPLICATE));

        return new KuduDynamicTableSink(configBuilder, tableInfo, schema);
    }