in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java [531:770]
/**
 * Executes the pending {@code records} against the sink database and publishes the outcome to
 * {@code context}.
 *
 * <p>When {@code sinkConfig.isGTIDMode()} is true, all records are applied in one transaction after
 * {@code gtid_next} has been set to the source GTID. Otherwise the records are applied one at a time,
 * or in chunks of {@code batchSize} when both {@code useBatch} and {@code canBatch} are true.
 *
 * <p>An attempt that fails with {@link DeadlockLoserDataAccessException} is retried; the third such
 * failure in a row aborts the call. Any other failure aborts immediately.
 *
 * @return always {@code null}; failures are reported by throwing
 * @throws RuntimeException if an attempt fails with a non-retryable error, if the retry limit is
 *                          reached, or if the thread is interrupted while waiting to retry
 */
private Exception doCall() {
    if (sinkConfig.isGTIDMode()) {
        executeInGtidMode();
    } else {
        executeInNormalMode();
    }
    context.getFailedRecords().addAll(allFailedRecords);
    context.getProcessedRecords().addAll(allProcessedRecords);
    return null;
}

/**
 * Applies every pending record in a single transaction tagged with the source GTID, retrying the
 * whole transaction when a deadlock is reported.
 */
private void executeInGtidMode() {
    final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
    int retryCount = 0;
    // Loop so that a RETRY outcome really re-runs the transaction instead of falling through
    // and returning as if the batch had been applied.
    while (true) {
        RuntimeException error = null;
        ExecuteResult exeResult = null;
        try {
            toExecuteRecords.clear();
            if (!CollectionUtils.isEmpty(failedRecords)) {
                // if failedRecords not empty, make it retry
                toExecuteRecords.addAll(failedRecords);
            } else {
                toExecuteRecords.addAll(records);
                // add to failed record first, maybe get lob or datasource error
                failedRecords.addAll(toExecuteRecords);
            }
            final JdbcTemplate template = dbDialect.getJdbcTemplate();
            final String sourceGtid = context.getGtid();
            if (!StringUtils.isNotEmpty(sourceGtid)) {
                log.error("gtid is empty in gtid mode");
                throw new RuntimeException("gtid is empty in gtid mode");
            }
            if (sinkConfig.isMariaDB()) {
                throw new RuntimeException("unsupport gtid mode for mariaDB");
            }
            // NOTE(review): this statement and the transaction below are issued through separate
            // template calls - confirm they are guaranteed to run on the same connection/session.
            template.execute("SET @@session.gtid_next = '" + sourceGtid + "';");

            final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
            dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
                try {
                    failedRecords.clear();
                    processedRecords.clear();
                    int totalAffected = 0;
                    for (CanalConnectRecord connectRecord : toExecuteRecords) {
                        int affected = template.update(connectRecord.getSql(), new PreparedStatementSetter() {
                            public void setValues(PreparedStatement ps) throws SQLException {
                                doPreparedStatement(ps, dbDialect, lobCreator, connectRecord);
                            }
                        });
                        totalAffected = totalAffected + affected;
                        processStat(connectRecord, affected, false);
                    }
                    return totalAffected;
                } catch (Exception e) {
                    // rollback
                    status.setRollbackOnly();
                    throw new RuntimeException("Failed to executed", e);
                } finally {
                    lobCreator.close();
                }
            });

            // reset gtid; MariaDB cannot reach this point because it is rejected above
            // NOTE(review): gtid_next is only reset after a successful transaction, as before.
            template.execute("SET @@session.gtid_next = 'AUTOMATIC';");
            exeResult = ExecuteResult.SUCCESS;
        } catch (DeadlockLoserDataAccessException ex) {
            error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
            exeResult = ExecuteResult.RETRY;
        } catch (Throwable ex) {
            error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
            exeResult = ExecuteResult.ERROR;
        }

        if (ExecuteResult.SUCCESS == exeResult) {
            collectAttemptResult();
            return;
        }
        restorePendingRecords(toExecuteRecords);
        if (ExecuteResult.RETRY == exeResult) {
            retryCount = retryCount + 1;
            backoffOrGiveUp(retryCount, toExecuteRecords.size(), error);
        } else {
            processFailedDatas(toExecuteRecords.size());
            throw error;
        }
    }
}

/**
 * Applies the pending records one by one, or in chunks of {@code batchSize} when batching is
 * enabled, each unit in its own transaction and with its own deadlock retry budget.
 */
