private void init()

in fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java [351:449]


    private void init() throws IOException {
        long start = System.currentTimeMillis();

        try {
            Files.createDirectory(cacheDir.toPath());
            LOG.info("Created new file {} for RemoteIndexCache", cacheDir);
        } catch (FileAlreadyExistsException e) {
            LOG.info(
                    "RemoteIndexCache directory {} already exists. Re-using the same directory.",
                    cacheDir);
        } catch (Exception e) {
            LOG.error("Unable to create directory {} for RemoteIndexCache.", cacheDir, e);
            throw new FlussRuntimeException(e);
        }

        // Delete any .tmp files remained from the earlier run of the tablet server.
        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
            paths.forEach(
                    path -> {
                        if (path.endsWith(TMP_FILE_SUFFIX)) {
                            try {
                                if (Files.deleteIfExists(path)) {
                                    LOG.debug("Deleted file path {} on cache initialization", path);
                                }
                            } catch (IOException e) {
                                throw new FlussRuntimeException(e);
                            }
                        }
                    });
        }

        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
            Iterator<Path> iterator = paths.iterator();
            while (iterator.hasNext()) {
                Path path = iterator.next();
                Path fileNamePath = path.getFileName();
                if (fileNamePath == null) {
                    throw new FlussRuntimeException(
                            "Empty file name in remote index cache directory: " + cacheDir);
                }

                String indexFileName = fileNamePath.toString();
                UUID remoteSegmentId = uuidFromRemoteIndexCacheFileName(indexFileName);

                // It is safe to update the internalCache non-atomically here since this function is
                // always called by a single thread only.
                if (!internalCache.asMap().containsKey(remoteSegmentId)) {
                    String fileNameWithoutSuffix =
                            indexFileName.substring(0, indexFileName.indexOf("."));
                    File offsetIndexFile =
                            new File(cacheDir, fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
                    File timestampIndexFile =
                            new File(cacheDir, fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);

                    // Create entries for each path if all the index files exist.
                    if (Files.exists(offsetIndexFile.toPath())
                            && Files.exists(timestampIndexFile.toPath())) {
                        long offset = offsetFromRemoteIndexCacheFileName(indexFileName);
                        try {
                            OffsetIndex offsetIndex =
                                    new OffsetIndex(
                                            offsetIndexFile, offset, Integer.MAX_VALUE, false);
                            offsetIndex.sanityCheck();

                            TimeIndex timeIndex =
                                    new TimeIndex(
                                            timestampIndexFile, offset, Integer.MAX_VALUE, false);
                            timeIndex.sanityCheck();

                            Entry entry = new Entry(offsetIndex, timeIndex);
                            internalCache.put(remoteSegmentId, entry);
                        } catch (CorruptIndexException e) {
                            LOG.debug(
                                    "Remote offset/time log index is corrupt, delete corrupt index.",
                                    e);
                            // let's delete offset index & time index
                            Files.deleteIfExists(offsetIndexFile.toPath());
                            Files.deleteIfExists(timestampIndexFile.toPath());
                        }
                    } else {
                        // Delete all of them if any one of those indexes is not available for a
                        // specific segment id.
                        tryAll(
                                Arrays.asList(
                                        () -> {
                                            Files.deleteIfExists(offsetIndexFile.toPath());
                                            return null;
                                        },
                                        () -> {
                                            Files.deleteIfExists(timestampIndexFile.toPath());
                                            return null;
                                        }));
                    }
                }
            }
        }

        LOG.info("RemoteIndexCache starts up in {} ms.", System.currentTimeMillis() - start);
    }