private void executeMergeWithStreamingServer()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [823:900]


    private void executeMergeWithStreamingServer(List<RowData> rows, StreamingServerRowConverter rowDataConverter, boolean del) throws InterruptedException {
        long start = System.currentTimeMillis();
        int retryTime = 0;
        while(retryTime < maxRetryTime) {
            if(retryTime > 0) {
                LOG.warn("Retry " + retryTime + " times to executeMergeWithStreamingServer");
            }

            ManagedChannel channel = null;
            GpssGrpc.GpssBlockingStub bStub = null;
            Session mSession = null;
            long bps = 0;
            try {
                // check if hostname and port are already set
                if (database == null && hostname == null && port == 0) {
                    extractParametersWithURL(url);
                }

                // connect to ADBSS gRPC service instance; create a channel and a blocking stub
                channel = ManagedChannelBuilder.forAddress(adbssHost, adbssPort).usePlaintext().build();
                bStub = GpssGrpc.newBlockingStub(channel);
                // use the blocking stub to call the Connect service
                mSession = buildSession(bStub, hostname, port, userName, password, database);
                LOG.info("Got streaming server session id:" + mSession.getID());
                // setup column names and use blocking stub to call the Open service
                openForMerge(bStub, mSession, targetSchema, tableName);

                //put Flink-RowData into Adbss-RowData
                List<org.apache.flink.connector.jdbc.table.sink.api.RowData> ssRows = new ArrayList<>();
                for (RowData row : rows) {
                        bps += rowDataConverter.toExternal(row, ssRows);
                }

                /** create a write request builder */
                WriteRequest wReq = WriteRequest.newBuilder()
                        .setSession(mSession)
                        .addAllRows(ssRows)
                        .build();

                // use the blocking stub to call the Write service; it returns nothing
                LOG.info("Start writing row data with adbss...");
                bStub.write(wReq);
                LOG.info("Finished writing row data with adbss...");

                /** create a close request builder */
                TransferStats tStats;
                CloseRequest cReq = CloseRequest.newBuilder()
                        .setSession(mSession)
                        //.setMaxErrorRows(15)
                        .build();
                /** use the blocking stub to call the Close service */
                tStats = bStub.close(cReq);
                /** display the result to stdout */
                LOG.info("CloseRequest tStats: " + tStats.toString());

                long end = System.currentTimeMillis();
                reportMetric(rows, start, end, bps);

                return;
            } catch (Exception e) {
                retryTime++;
                LOG.error("Exception in executeMergeWithStreamingServer: ", e);
            } finally {
                if (channel != null &&
                        bStub != null &&
                        mSession != null) {
                    /** use the blocking stub to call the Disconnect service */
                    LOG.info("Disconnecting adbss with sessionID " + mSession.getID() + " ...");
                    bStub.disconnect(mSession);
                    // shutdown the channel
                    channel.shutdown().awaitTermination(7, TimeUnit.SECONDS);
                }
			}
        }
        if (retryTime >= maxRetryTime) {
            throw new RuntimeException("Failed to executeMergeWithStreamingServer after " + maxRetryTime + " retries");
        }
    }