private Exception doCall()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java [531:770]


        /**
         * Writes the pending {@code records} to the target database and publishes the outcome on {@code context}.
         *
         * <p>Two execution modes exist:
         * <ul>
         *   <li><b>GTID mode</b> ({@code sinkConfig.isGTIDMode()}): {@code @@session.gtid_next} is set to the source
         *       GTID, every record is applied inside one transaction, and {@code gtid_next} is switched back to
         *       {@code AUTOMATIC} afterwards. MariaDB and an empty GTID are rejected.</li>
         *   <li><b>Normal mode</b>: records are applied one per transaction, or in chunks of {@code batchSize}
         *       through a JDBC batch when {@code useBatch && canBatch}.</li>
         * </ul>
         *
         * <p>In both modes a {@code DeadlockLoserDataAccessException} is retried after a pause that grows with the
         * attempt number; any other failure aborts immediately. Before an abort the attempted records are put back
         * into {@code failedRecords} and {@code processFailedDatas(..)} is invoked.
         *
         * @return always {@code null}; failures are signalled by throwing
         * @throws RuntimeException if a non-retryable error occurs, if the deadlock retries are exhausted, or if the
         *                          thread is interrupted while waiting to retry
         */
        private Exception doCall() {
            // Retry policy shared by both execution modes.
            final int maxRetryCount = 3;
            final int retryWaitMillis = 3000;

            RuntimeException error = null;
            ExecuteResult exeResult = null;

            if (sinkConfig.isGTIDMode()) {
                int retryCount = 0;
                final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
                // Loop so that a deadlock is really retried. Without it the RETRY path would sleep once and then
                // fall through to the success return below, silently dropping the batch.
                while (true) {
                    try {
                        toExecuteRecords.clear();
                        if (!CollectionUtils.isEmpty(failedRecords)) {
                            // if failedRecords not empty, make it retry
                            toExecuteRecords.addAll(failedRecords);
                        } else {
                            toExecuteRecords.addAll(records);
                            // add to failed record first, maybe get lob or datasource error
                            failedRecords.addAll(toExecuteRecords);
                        }
                        final JdbcTemplate template = dbDialect.getJdbcTemplate();
                        String sourceGtid = context.getGtid();
                        if (StringUtils.isNotEmpty(sourceGtid) && !sinkConfig.isMariaDB()) {
                            // NOTE(review): the GTID is concatenated into the statement; this assumes the value
                            // originates from the trusted source binlog - confirm.
                            String setMySQLGtid = "SET @@session.gtid_next = '" + sourceGtid + "';";
                            template.execute(setMySQLGtid);
                        } else if (StringUtils.isNotEmpty(sourceGtid) && sinkConfig.isMariaDB()) {
                            throw new RuntimeException("unsupport gtid mode for mariaDB");
                        } else {
                            log.error("gtid is empty in gtid mode");
                            throw new RuntimeException("gtid is empty in gtid mode");
                        }

                        dbDialect.getTransactionTemplate().execute((TransactionCallback<Integer>) status -> {
                            // Acquired inside the callback so the finally below always pairs with it, even when
                            // the transaction could not be started.
                            final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
                            try {
                                failedRecords.clear();
                                processedRecords.clear();
                                int totalAffected = 0;
                                for (CanalConnectRecord record : toExecuteRecords) {
                                    int affects = template.update(record.getSql(), new PreparedStatementSetter() {
                                        @Override
                                        public void setValues(PreparedStatement ps) throws SQLException {
                                            doPreparedStatement(ps, dbDialect, lobCreator, record);
                                        }
                                    });
                                    totalAffected = totalAffected + affects;
                                    processStat(record, affects, false);
                                }
                                return totalAffected;
                            } catch (Exception e) {
                                // rollback
                                status.setRollbackOnly();
                                throw new RuntimeException("Failed to executed", e);
                            } finally {
                                lobCreator.close();
                            }
                        });

                        // reset gtid (MariaDB was already rejected before the transaction started)
                        String resetMySQLGtid = "SET @@session.gtid_next = 'AUTOMATIC';";
                        dbDialect.getJdbcTemplate().execute(resetMySQLGtid);

                        error = null;
                        exeResult = ExecuteResult.SUCCESS;
                    } catch (DeadlockLoserDataAccessException ex) {
                        // keep the original throwable as cause so the chain stays inspectable
                        error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                        exeResult = ExecuteResult.RETRY;
                    } catch (Throwable ex) {
                        error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                        exeResult = ExecuteResult.ERROR;
                    }

                    if (ExecuteResult.SUCCESS == exeResult) {
                        allFailedRecords.addAll(failedRecords);
                        allProcessedRecords.addAll(processedRecords);
                        failedRecords.clear();
                        processedRecords.clear();
                        break;
                    } else if (ExecuteResult.RETRY == exeResult) {
                        retryCount = retryCount + 1;
                        processedRecords.clear();
                        failedRecords.clear();
                        failedRecords.addAll(toExecuteRecords);
                        if (retryCount >= maxRetryCount) {
                            processFailedDatas(toExecuteRecords.size());
                            throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
                        } else {
                            try {
                                int wait = retryCount * retryWaitMillis;
                                wait = Math.max(wait, retryWaitMillis);
                                Thread.sleep(wait);
                            } catch (InterruptedException ex) {
                                // restore the interrupt flag so code further up the stack can observe it
                                Thread.currentThread().interrupt();
                                processFailedDatas(toExecuteRecords.size());
                                throw new RuntimeException(ex);
                            }
                        }
                    } else {
                        processedRecords.clear();
                        failedRecords.clear();
                        failedRecords.addAll(toExecuteRecords);
                        processFailedDatas(toExecuteRecords.size());
                        throw error;
                    }
                }
            } else {
                int index = 0;
                while (index < records.size()) {
                    // Select the next unit of work: a chunk of up to batchSize records, or a single record.
                    final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
                    if (useBatch && canBatch) {
                        int end = Math.min(index + batchSize, records.size());
                        toExecuteRecords.addAll(records.subList(index, end));
                        index = end;
                    } else {
                        toExecuteRecords.add(records.get(index));
                        index = index + 1;
                    }

                    int retryCount = 0;
                    while (true) {
                        try {
                            if (!CollectionUtils.isEmpty(failedRecords)) {
                                // a previous attempt left records behind: execute those instead
                                toExecuteRecords.clear();
                                toExecuteRecords.addAll(failedRecords);
                            } else {
                                // mark as failed up front; the transaction callback clears this once it starts
                                failedRecords.addAll(toExecuteRecords);
                            }

                            final JdbcTemplate template = dbDialect.getJdbcTemplate();
                            if (useBatch && canBatch) {
                                // the SQL of the first record is used for the whole JDBC batch
                                final String sql = toExecuteRecords.get(0).getSql();

                                int[] affects = dbDialect.getTransactionTemplate().execute((TransactionCallback<int[]>) status -> {
                                    // Acquired inside the callback so the finally below always pairs with it.
                                    final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
                                    try {
                                        failedRecords.clear();
                                        processedRecords.clear();
                                        return template.batchUpdate(sql, new BatchPreparedStatementSetter() {

                                            @Override
                                            public void setValues(PreparedStatement ps, int idx) throws SQLException {
                                                doPreparedStatement(ps, dbDialect, lobCreator, toExecuteRecords.get(idx));
                                            }

                                            @Override
                                            public int getBatchSize() {
                                                return toExecuteRecords.size();
                                            }
                                        });
                                    } catch (Exception e) {
                                        // rollback
                                        status.setRollbackOnly();
                                        throw new RuntimeException("Failed to execute batch ", e);
                                    } finally {
                                        lobCreator.close();
                                    }
                                });

                                assert affects != null;
                                for (int i = 0; i < toExecuteRecords.size(); i++) {
                                    processStat(toExecuteRecords.get(i), affects[i], true);
                                }
                            } else {
                                final CanalConnectRecord record = toExecuteRecords.get(0);
                                int affect = dbDialect.getTransactionTemplate().execute((TransactionCallback<Integer>) status -> {
                                    // Acquired inside the callback so the finally below always pairs with it.
                                    final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
                                    try {
                                        failedRecords.clear();
                                        processedRecords.clear();
                                        return template.update(record.getSql(), new PreparedStatementSetter() {

                                            @Override
                                            public void setValues(PreparedStatement ps) throws SQLException {
                                                doPreparedStatement(ps, dbDialect, lobCreator, record);
                                            }
                                        });
                                    } catch (Exception e) {
                                        // rollback
                                        status.setRollbackOnly();
                                        throw new RuntimeException("Failed to executed", e);
                                    } finally {
                                        lobCreator.close();
                                    }
                                });
                                processStat(record, affect, false);
                            }

                            error = null;
                            exeResult = ExecuteResult.SUCCESS;
                        } catch (DeadlockLoserDataAccessException ex) {
                            // keep the original throwable as cause so the chain stays inspectable
                            error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                            exeResult = ExecuteResult.RETRY;
                        } catch (Throwable ex) {
                            error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                            exeResult = ExecuteResult.ERROR;
                        }

                        if (ExecuteResult.SUCCESS == exeResult) {
                            allFailedRecords.addAll(failedRecords);
                            allProcessedRecords.addAll(processedRecords);
                            failedRecords.clear();
                            processedRecords.clear();
                            break; // do next eventData
                        } else if (ExecuteResult.RETRY == exeResult) {
                            retryCount = retryCount + 1;
                            processedRecords.clear();
                            failedRecords.clear();
                            failedRecords.addAll(toExecuteRecords);
                            if (retryCount >= maxRetryCount) {
                                processFailedDatas(index);
                                throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
                            } else {
                                try {
                                    int wait = retryCount * retryWaitMillis;
                                    wait = Math.max(wait, retryWaitMillis);
                                    Thread.sleep(wait);
                                } catch (InterruptedException ex) {
                                    // restore the interrupt flag so code further up the stack can observe it
                                    Thread.currentThread().interrupt();
                                    processFailedDatas(index);
                                    throw new RuntimeException(ex);
                                }
                            }
                        } else {
                            processedRecords.clear();
                            failedRecords.clear();
                            failedRecords.addAll(toExecuteRecords);
                            processFailedDatas(index);
                            throw error;
                        }
                    }
                }
            }

            // Publish the accumulated bookkeeping to the shared context.
            context.getFailedRecords().addAll(allFailedRecords);
            context.getProcessedRecords().addAll(allProcessedRecords);
            return null;
        }