private void executeInNormalMode() {
    int index = 0;
    while (index < records.size()) {
        final List<CanalConnectRecord> toExecuteRecords = new ArrayList<>();
        if (useBatch && canBatch) {
            final int end = Math.min(index + batchSize, records.size());
            toExecuteRecords.addAll(records.subList(index, end));
            index = end;
        } else {
            toExecuteRecords.add(records.get(index));
            index = index + 1;
        }

        int retryCount = 0;
        while (true) {
            RuntimeException error = null;
            ExecuteResult exeResult = null;
            try {
                if (!CollectionUtils.isEmpty(failedRecords)) {
                    // records left over from a failed attempt take priority over the new slice
                    toExecuteRecords.clear();
                    toExecuteRecords.addAll(failedRecords);
                } else {
                    // add to failed record first, maybe get lob or datasource error
                    failedRecords.addAll(toExecuteRecords);
                }

                final JdbcTemplate template = dbDialect.getJdbcTemplate();
                if (useBatch && canBatch) {
                    // the first record's SQL is used for the whole batch
                    final String sql = toExecuteRecords.get(0).getSql();
                    // created right before the transaction whose finally block closes it
                    final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
                    final int[] affects = (int[]) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
                        try {
                            failedRecords.clear();
                            processedRecords.clear();
                            return template.batchUpdate(sql, new BatchPreparedStatementSetter() {
                                public void setValues(PreparedStatement ps, int idx) throws SQLException {
                                    doPreparedStatement(ps, dbDialect, lobCreator, toExecuteRecords.get(idx));
                                }

                                public int getBatchSize() {
                                    return toExecuteRecords.size();
                                }
                            });
                        } catch (Exception e) {
                            // rollback
                            status.setRollbackOnly();
                            throw new RuntimeException("Failed to execute batch ", e);
                        } finally {
                            lobCreator.close();
                        }
                    });
                    for (int i = 0; i < toExecuteRecords.size(); i++) {
                        assert affects != null;
                        processStat(toExecuteRecords.get(i), affects[i], true);
                    }
                } else {
                    final CanalConnectRecord connectRecord = toExecuteRecords.get(0);
                    final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
                    final int affected = (Integer) dbDialect.getTransactionTemplate().execute((TransactionCallback) status -> {
                        try {
                            failedRecords.clear();
                            processedRecords.clear();
                            return template.update(connectRecord.getSql(), new PreparedStatementSetter() {
                                public void setValues(PreparedStatement ps) throws SQLException {
                                    doPreparedStatement(ps, dbDialect, lobCreator, connectRecord);
                                }
                            });
                        } catch (Exception e) {
                            // rollback
                            status.setRollbackOnly();
                            throw new RuntimeException("Failed to executed", e);
                        } finally {
                            lobCreator.close();
                        }
                    });
                    processStat(connectRecord, affected, false);
                }
                exeResult = ExecuteResult.SUCCESS;
            } catch (DeadlockLoserDataAccessException ex) {
                error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                exeResult = ExecuteResult.RETRY;
            } catch (Throwable ex) {
                error = new RuntimeException(ExceptionUtils.getFullStackTrace(ex), ex);
                exeResult = ExecuteResult.ERROR;
            }

            if (ExecuteResult.SUCCESS == exeResult) {
                collectAttemptResult();
                break; // do next eventData
            }
            restorePendingRecords(toExecuteRecords);
            if (ExecuteResult.RETRY == exeResult) {
                retryCount = retryCount + 1;
                backoffOrGiveUp(retryCount, index, error);
            } else {
                processFailedDatas(index);
                throw error;
            }
        }
    }
}

/** Moves the per-attempt failed/processed records into the accumulated lists and clears the per-attempt lists. */
private void collectAttemptResult() {
    allFailedRecords.addAll(failedRecords);
    allProcessedRecords.addAll(processedRecords);
    failedRecords.clear();
    processedRecords.clear();
}

/** Discards the per-attempt bookkeeping and marks every attempted record as failed again. */
private void restorePendingRecords(List<CanalConnectRecord> attempted) {
    processedRecords.clear();
    failedRecords.clear();
    failedRecords.addAll(attempted);
}

/**
 * Waits before the next retry, or aborts once the retry limit has been reached.
 *
 * @param retryCount  number of retryable failures seen so far for the current unit of work
 * @param failedCount value handed to {@code processFailedDatas} when giving up
 * @param error       the failure that triggered this retry, used as cause when giving up
 * @throws RuntimeException when the retry limit is reached or the wait is interrupted
 */
private void backoffOrGiveUp(int retryCount, int failedCount, RuntimeException error) {
    final int retry = 3;
    if (retryCount >= retry) {
        processFailedDatas(failedCount);
        throw new RuntimeException(String.format("execute retry %s times failed", retryCount), error);
    }
    try {
        final int retryWait = 3000;
        // wait grows linearly with the attempt number, never below one base interval
        Thread.sleep(Math.max(retryCount * retryWait, retryWait));
    } catch (InterruptedException ex) {
        // restore the interrupt flag so code further up the stack can observe the interruption
        Thread.currentThread().interrupt();
        processFailedDatas(failedCount);
        throw new RuntimeException(ex);
    }
}