protected void attemptFlush()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java [626:736]


    protected void attemptFlush() throws IOException {
        for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
            String tableIdentifier = entry.getKey();
            boolean stopTableIdentifierWhenException = stopWritingWhenTableException
                    && (null != tableExceptionMap.get(tableIdentifier));
            if (stopTableIdentifierWhenException) {
                continue;
            }
            List<GenericRowData> tableIdRecordList = entry.getValue();
            if (CollectionUtils.isEmpty(tableIdRecordList)) {
                continue;
            }
            JdbcExec jdbcStatementExecutor;
            Boolean flushFlag = false;
            Exception tableException = null;
            try {
                getAndSetPkNamesFromDb(tableIdentifier);
                jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
                Long totalDataSize = 0L;
                for (GenericRowData record : tableIdRecordList) {
                    totalDataSize = totalDataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
                    jdbcStatementExecutor.addToBatch((JdbcIn) record);
                }
                if (dirtySinkHelper.getDirtySink() != null) {
                    fillDirtyData(jdbcStatementExecutor, tableIdentifier);
                }
                jdbcStatementExecutor.executeBatch();
                flushFlag = true;
                if (dirtySinkHelper.getDirtySink() == null) {
                    outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
                            totalDataSize, false);
                } else {
                    try {
                        outputMetrics(tableIdentifier);
                    } catch (Exception e) {
                        outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
                                totalDataSize, false);
                    }
                }
            } catch (Exception e) {
                tableException = e;
                LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
                getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
                updateOneExecutor(true, tableIdentifier);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException(
                            "unable to flush; interrupted while doing another attempt", e);
                }
            }

            if (!flushFlag) {
                for (GenericRowData record : tableIdRecordList) {
                    for (int retryTimes = 1; retryTimes <= executionOptions.getMaxRetries(); retryTimes++) {
                        try {
                            jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
                            jdbcStatementExecutor.addToBatch((JdbcIn) record);
                            jdbcStatementExecutor.executeBatch();
                            Long totalDataSize =
                                    Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
                            if (dirtySinkHelper.getDirtySink() == null) {
                                outputMetrics(tableIdentifier, (long) tableIdRecordList.size(),
                                        totalDataSize, false);
                            } else {
                                try {
                                    outputMetrics(tableIdentifier);
                                } catch (Exception e) {
                                    LOG.error("JDBC table metric calculation exception", e);
                                    outputMetrics(tableIdentifier, (long) tableIdRecordList.size(),
                                            totalDataSize, false);
                                }
                            }
                            flushFlag = true;
                            break;
                        } catch (Exception e) {
                            LOG.warn("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
                                    tableIdentifier, retryTimes, e);
                            getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
                            tableException = e;
                            updateOneExecutor(true, tableIdentifier);
                            try {
                                Thread.sleep(1000 * retryTimes);
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                                throw new IOException(
                                        "unable to flush; interrupted while doing another attempt", e);
                            }
                        }
                    }
                    if (!flushFlag && null != tableException) {
                        LOG.info("Put tableIdentifier:{} exception:{}",
                                tableIdentifier, tableException.getMessage());
                        if (dirtySinkHelper.getDirtySink() == null &&
                                !schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
                            outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
                                    1L, true);
                        }
                        tableExceptionMap.put(tableIdentifier, tableException);
                        if (stopWritingWhenTableException) {
                            LOG.info("Stop write table:{} because occur exception",
                                    tableIdentifier);
                            break;
                        }
                    }
                }
            }
            tableIdRecordList.clear();
        }
    }