public void openForMerge()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [785:821]


    public void openForMerge(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        /** open a table for write */
        // create an insert option builder
        MergeOption.Builder mBuilder = MergeOption.newBuilder();


        for (String pk: primaryFieldNamesStr) {
            mBuilder.addInsertColumns(pk);
            mBuilder.addMatchColumns(pk);
        }
        for (String nonPk: nonPrimaryFieldNamesStr) {
            mBuilder.addInsertColumns(nonPk);
            mBuilder.addUpdateColumns(nonPk);
        }
        // TODO MergeOption.Builder.setCondition,  MergeOption.Builder.setErrorLimit, MergeOption.Builder.setErrorLimitPercentage
//        mBuilder.setErrorLimitCount();
//        mBuilder.setErrorLimitPercentage();
//        mBuilder.setCondition();
        mBuilder.setEnablePrimaryKeyOnTempTable(true);

        MergeOption mOpt = mBuilder.build();

        // create an open request builder
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                //.setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                //.setStagingSchema("")
                .setMergeOption(mOpt)
                .build();

        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }