in compress/src/main/java/jetbrains/exodus/util/CompressBackupUtil.java [231:448]
/**
 * Backs up the content of {@code source} into the single file {@code target} using several
 * worker threads.
 * <p>
 * The calling thread acts as a producer: it iterates over the files reported by the backup
 * strategy of {@code source} and puts them into a bounded queue. Worker threads take files from
 * that queue, split them into chunks, wrap every chunk into a tar entry (via
 * {@code writeTarFileHeader}/{@code tarPadding}) and hand the filled buffers to
 * {@code writeLZ4Block}/{@code writeChunk}, sharing one file position counter.
 * Every chunk is stored under the name {@code <path><name>-<8 hex digit chunk index><extension>}.
 * <p>
 * If the strategy reports that the backup was interrupted, the target file is deleted.
 *
 * @param source object to back up
 * @param target file to create; it must not exist yet
 * @return {@code target}
 * @throws IOException if {@code target} already exists
 * @throws Exception   any failure during the backup is reported to
 *                     {@code BackupStrategy.onError} and rethrown as the result of
 *                     {@code ExodusException.toExodusException(t, "Backup failed")}
 */
public static File parallelBackup(@NotNull final Backupable source,
                                  @NotNull final File target) throws Exception {
    if (target.exists()) {
        throw new IOException("Backup file already exists:" + target.getAbsolutePath());
    }
    final BackupStrategy strategy = source.getBackupStrategy();
    try {
        strategy.beforeBackup();
        final XXHashFactory hashFactory = XXHashFactory.fastestInstance();
        final XXHash32 hash32 = hashFactory.hash32();
        // Position in the target file shared by everything that appends to it.
        final AtomicLong compressedFilePosition = new AtomicLong();
        // CREATE_NEW makes the creation fail if the file appeared after the exists() check above.
        try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.CREATE_NEW,
                StandardOpenOption.WRITE)) {
            writeLZ4EmptyFrame(backupChannel, hash32);
            writeLZ4FrameHeader(backupChannel, hash32, compressedFilePosition);
        }
        final boolean zeroCompression = strategy.isEncrypted();
        if (zeroCompression && logger.isInfoEnabled()) {
            logger.info("Backup content is encrypted and will not be compressed.");
        }
        // Limits: at least 2 threads (a quarter of the processors), at least 64 Mb (a sixth of free heap).
        final int processors = Runtime.getRuntime().availableProcessors();
        final int threadLimit = Math.max(2, processors / 4);
        final long freeMemory = Runtime.getRuntime().freeMemory();
        final long memoryLimit = Math.max(freeMemory / 6, 64L * 1024 * 1024);
        final int compressorsLimit = (int) Math.min(threadLimit, memoryLimit / LZ4_MAX_BLOCK_SIZE);
        if (logger.isInfoEnabled()) {
            logger.info(String.format("%,d Mb of free heap memory was detected into the system, %,d Mb is allowed" +
                    " to be used for backup.", freeMemory / (1024 * 1024), memoryLimit / (1024 * 1024)));
            logger.info(String.format("%d processors were detected. %d is allowed to be used for backup.",
                    processors, threadLimit));
            logger.info(String.format("Amount of threads used for backup is set to %d.", compressorsLimit));
        }
        // Set once the producer will not enqueue anything else; workers exit when the queue is drained.
        final AtomicBoolean generatorStopped = new AtomicBoolean();
        final ConcurrentLinkedQueue<Pair<VirtualFileDescriptor, Long>> descriptors =
                new ConcurrentLinkedQueue<>();
        // The queue itself is unbounded, so the semaphore is what limits the number of queued files.
        final int queueCapacity = (int) Math.min(Integer.MAX_VALUE, Math.max(1024L, (freeMemory / 100) / 512));
        final Semaphore queueSemaphore = new Semaphore(queueCapacity);
        if (logger.isInfoEnabled()) {
            logger.info(String.format("Capacity of the backup queue is set to %d files", queueCapacity));
        }
        final ArrayList<Future<Void>> threads = new ArrayList<>();
        final ZipEncoding zipEncoding = ZipEncodingHelper.getZipEncoding(null);
        final LZ4Factory factory = LZ4Factory.safeInstance();
        final LZ4Compressor compressor = factory.fastCompressor();
        final int maxCompressedBlockSize = 2 * Integer.BYTES + compressor.maxCompressedLength(LZ4_MAX_BLOCK_SIZE);
        final ExecutorService streamMachinery =
                Executors.newFixedThreadPool(compressorsLimit, r -> {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("Parallel compressed backup thread");
                    return thread;
                });
        // The pool is shut down on every exit path, not only after all workers were joined.
        try {
            for (int i = 0; i < compressorsLimit; i++) {
                threads.add(streamMachinery.submit(() -> {
                    // Every worker writes through its own channel to the same target file.
                    try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.WRITE)) {
                        final ByteBuffer buffer = ByteBuffer.allocate(LZ4_MAX_BLOCK_SIZE);
                        final int compressionBufferSize = 2 * maxCompressedBlockSize;
                        final ByteBuffer compressedBuffer =
                                ByteBuffer.allocateDirect(compressionBufferSize).order(ByteOrder.LITTLE_ENDIAN);
                        while (true) {
                            // The flag is read BEFORE polling: if it was already set, nothing can be
                            // enqueued afterwards, so an empty poll really means "no more work".
                            boolean genStopped = generatorStopped.get();
                            Pair<VirtualFileDescriptor, Long> pair = descriptors.poll();
                            if (pair != null) {
                                // Give the queue slot back to the producer.
                                queueSemaphore.release(1);
                                VirtualFileDescriptor fd = pair.first;
                                long fileSize = pair.second;
                                if (fd.hasContent()) {
                                    try (final InputStream fileStream = fd.getInputStream()) {
                                        long bytesWritten = 0;
                                        int fileIndex = 0;
                                        while (bytesWritten < fileSize) {
                                            // One record is subtracted from the free space before sizing the
                                            // chunk; presumably it is reserved for the tar header - confirm
                                            // against writeTarFileHeader.
                                            final int chunkSize = (int) Math.min(buffer.remaining() -
                                                            TarConstants.DEFAULT_RCDSIZE,
                                                    fileSize - bytesWritten);
                                            if (chunkSize > 0) {
                                                // Entry name: chunk index in hex is inserted before the extension.
                                                final String fullPath;
                                                final String fdName = fd.getName();
                                                final int extensionIndex = fdName.lastIndexOf('.');
                                                if (extensionIndex >= 0 && extensionIndex < fdName.length() - 1) {
                                                    fullPath = String.format("%s%s-%08X%s", fd.getPath(),
                                                            fdName.substring(0, extensionIndex), fileIndex, fdName.substring(extensionIndex));
                                                } else {
                                                    fullPath = String.format("%s%s-%08X", fd.getPath(), fdName, fileIndex);
                                                }
                                                writeTarFileHeader(buffer, fullPath, chunkSize, fd.getTimeStamp(), zipEncoding);
                                                // Read the chunk straight into the backing array of the buffer.
                                                int bytesRead = 0;
                                                final byte[] bufferArray = buffer.array();
                                                int bufferOffset = buffer.arrayOffset();
                                                int bufferPosition = buffer.position();
                                                while (bytesRead < chunkSize) {
                                                    int r = fileStream.read(bufferArray, bufferOffset + bufferPosition,
                                                            chunkSize - bytesRead);
                                                    if (r == -1) {
                                                        break;
                                                    }
                                                    bufferPosition += r;
                                                    bytesRead += r;
                                                }
                                                if (bytesRead > 0) {
                                                    bytesWritten += bytesRead;
                                                    buffer.position(bufferPosition);
                                                } else {
                                                    // The stream ended before the announced file size was reached.
                                                    throw new ExodusException("Invalid file size");
                                                }
                                                tarPadding(buffer);
                                                fileIndex++;
                                            } else {
                                                // No room for another chunk: hand the buffer over and write the
                                                // compressed data out once less than one more block fits.
                                                writeLZ4Block(buffer, compressedBuffer, compressor, hash32,
                                                        zeroCompression);
                                                if (compressedBuffer.remaining() < maxCompressedBlockSize) {
                                                    writeChunk(compressedBuffer, compressedFilePosition, backupChannel);
                                                }
                                            }
                                        }
                                    }
                                }
                            } else if (genStopped) {
                                break;
                            }
                        }
                        // Flush whatever is still buffered by this worker.
                        tarPadding(buffer);
                        writeLZ4Block(buffer, compressedBuffer, compressor, hash32, zeroCompression);
                        writeChunk(compressedBuffer, compressedFilePosition, backupChannel);
                    }
                    return null;
                }));
            }
            try {
                for (final VirtualFileDescriptor fd : strategy.getContents()) {
                    if (strategy.isInterrupted()) {
                        break;
                    }
                    if (fd.hasContent()) {
                        final long fileSize = Math.min(fd.getFileSize(), strategy.acceptFile(fd));
                        if (fileSize > 0) {
                            if (!acquireBackupQueueSlot(queueSemaphore, threads)) {
                                // A worker has already terminated, which before the stop flag is set
                                // can only mean that it failed; its error is rethrown by get() below.
                                break;
                            }
                            descriptors.offer(new Pair<>(fd, fileSize));
                        }
                    }
                }
            } finally {
                // Must be set even if enumeration of files failed, otherwise workers never leave their loop.
                generatorStopped.set(true);
            }
            for (Future<Void> thread : threads) {
                thread.get();
            }
        } finally {
            streamMachinery.shutdown();
        }
        try (final FileChannel backupChannel = FileChannel.open(target.toPath(), StandardOpenOption.WRITE)) {
            // Two zero-filled records are appended as the last block, then the frame end mark.
            final ByteBuffer lastBlock = ByteBuffer.allocate(2 * TarConstants.DEFAULT_RCDSIZE);
            final ByteBuffer compressedBuffer = ByteBuffer.allocateDirect(2 * Integer.BYTES +
                    compressor.maxCompressedLength(lastBlock.remaining())).order(ByteOrder.LITTLE_ENDIAN);
            lastBlock.position(lastBlock.remaining());
            writeLZ4Block(lastBlock, compressedBuffer, compressor, hash32, false);
            writeChunk(compressedBuffer, compressedFilePosition, backupChannel);
            writeLZ4FrameEndMark(backupChannel);
        }
        if (strategy.isInterrupted()) {
            logger.info("Backup interrupted, deleting \"" + target.getName() + "\"...");
            IOUtil.deleteFile(target);
        } else {
            logger.info("Backup file \"" + target.getName() + "\" created.");
        }
    } catch (Throwable t) {
        strategy.onError(t);
        throw ExodusException.toExodusException(t, "Backup failed");
    } finally {
        strategy.afterBackup();
    }
    return target;
}

/**
 * Waits for a free slot in the backup queue without blocking forever when no worker is able to
 * free one.
 *
 * @param queueSemaphore semaphore guarding the queue capacity
 * @param workers        futures of the worker tasks that release the permits
 * @return {@code true} if a permit was acquired, {@code false} if a worker task has already
 * completed while the queue stayed full
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private static boolean acquireBackupQueueSlot(final Semaphore queueSemaphore,
                                              final Iterable<Future<Void>> workers)
        throws InterruptedException {
    // Wake up periodically to check whether anybody is still alive to release permits.
    while (!queueSemaphore.tryAcquire(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
        for (final Future<Void> worker : workers) {
            if (worker.isDone()) {
                return false;
            }
        }
    }
    return true;
}