public static File parallelBackup()

in compress/src/main/java/jetbrains/exodus/util/CompressBackupUtil.java [231:448]


    public static File parallelBackup(@NotNull final Backupable source,
                                      @NotNull final File target) throws Exception {
        if (target.exists()) {
            throw new IOException("Backup file already exists:" + target.getAbsolutePath());
        }

        final BackupStrategy strategy = source.getBackupStrategy();
        try {
            strategy.beforeBackup();

            final XXHashFactory hashFactory = XXHashFactory.fastestInstance();
            final XXHash32 hash32 = hashFactory.hash32();
            final AtomicLong compressedFilePosition = new AtomicLong();

            try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.CREATE_NEW,
                    StandardOpenOption.WRITE)) {
                writeLZ4EmptyFrame(backupChannel, hash32);

                writeLZ4FrameHeader(backupChannel, hash32, compressedFilePosition);
            }


            final boolean zeroCompression = strategy.isEncrypted();
            if (zeroCompression && logger.isInfoEnabled()) {
                logger.info("Backup content is encrypted and will not be compressed.");
            }

            final int processors = Runtime.getRuntime().availableProcessors();
            final int threadLimit = Math.max(2, processors / 4);

            final long freeMemory = Runtime.getRuntime().freeMemory();
            final long memoryLimit = Math.max(freeMemory / 6, 64L * 1024 * 1024);
            final int compressorsLimit = (int) Math.min(threadLimit, memoryLimit / LZ4_MAX_BLOCK_SIZE);

            if (logger.isInfoEnabled()) {
                logger.info(String.format("%,d Mb of free heap memory was detected into the system, %,d Mb is allowed" +
                        " to be used for backup.", freeMemory / (1024 * 1024), memoryLimit / (1024 * 1024)));
                logger.info(String.format("%d processors were detected. %d is allowed to be used for backup.",
                        processors, threadLimit));
                logger.info(String.format("Amount of threads used for backup is set to %d.", compressorsLimit));
            }

            final ExecutorService streamMachinery =
                    Executors.newFixedThreadPool(compressorsLimit, r -> {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);

                        thread.setName("Parallel compressed backup thread");

                        return thread;
                    });

            final AtomicBoolean generatorStopped = new AtomicBoolean();
            final ConcurrentLinkedQueue<Pair<VirtualFileDescriptor, Long>> descriptors =
                    new ConcurrentLinkedQueue<>();
            final int queueCapacity = (int) Math.min(Integer.MAX_VALUE, Math.max(1024L, (freeMemory / 100) / 512));
            final Semaphore queueSemaphore = new Semaphore(queueCapacity);

            if (logger.isInfoEnabled()) {
                logger.info(String.format("Capacity of the backup queue is set to %d files", queueCapacity));
            }


            final ArrayList<Future<Void>> threads = new ArrayList<>();

            final ZipEncoding zipEncoding = ZipEncodingHelper.getZipEncoding(null);

            final LZ4Factory factory = LZ4Factory.safeInstance();
            final LZ4Compressor compressor = factory.fastCompressor();
            final int maxCompressedBlockSize = 2 * Integer.BYTES + compressor.maxCompressedLength(LZ4_MAX_BLOCK_SIZE);

            for (int i = 0; i < compressorsLimit; i++) {
                threads.add(streamMachinery.submit(() -> {
                    try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.WRITE)) {
                        final ByteBuffer buffer = ByteBuffer.allocate(LZ4_MAX_BLOCK_SIZE);

                        final int compressionBufferSize = 2 * maxCompressedBlockSize;
                        final ByteBuffer compressedBuffer =
                                ByteBuffer.allocateDirect(compressionBufferSize).order(ByteOrder.LITTLE_ENDIAN);

                        while (true) {
                            boolean genStopped = generatorStopped.get();
                            Pair<VirtualFileDescriptor, Long> pair = descriptors.poll();

                            if (pair != null) {
                                queueSemaphore.release(1);
                                VirtualFileDescriptor fd = pair.first;
                                long fileSize = pair.second;

                                if (fd.hasContent()) {
                                    try (final InputStream fileStream = fd.getInputStream()) {
                                        long bytesWritten = 0;
                                        int fileIndex = 0;

                                        while (bytesWritten < fileSize) {
                                            final int chunkSize = (int) Math.min(buffer.remaining() -
                                                            TarConstants.DEFAULT_RCDSIZE,
                                                    fileSize - bytesWritten);

                                            if (chunkSize > 0) {
                                                final String fullPath;

                                                final String fdName = fd.getName();
                                                final int extensionIndex = fdName.lastIndexOf('.');

                                                if (extensionIndex >= 0 && extensionIndex < fdName.length() - 1) {
                                                    fullPath = String.format("%s%s-%08X%s", fd.getPath(),
                                                            fdName.substring(0, extensionIndex), fileIndex, fdName.substring(extensionIndex));
                                                } else {
                                                    fullPath = String.format("%s%s-%08X", fd.getPath(), fdName, fileIndex);
                                                }


                                                writeTarFileHeader(buffer, fullPath, chunkSize, fd.getTimeStamp(), zipEncoding);

                                                int bytesRead = 0;
                                                final byte[] bufferArray = buffer.array();
                                                int bufferOffset = buffer.arrayOffset();
                                                int bufferPosition = buffer.position();

                                                while (bytesRead < chunkSize) {
                                                    int r = fileStream.read(bufferArray, bufferOffset + bufferPosition,
                                                            chunkSize - bytesRead);

                                                    if (r == -1) {
                                                        break;
                                                    }

                                                    bufferPosition += r;
                                                    bytesRead += r;
                                                }

                                                if (bytesRead > 0) {
                                                    bytesWritten += bytesRead;
                                                    buffer.position(bufferPosition);
                                                } else {
                                                    throw new ExodusException("Invalid file size");
                                                }

                                                tarPadding(buffer);
                                                fileIndex++;
                                            } else {
                                                writeLZ4Block(buffer, compressedBuffer, compressor, hash32,
                                                        zeroCompression);

                                                if (compressedBuffer.remaining() < maxCompressedBlockSize) {
                                                    writeChunk(compressedBuffer, compressedFilePosition, backupChannel);
                                                }
                                            }
                                        }
                                    }
                                }
                            } else if (genStopped) {
                                break;
                            }
                        }

                        tarPadding(buffer);

                        writeLZ4Block(buffer, compressedBuffer, compressor, hash32, zeroCompression);
                        writeChunk(compressedBuffer, compressedFilePosition, backupChannel);
                    }

                    return null;
                }));
            }


            for (final VirtualFileDescriptor fd : strategy.getContents()) {
                if (strategy.isInterrupted()) {
                    break;
                }

                if (fd.hasContent()) {
                    final long fileSize = Math.min(fd.getFileSize(), strategy.acceptFile(fd));
                    if (fileSize > 0) {
                        queueSemaphore.acquire();
                        descriptors.offer(new Pair<>(fd, fileSize));
                    }
                }
            }
            generatorStopped.set(true);

            try {
                for (Future<Void> thread : threads) {
                    thread.get();
                }
            } finally {
                streamMachinery.shutdown();
            }

            try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.WRITE)) {
                final ByteBuffer lastBlock = ByteBuffer.allocate(2 * TarConstants.DEFAULT_RCDSIZE);
                final ByteBuffer compressedBuffer = ByteBuffer.allocateDirect(2 * Integer.BYTES +
                        compressor.maxCompressedLength(lastBlock.remaining())).order(ByteOrder.LITTLE_ENDIAN);
                lastBlock.position(lastBlock.remaining());

                writeLZ4Block(lastBlock, compressedBuffer, compressor, hash32, false);
                writeChunk(compressedBuffer, compressedFilePosition, backupChannel);

                writeLZ4FrameEndMark(backupChannel);
            }

            if (strategy.isInterrupted()) {
                logger.info("Backup interrupted, deleting \"" + target.getName() + "\"...");
                IOUtil.deleteFile(target);
            } else {
                logger.info("Backup file \"" + target.getName() + "\" created.");
            }
        } catch (
                Throwable t) {
            strategy.onError(t);
            throw ExodusException.toExodusException(t, "Backup failed");
        } finally {
            strategy.afterBackup();
        }
        return target;
    }