in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [785:821]
/**
 * Opens a table for writing in merge mode by issuing an Open request through the given stub.
 *
 * <p>Every name in {@code primaryFieldNamesStr} is registered as both an insert column and a
 * match column; every name in {@code nonPrimaryFieldNamesStr} is registered as both an insert
 * column and an update column. Both collections are read from outside this method
 * (presumably fields of the enclosing class -- confirm against the class definition).
 *
 * @param bStub      blocking stub on which {@code open} is invoked
 * @param mSession   session attached to the open request
 * @param schemaName schema name attached to the open request
 * @param tableName  table name attached to the open request
 */
public void openForMerge(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
    // Assemble the merge option: key columns are inserted and matched on,
    // the remaining columns are inserted and updated.
    MergeOption.Builder mergeOptions = MergeOption.newBuilder();
    for (String keyColumn : primaryFieldNamesStr) {
        mergeOptions.addInsertColumns(keyColumn).addMatchColumns(keyColumn);
    }
    for (String valueColumn : nonPrimaryFieldNamesStr) {
        mergeOptions.addInsertColumns(valueColumn).addUpdateColumns(valueColumn);
    }
    // TODO MergeOption.Builder.setCondition, MergeOption.Builder.setErrorLimit,
    //      MergeOption.Builder.setErrorLimitPercentage are not configured yet.
    mergeOptions.setEnablePrimaryKeyOnTempTable(true);

    // Assemble the open request. PreSQL, PostSQL, Encoding and StagingSchema
    // are intentionally left unset here.
    OpenRequest.Builder openRequest = OpenRequest.newBuilder();
    openRequest.setSession(mSession);
    openRequest.setSchemaName(schemaName);
    openRequest.setTableName(tableName);
    openRequest.setMergeOption(mergeOptions.build());

    // Invoke the Open service via the blocking stub; nothing from the call is used afterwards.
    bStub.open(openRequest.build());
}