in src/main/java/org/apache/doris/kafka/connector/writer/CopyIntoWriter.java [78:103]
public List<String> listLoadFiles() {
final String SQL_TEMPLATE =
"SHOW COPY FROM %s WHERE TABLENAME = '%s' AND STATE = 'FINISHED' AND FILES LIKE '%%%s%%' ORDER BY CREATETIME DESC LIMIT 100";
final String filePrefix =
FileNameUtils.filePrefix(dorisOptions.getName(), topic, partition);
String offsetQuery = String.format(SQL_TEMPLATE, dbName, tableName, filePrefix);
LOG.info("query offset by sql: {}", offsetQuery);
List<String> loadFileList = new ArrayList<>();
try (Connection connection = connectionProvider.getOrEstablishConnection();
PreparedStatement ps = connection.prepareStatement(offsetQuery);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String filesStr = rs.getString("Files");
String[] files = objectMapper.readValue(filesStr, String[].class);
loadFileList.addAll(Arrays.asList(files));
}
} catch (Exception ex) {
LOG.warn(
"Failed to get copy-into file name, causing the doris kafka connector to not guarantee exactly once.",
ex);
throw new CopyLoadException(
"Failed to get copy-into file name, causing the doris kafka connector to not guarantee exactly once.",
ex);
}
return loadFileList;
}