in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java [626:736]
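/**
 * Attempt to flush all buffered records, table by table. Each table's batch is executed as a
 * whole first; if that fails, every record is retried individually up to maxRetries times,
 * reporting unrecoverable records as dirty data and optionally stopping writes to the table.
 */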
protected void attemptFlush() throws IOException {
for (Map.Entry<String, List<GenericRowData>> entry : recordsMap.entrySet()) {
String tableIdentifier = entry.getKey();
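// Skip tables that already failed when stop-on-exception is configured.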
boolean stopTableIdentifierWhenException = stopWritingWhenTableException
&& (null != tableExceptionMap.get(tableIdentifier));
if (stopTableIdentifierWhenException) {
continue;
}
List<GenericRowData> tableIdRecordList = entry.getValue();
if (CollectionUtils.isEmpty(tableIdRecordList)) {
continue;
}
JdbcExec jdbcStatementExecutor;
boolean flushFlag = false;
Exception tableException = null;
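// First attempt: flush the whole buffered batch for this table in one execution.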
try {
getAndSetPkNamesFromDb(tableIdentifier);
jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
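// Accumulate the payload size in bytes for metrics while adding each record to the batch.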
long totalDataSize = 0L;
for (GenericRowData record : tableIdRecordList) {
totalDataSize += record.toString().getBytes(StandardCharsets.UTF_8).length;
jdbcStatementExecutor.addToBatch((JdbcIn) record);
}
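// When a dirty sink is configured, populate dirty-data context for this batch before executing.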
if (dirtySinkHelper.getDirtySink() != null) {
fillDirtyData(jdbcStatementExecutor, tableIdentifier);
}
jdbcStatementExecutor.executeBatch();
flushFlag = true;
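// Report success metrics; if the dirty-sink metric path fails, fall back to the locally computed counts.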
if (dirtySinkHelper.getDirtySink() == null) {
outputMetrics(tableIdentifier, (long) tableIdRecordList.size(),
totalDataSize, false);
} else {
try {
outputMetrics(tableIdentifier);
} catch (Exception e) {
LOG.error("JDBC table metric calculation exception", e);
outputMetrics(tableIdentifier, (long) tableIdRecordList.size(),
totalDataSize, false);
}
}
} catch (Exception e) {
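// Whole-batch flush failed: remember the exception, rebuild the executor,
// then fall back to per-record retries below.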
tableException = e;
LOG.warn("Flush all data for tableIdentifier:{} get err:", tableIdentifier, e);
getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
updateOneExecutor(true, tableIdentifier);
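// Short back-off so transient failures can clear before the per-record fallback.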
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException(
"unable to flush; interrupted while doing another attempt", e);
}
}
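// Fallback path: retry each record individually, up to maxRetries attempts per record.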
if (!flushFlag) {
for (GenericRowData record : tableIdRecordList) {
// Reset per record so an earlier record's success cannot mask a later record's failure.
flushFlag = false;
for (int retryTimes = 1; retryTimes <= executionOptions.getMaxRetries(); retryTimes++) {
try {
jdbcStatementExecutor = getOrCreateStatementExecutor(tableIdentifier);
jdbcStatementExecutor.addToBatch((JdbcIn) record);
jdbcStatementExecutor.executeBatch();
long totalDataSize = record.toString().getBytes(StandardCharsets.UTF_8).length;
if (dirtySinkHelper.getDirtySink() == null) {
// A single record was flushed, so report one row rather than the whole batch size.
outputMetrics(tableIdentifier, 1L, totalDataSize, false);
} else {
try {
outputMetrics(tableIdentifier);
} catch (Exception e) {
LOG.error("JDBC table metric calculation exception", e);
outputMetrics(tableIdentifier, 1L, totalDataSize, false);
}
}
flushFlag = true;
break;
} catch (Exception e) {
LOG.warn("Flush one record tableIdentifier:{} ,retryTimes:{} get err:",
tableIdentifier, retryTimes, e);
getAndSetPkFromErrMsg(e.getMessage(), tableIdentifier);
tableException = e;
updateOneExecutor(true, tableIdentifier);
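// Back off linearly with the retry count before the next attempt.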
try {
Thread.sleep(1000 * retryTimes);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException(
"unable to flush; interrupted while doing another attempt", e);
}
}
}
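// All retries exhausted for this record: account for the failure and optionally stop the table.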
if (!flushFlag && null != tableException) {
LOG.info("Put tableIdentifier:{} exception:{}",
tableIdentifier, tableException.getMessage());
if (dirtySinkHelper.getDirtySink() == null &&
!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
// Count the single failed record as dirty.
outputMetrics(tableIdentifier, 1L, 1L, true);
}
tableExceptionMap.put(tableIdentifier, tableException);
if (stopWritingWhenTableException) {
LOG.info("Stop write table:{} because occur exception",
tableIdentifier);
break;
}
}
}
}
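// Buffered records are dropped after each flush attempt; failures were already accounted for above.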
tableIdRecordList.clear();
}
}