in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [823:900]
private void executeMergeWithStreamingServer(List<RowData> rows, StreamingServerRowConverter rowDataConverter, boolean del) throws InterruptedException {
long start = System.currentTimeMillis();
int retryTime = 0;
while(retryTime < maxRetryTime) {
if(retryTime > 0) {
LOG.warn("Retry " + retryTime + " times to executeMergeWithStreamingServer");
}
ManagedChannel channel = null;
GpssGrpc.GpssBlockingStub bStub = null;
Session mSession = null;
long bps = 0;
try {
// check if hostname and port are already set
if (database == null && hostname == null && port == 0) {
extractParametersWithURL(url);
}
// connect to ADBSS gRPC service instance; create a channel and a blocking stub
channel = ManagedChannelBuilder.forAddress(adbssHost, adbssPort).usePlaintext().build();
bStub = GpssGrpc.newBlockingStub(channel);
// use the blocking stub to call the Connect service
mSession = buildSession(bStub, hostname, port, userName, password, database);
LOG.info("Got streaming server session id:" + mSession.getID());
// setup column names and use blocking stub to call the Open service
openForMerge(bStub, mSession, targetSchema, tableName);
//put Flink-RowData into Adbss-RowData
List<org.apache.flink.connector.jdbc.table.sink.api.RowData> ssRows = new ArrayList<>();
for (RowData row : rows) {
bps += rowDataConverter.toExternal(row, ssRows);
}
/** create a write request builder */
WriteRequest wReq = WriteRequest.newBuilder()
.setSession(mSession)
.addAllRows(ssRows)
.build();
// use the blocking stub to call the Write service; it returns nothing
LOG.info("Start writing row data with adbss...");
bStub.write(wReq);
LOG.info("Finished writing row data with adbss...");
/** create a close request builder */
TransferStats tStats;
CloseRequest cReq = CloseRequest.newBuilder()
.setSession(mSession)
//.setMaxErrorRows(15)
.build();
/** use the blocking stub to call the Close service */
tStats = bStub.close(cReq);
/** display the result to stdout */
LOG.info("CloseRequest tStats: " + tStats.toString());
long end = System.currentTimeMillis();
reportMetric(rows, start, end, bps);
return;
} catch (Exception e) {
retryTime++;
LOG.error("Exception in executeMergeWithStreamingServer: ", e);
} finally {
if (channel != null &&
bStub != null &&
mSession != null) {
/** use the blocking stub to call the Disconnect service */
LOG.info("Disconnecting adbss with sessionID " + mSession.getID() + " ...");
bStub.disconnect(mSession);
// shutdown the channel
channel.shutdown().awaitTermination(7, TimeUnit.SECONDS);
}
}
}
if (retryTime >= maxRetryTime) {
throw new RuntimeException("Failed to executeMergeWithStreamingServer after " + maxRetryTime + " retries");
}
}