in fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java [351:449]
/**
 * Initializes the on-disk remote index cache directory and re-populates {@code internalCache}
 * from the index files already present in it.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Create {@code cacheDir}, or re-use it if it already exists.
 *   <li>Delete every file in {@code cacheDir} whose name ends with {@code TMP_FILE_SUFFIX}.
 *   <li>For each remaining file whose remote segment id is not yet cached: when both the
 *       offset index file and the time index file exist, open them, sanity-check them and
 *       register them in {@code internalCache}; a corrupt pair is deleted. When either file
 *       is missing, whatever is left of the pair is deleted.
 * </ol>
 *
 * @throws IOException if listing {@code cacheDir} or deleting an index file fails
 * @throws FlussRuntimeException if the cache directory cannot be created, a temporary file
 *     cannot be deleted, or a directory entry has no file name
 */
private void init() throws IOException {
    long start = System.currentTimeMillis();
    try {
        Files.createDirectory(cacheDir.toPath());
        LOG.info("Created new file {} for RemoteIndexCache", cacheDir);
    } catch (FileAlreadyExistsException e) {
        LOG.info(
                "RemoteIndexCache directory {} already exists. Re-using the same directory.",
                cacheDir);
    } catch (Exception e) {
        LOG.error("Unable to create directory {} for RemoteIndexCache.", cacheDir, e);
        throw new FlussRuntimeException(e);
    }

    // Delete any .tmp files remained from the earlier run of the tablet server.
    try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
        paths.forEach(
                path -> {
                    // Path#endsWith(String) matches whole path name elements, not a textual
                    // suffix, so the comparison has to be done on the string form.
                    if (path.toString().endsWith(TMP_FILE_SUFFIX)) {
                        try {
                            if (Files.deleteIfExists(path)) {
                                LOG.debug("Deleted file path {} on cache initialization", path);
                            }
                        } catch (IOException e) {
                            // The lambda cannot throw a checked exception, so wrap it.
                            throw new FlussRuntimeException(e);
                        }
                    }
                });
    }

    // Second pass: rebuild cache entries from the index files that survived the cleanup.
    try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
        Iterator<Path> iterator = paths.iterator();
        while (iterator.hasNext()) {
            Path path = iterator.next();
            Path fileNamePath = path.getFileName();
            if (fileNamePath == null) {
                throw new FlussRuntimeException(
                        "Empty file name in remote index cache directory: " + cacheDir);
            }
            String indexFileName = fileNamePath.toString();
            UUID remoteSegmentId = uuidFromRemoteIndexCacheFileName(indexFileName);
            // It is safe to update the internalCache non-atomically here since this function is
            // always called by a single thread only.
            if (!internalCache.asMap().containsKey(remoteSegmentId)) {
                // Both index files of a segment share the part of the name before the first '.'.
                // NOTE(review): assumes every file name here contains a '.'; otherwise
                // substring(0, -1) throws. Confirm uuidFromRemoteIndexCacheFileName rejects
                // such names first.
                String fileNameWithoutSuffix =
                        indexFileName.substring(0, indexFileName.indexOf("."));
                File offsetIndexFile =
                        new File(cacheDir, fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
                File timestampIndexFile =
                        new File(cacheDir, fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
                // Create entries for each path if all the index files exist.
                if (Files.exists(offsetIndexFile.toPath())
                        && Files.exists(timestampIndexFile.toPath())) {
                    long offset = offsetFromRemoteIndexCacheFileName(indexFileName);
                    try {
                        OffsetIndex offsetIndex =
                                new OffsetIndex(
                                        offsetIndexFile, offset, Integer.MAX_VALUE, false);
                        offsetIndex.sanityCheck();
                        TimeIndex timeIndex =
                                new TimeIndex(
                                        timestampIndexFile, offset, Integer.MAX_VALUE, false);
                        timeIndex.sanityCheck();
                        Entry entry = new Entry(offsetIndex, timeIndex);
                        internalCache.put(remoteSegmentId, entry);
                    } catch (CorruptIndexException e) {
                        LOG.debug(
                                "Remote offset/time log index is corrupt, delete corrupt index.",
                                e);
                        // let's delete offset index & time index
                        Files.deleteIfExists(offsetIndexFile.toPath());
                        Files.deleteIfExists(timestampIndexFile.toPath());
                    }
                } else {
                    // Delete all of them if any one of those indexes is not available for a
                    // specific segment id.
                    // NOTE(review): presumably tryAll runs every action even if an earlier one
                    // fails; confirm against its implementation.
                    tryAll(
                            Arrays.asList(
                                    () -> {
                                        Files.deleteIfExists(offsetIndexFile.toPath());
                                        return null;
                                    },
                                    () -> {
                                        Files.deleteIfExists(timestampIndexFile.toPath());
                                        return null;
                                    }));
                }
            }
        }
    }
    LOG.info("RemoteIndexCache starts up in {} ms.", System.currentTimeMillis() - start);
}