in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/offsetfile/FileOffsetStorage.java [451:574]
private boolean checkAndRecoverStgFiles() {
String fileContent;
GroupOffsetStgInfo tmpGroupStgInfo;
File midFile = new File(this.offsetsFileBase + OFFSET_FILENAME_SUFFIX_MID);
File dstFile = new File(this.offsetsFileBase + OFFSET_FILENAME_SUFFIX_FORMAL);
File oldFile = new File(this.offsetsFileBase + OFFSET_FILENAME_SUFFIX_OLD);
// delete tmp file
FileUtils.deleteQuietly(new File(this.offsetsFileBase + OFFSET_FILENAME_SUFFIX_TMP));
try {
if (midFile.exists()) {
fileContent = getConfigFromFile(midFile);
if (fileContent == null) {
logger.error("[File offsets] load mid file {} failure!", midFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo = getOffsetStgInfoFromConfig(fileContent);
if (tmpGroupStgInfo == null) {
logger.error("[File offsets] parse loaded mid file {} failure!", midFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo.clear();
if (dstFile.exists()) {
if (!dstFile.delete()) {
logger.error("[File offsets] delete residual file1 {} failed!", dstFile.getAbsoluteFile());
return false;
}
}
if (oldFile.exists()) {
if (!oldFile.delete()) {
logger.error("[File offsets] delete residual file2 {} failed!", oldFile.getAbsoluteFile());
return false;
}
}
if (!storeConfigToFile(fileContent, oldFile)) {
logger.error("[File offsets] backup content to file {} failed!", oldFile.getAbsoluteFile());
return false;
}
if (!oldFile.exists()) {
logger.error("[File offsets] backup file {} not found!", oldFile.getAbsoluteFile());
return false;
}
FileUtils.moveFile(midFile, dstFile);
if (!dstFile.exists()) {
logger.error("[File offsets] moved formal file {} not found!", dstFile.getAbsoluteFile());
return false;
}
if (!oldFile.delete()) {
logger.error("[File offsets] delete backup file {} failure!", oldFile.getAbsoluteFile());
return false;
}
return true;
}
if (dstFile.exists()) {
fileContent = getConfigFromFile(dstFile);
if (fileContent == null) {
logger.error("[File offsets] load formal file {} failure!", dstFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo = getOffsetStgInfoFromConfig(fileContent);
if (tmpGroupStgInfo == null) {
logger.error("[File offsets] parse loaded formal file {} failure!", dstFile.getAbsoluteFile());
return false;
}
if (oldFile.exists()) {
if (!oldFile.delete()) {
logger.error("[File offsets] delete residual file {} failed!", oldFile.getAbsoluteFile());
return false;
}
}
return true;
}
if (oldFile.exists()) {
fileContent = getConfigFromFile(oldFile);
if (fileContent == null) {
logger.error("[File offsets] load backup file {} failure!",
oldFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo = getOffsetStgInfoFromConfig(fileContent);
if (tmpGroupStgInfo == null) {
logger.error("[File offsets] parse loaded backup content {} failure!",
oldFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo.clear();
if (!storeConfigToFile(fileContent, dstFile)) {
logger.error("[File offsets] recover formal file {} failed!",
dstFile.getAbsoluteFile());
return false;
}
if (!dstFile.exists()) {
logger.error("[File offsets] recovered formal file {} not found!",
dstFile.getAbsoluteFile());
return false;
}
fileContent = getConfigFromFile(dstFile);
if (fileContent == null) {
logger.error("[File offsets] load recovered file {} failure!",
dstFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo = getOffsetStgInfoFromConfig(fileContent);
if (tmpGroupStgInfo == null) {
logger.error("[File offsets] parse recovered file content {} failure!",
dstFile.getAbsoluteFile());
return false;
}
tmpGroupStgInfo.clear();
if (!oldFile.delete()) {
logger.error("[File offsets] delete backup file {} failed!",
oldFile.getAbsoluteFile());
return false;
}
}
return true;
} catch (Throwable ex) {
if (ex instanceof IOException || ex instanceof SecurityException) {
ServiceStatusHolder.addWriteIOErrCnt();
BrokerSrvStatsHolder.incDiskIOExcCnt();
}
logger.error("[File offsets] recover offset storage files failed!", ex);
return false;
}
}