flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java [108:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                cache);
    }

    /**
     * Builds the HBase table sink for the table described by {@code context}.
     *
     * <p>The factory helper validates the table options first (passing {@code PROPERTIES_PREFIX}
     * to {@code validateExcept}), then the primary key is checked against the physical row data
     * type, and finally the sink is assembled from the resolved options.
     *
     * @param context factory context supplying the table options, the physical row data type and
     *     the primary key indexes
     * @return a new {@link HBaseDynamicTableSink} built from the resolved table options
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
        factoryHelper.validateExcept(PROPERTIES_PREFIX);
        final ReadableConfig options = factoryHelper.getOptions();

        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

        // Resolve each sink setting from the validated options before deriving the schema.
        final String targetTable = options.get(TABLE_NAME);
        final Configuration hbaseClientConf = getHBaseConfiguration(options);
        final HBaseWriteOptions writeOptions = getHBaseWriteOptions(options);
        final String nullLiteral = options.get(NULL_STRING_LITERAL);

        // The HBase schema is derived from the table's physical row data type.
        final HBaseTableSchema tableSchema =
                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

        return new HBaseDynamicTableSink(
                targetTable, tableSchema, hbaseClientConf, writeOptions, nullLiteral);
    }

    /**
     * Returns the identifier of this factory.
     *
     * @return the value of the {@code IDENTIFIER} constant
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> set = new HashSet<>();
        set.add(TABLE_NAME);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java [110:141]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                cache);
    }

    /**
     * Builds the HBase table sink for the table described by {@code context}.
     *
     * <p>The factory helper validates the table options first (passing {@code PROPERTIES_PREFIX}
     * to {@code validateExcept}), then the primary key is checked against the physical row data
     * type, and finally the sink is assembled from the resolved options.
     *
     * @param context factory context supplying the table options, the physical row data type and
     *     the primary key indexes
     * @return a new {@link HBaseDynamicTableSink} built from the resolved table options
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
        factoryHelper.validateExcept(PROPERTIES_PREFIX);
        final ReadableConfig options = factoryHelper.getOptions();

        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

        // Resolve each sink setting from the validated options before deriving the schema.
        final String targetTable = options.get(TABLE_NAME);
        final Configuration hbaseClientConf = getHBaseConfiguration(options);
        final HBaseWriteOptions writeOptions = getHBaseWriteOptions(options);
        final String nullLiteral = options.get(NULL_STRING_LITERAL);

        // The HBase schema is derived from the table's physical row data type.
        final HBaseTableSchema tableSchema =
                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

        return new HBaseDynamicTableSink(
                targetTable, tableSchema, hbaseClientConf, writeOptions, nullLiteral);
    }

    /**
     * Returns the identifier of this factory.
     *
     * @return the value of the {@code IDENTIFIER} constant
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> set = new HashSet<>();
        set.add(TABLE_NAME);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



