sources/java-incremental-compilation/intellij-build-zip/src/ZipArchiveOutputStream.kt (412 lines of code) (raw):
// Copyright 2000-2025 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package org.jetbrains.intellij.build.io
import io.netty.buffer.ByteBuf
import org.jetbrains.intellij.build.io.ZipArchiveOutputStream.Companion.FLUSH_THRESHOLD
import org.jetbrains.intellij.build.io.ZipArchiveOutputStream.Companion.INITIAL_BUFFER_CAPACITY
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.util.zip.CRC32
import java.util.zip.ZipEntry
/**
 * Creates a [ZipArchiveOutputStream] that writes to [targetFile].
 *
 * @param targetFile file the archive is written to (opened via `fileDataWriter` with `isTemp = false`)
 * @param packageIndexBuilder optional builder handed to the [ZipIndexWriter] of the new stream
 * @param overwrite forwarded as is to `fileDataWriter`
 */
fun zipWriter(
  targetFile: Path,
  packageIndexBuilder: PackageIndexBuilder?,
  overwrite: Boolean = false,
): ZipArchiveOutputStream {
  // the data writer is created first, the index writer second - same order as the argument list of the constructor call
  val fileWriter = fileDataWriter(file = targetFile, overwrite = overwrite, isTemp = false)
  val indexWriter = ZipIndexWriter(packageIndexBuilder)
  return ZipArchiveOutputStream(dataWriter = fileWriter, zipIndexWriter = indexWriter)
}
class ZipArchiveOutputStream(
private val dataWriter: DataWriter,
private val zipIndexWriter: ZipIndexWriter,
) : AutoCloseable {
companion object {
  // shared payload for zero-length entries
  private val emptyByteArray = ByteArray(0)

  /**
   * Number of buffered bytes (1MB) above which the staging buffer is written out by a default flush check.
   *
   * Visible for tests.
   */
  const val FLUSH_THRESHOLD: Int = 1024 * 1024

  /**
   * Initial capacity (128KB) of the staging buffer.
   *
   * It is deliberately separate from [FLUSH_THRESHOLD]:
   * [INITIAL_BUFFER_CAPACITY] bounds the memory allocated upfront, which keeps small archives cheap,
   * while [FLUSH_THRESHOLD] decides when accumulated data is written out, so that larger chunks can be
   * gathered before a write without allocating a large buffer for every archive.
   */
  const val INITIAL_BUFFER_CAPACITY: Int = 128 * 1024

  init {
    // https://github.com/netty/netty/issues/11532
    // an explicitly configured value is respected; only a missing one is defaulted to "true"
    val reflectionAccessKey = "io.netty.tryReflectionSetAccessible"
    if (System.getProperty(reflectionAccessKey) == null) {
      System.setProperty(reflectionAccessKey, "true")
    }
  }
}
// Guards close() against running twice: close() returns immediately when this is true.
private var finished = false
// Staging buffer (direct, initial capacity INITIAL_BUFFER_CAPACITY): data is accumulated here and handed to dataWriter by writeBuffer(); released in close().
private val buffer = byteBufferAllocator.directBuffer(INITIAL_BUFFER_CAPACITY)
// Offset in dataWriter's output at which the next flushed bytes are written; bytes still sitting in `buffer` are not included.
private var channelPosition = 0L
/**
 * Writes entries for the given directory [names].
 *
 * When [dataWriter] supports NIO buffers, everything pending is flushed first and [channelPosition] is set to
 * the position returned by `writeDirEntriesUsingNioBuffer`. Otherwise the entries go through the staging buffer.
 */
@Suppress("DuplicatedCode")
@Synchronized
internal fun addDirEntries(names: Collection<String>) {
  if (!dataWriter.isNioBufferSupported) {
    val entriesOffset = flushBufferIfNeeded()
    writeDirEntries(names, entriesOffset, zipIndexWriter, buffer)
    return
  }

  // threshold 0: the staging buffer must be empty before writing through dataWriter directly
  val entriesOffset = flushBufferIfNeeded(0)
  channelPosition = writeDirEntriesUsingNioBuffer(names, entriesOffset, dataWriter, zipIndexWriter)
}
/**
 * Array flavour of [addDirEntries]; same two code paths as the collection overload.
 *
 * Not annotated with `@Synchronized` itself - presumably it is only reached from `close()`, which is synchronized.
 */
@Suppress("DuplicatedCode")
private fun addDirEntries(names: Array<String>) {
  if (!dataWriter.isNioBufferSupported) {
    val entriesOffset = flushBufferIfNeeded()
    writeDirEntries(names = names, channelPosition = entriesOffset, zipIndexWriter = zipIndexWriter, buffer = buffer)
    return
  }

  // threshold 0: the staging buffer must be empty before writing through dataWriter directly
  val entriesOffset = flushBufferIfNeeded(0)
  channelPosition = writeDirEntriesUsingNioBuffer(names, entriesOffset, dataWriter, zipIndexWriter)
}
/**
 * Writes a stored entry whose size becomes known only after [task] has produced the data.
 *
 * A gap of `30 + path.size` bytes is left in the staging buffer for the local file header, [task] appends the
 * payload after it, and the header is filled in afterwards, once the size (and optionally the CRC) is known.
 *
 * @param path entry name bytes
 * @param estimatedSize size hint used to pre-size the staging buffer (at least 1024 bytes are reserved for the data)
 * @param crc32 checksum calculator; if `null` (or if no data was written) `0` is recorded as the CRC
 * @param task appends the entry data to the buffer it is given
 */
@Synchronized
fun writeDataWithUnknownSize(path: ByteArray, estimatedSize: Int, crc32: CRC32?, task: (ByteBuf) -> Unit) {
  val entryOffset = flushBufferIfNeeded()
  val headerLength = 30 + path.size
  val headerStart = buffer.writerIndex()
  val dataStart = headerStart + headerLength

  // make room first, then skip over the header gap so that the task writes the payload right behind it
  buffer.ensureWritable(headerLength + estimatedSize.coerceAtLeast(1024))
  buffer.writerIndex(dataStart)
  task(buffer)

  val dataLength = buffer.writerIndex() - dataStart
  val checksum = if (crc32 != null && dataLength != 0) {
    crc32.compute(buffer.internalNioBuffer(dataStart, dataLength))
  }
  else {
    0
  }

  // rewind to fill in the header, then restore the writer index to the end of the payload
  buffer.writerIndex(headerStart)
  writeZipLocalFileHeader(path = path, size = dataLength, crc32 = checksum, buffer = buffer)
  buffer.writerIndex(dataStart + dataLength)

  zipIndexWriter.writeCentralFileHeader(
    path = path,
    size = dataLength,
    compressedSize = dataLength,
    method = ZipEntry.STORED,
    crc = checksum,
    headerOffset = entryOffset,
    dataOffset = entryOffset + headerLength,
  )
}
/**
 * Result returned by the `task` passed to [writeMaybeCompressed].
 *
 * [compressedSize] is the number of payload bytes the task wrote, or `-1` to signal that the entry must be recorded
 * as [ZipEntry.STORED] (its compressed size is then taken to be the uncompressed size).
 * [crc] is the checksum put into the local and central file headers.
 */
internal data class CompressedSizeAndCrc(@JvmField val compressedSize: Int, @JvmField val crc: Long)
/**
 * Writes one entry whose payload is pushed straight to [dataWriter] by [task], either deflated or stored.
 *
 * Room for the local file header (`30 + path.size` bytes) is reserved by advancing [channelPosition] before [task] runs.
 * [task] emits the payload through the consumer it receives; every call writes at the current [channelPosition] and advances it.
 * Once the final size and CRC are known, the header is written at the reserved offset and the entry is registered in [zipIndexWriter].
 *
 * @param path entry name bytes
 * @param dataSize uncompressed size of the entry
 * @param task produces the payload and reports its compressed size and CRC; a compressed size of `-1` means the entry is
 * recorded as [ZipEntry.STORED] with [dataSize] as its compressed size, anything else as [ZipEntry.DEFLATED]
 * @throws IllegalArgumentException if the number of bytes written after the header gap differs from the reported compressed size
 */
@Synchronized
internal fun writeMaybeCompressed(path: ByteArray, dataSize: Int, task: (resultConsumer: (ByteBuffer) -> Unit) -> CompressedSizeAndCrc) {
val headerSize = 30 + path.size
// threshold 0 flushes anything pending, so the staging buffer is empty and the returned offset equals channelPosition
val localFileHeaderOffset = flushBufferIfNeeded(0)
// leave a gap for the header; it is filled in below, after the payload has been written
channelPosition += headerSize
// `::writeBuffer` is the ByteBuffer overload: it writes directly via dataWriter and asserts that the staging buffer is empty
var (compressedSize, crc) = task(::writeBuffer)
val method = if (compressedSize == -1) {
compressedSize = dataSize
ZipEntry.STORED
}
else {
ZipEntry.DEFLATED
}
val endPosition = channelPosition
// cross-check the size reported by the task against the number of bytes that actually went through writeBuffer
val compressedSizeByPosition = endPosition - localFileHeaderOffset - headerSize
require(compressedSizeByPosition.toInt() == compressedSize) {
"Expected $compressedSize, actual $compressedSizeByPosition"
}
// the staging buffer was empty, so after this call it contains the header and nothing else
writeZipLocalFileHeader(path = path, size = dataSize, compressedSize = compressedSize, crc32 = crc, method = method, buffer = buffer)
require(buffer.readableBytes() == headerSize)
// positional write into the reserved gap; channelPosition is intentionally not advanced by it
dataWriter.write(buffer, localFileHeaderOffset)
buffer.clear()
// nothing above has moved channelPosition since endPosition was captured; the assignment restates the position after the payload
channelPosition = endPosition
zipIndexWriter.writeCentralFileHeader(
path = path,
size = dataSize,
compressedSize = compressedSize,
method = method,
crc = crc,
headerOffset = localFileHeaderOffset,
dataOffset = localFileHeaderOffset + headerSize,
)
}
/**
 * Lets [task] write at most [maxSize] bytes of raw data into the archive.
 *
 * [task] receives the target buffer and the archive offset at which its bytes will be placed, and returns
 * the number of bytes it actually wrote. This method itself records nothing in the zip index.
 */
@Suppress("unused")
@Synchronized
fun writeUndeclaredData(maxSize: Int, task: (ByteBuffer, Long) -> Int) {
  writeUsingNioBufferAndAllocateSeparateIfLargeData(headerAndDataSize = maxSize, task = task)
}
/**
 * Writes the remaining bytes of [data] into the archive without recording anything in the zip index.
 *
 * @return the archive offset at which the data starts
 */
@Suppress("unused")
@Synchronized
fun writeUndeclaredDataWithKnownSize(data: ByteBuffer): Long {
  val size = data.remaining()
  if (!dataWriter.isNioBufferSupported) {
    // goes through the staging buffer
    return writeData(data, size)
  }

  // flush everything pending, then write the data directly at the resulting position
  val position = flushBufferIfNeeded(0)
  dataWriter.write(data, position)
  channelPosition = position + size
  return position
}
/**
 * Appends [size] bytes of [data] to the archive via the staging buffer.
 *
 * If the data does not fit into the free part of the staging buffer, the buffer is topped up and flushed;
 * the remainder is then either buffered again or, if it would not fit into the emptied buffer, written directly.
 *
 * @return the archive offset at which the data starts
 */
private fun writeData(data: ByteBuffer, size: Int): Long {
  val startPosition = channelPosition + buffer.readableBytes()
  val freeSpace = buffer.writableBytes()
  if (size <= freeSpace) {
    buffer.writeBytes(data)
    return startPosition
  }

  if (freeSpace > 0) {
    // temporarily narrow the limit so that only the part that fits is copied
    val originalLimit = data.limit()
    data.limit(data.position() + freeSpace)
    buffer.writeBytes(data)
    data.limit(originalLimit)
  }
  writeBuffer(buffer)
  buffer.clear()

  val remaining = size - freeSpace
  if (remaining < buffer.writableBytes()) {
    buffer.writeBytes(data)
  }
  else {
    // directly write large data to avoid unnecessary buffer resizing
    writeBuffer(data)
  }
  return startPosition
}
/**
 * Writes the staging buffer out if it holds more than [threshold] bytes.
 *
 * Passing `0` forces a flush of any pending data.
 *
 * @return the archive offset at which the next byte appended to the staging buffer will end up
 */
@Synchronized
private fun flushBufferIfNeeded(threshold: Int = FLUSH_THRESHOLD): Long {
  val pending = buffer.readableBytes()
  if (pending <= threshold) {
    // nothing flushed: pending bytes still precede whatever is written next
    return channelPosition + pending
  }

  writeBuffer(buffer)
  buffer.clear()
  return channelPosition
}
/**
 * Adds an entry named [path] whose [size] bytes of content are taken from [source].
 *
 * Four cases are handled:
 * - `size == 0`: delegates to [uncompressedData] with an empty array;
 * - `crc32 == null`: records `0` as CRC, puts the local file header into the staging buffer, flushes it and lets [dataWriter]
 *   transfer the content from the channel;
 * - [dataWriter] supports NIO buffers: copies the content into a buffer obtained from [dataWriter], computes the CRC over it and
 *   then fills in the header in front of it;
 * - otherwise: maps the file read-only and delegates to [uncompressedData], unmapping the file afterwards.
 *
 * @param crc32 checksum calculator, or `null` to skip checksum calculation
 */
@Synchronized
internal fun transferFromFileChannel(path: ByteArray, source: FileChannel, size: Int, crc32: CRC32?) {
if (size == 0) {
uncompressedData(path = path, data = emptyByteArray, crc32 = crc32)
return
}
if (crc32 == null) {
// the default-threshold flush check yields the offset at which the header written on the next line will land
zipIndexWriter.writeCentralFileHeader(path = path, size = size, crc = 0, headerOffset = flushBufferIfNeeded())
writeZipLocalFileHeader(path = path, size = size, crc32 = 0, buffer = buffer)
// the buffer now holds at least the header, so threshold 0 always flushes here; `position` is where the content starts
val position = flushBufferIfNeeded(0)
dataWriter.transferFromFileChannel(source, position, size)
channelPosition = position + size
}
else if (dataWriter.isNioBufferSupported) {
// pending data must be flushed before dataWriter.asNioBuffer is called, so that headerOffset equals channelPosition
val headerOffset = flushBufferIfNeeded(0)
val headerSize = 30 + path.size
val headerAndDataSize = headerSize + size
val buffer = dataWriter.asNioBuffer(headerAndDataSize, headerOffset)!!
val headerPosition = buffer.position()
val endOfHeaderPosition = headerPosition + headerSize
// skip the header gap and copy the file content right behind it
buffer.position(endOfHeaderPosition)
copyFromFileChannelToBuffer(sourceChannel = source, buffer = buffer, size = size.toLong(), file = null)
// rewind to the start of the content to compute the checksum over it (up to the buffer limit)
buffer.position(endOfHeaderPosition)
assert(buffer.limit() == headerPosition + headerAndDataSize)
crc32.reset()
crc32.update(buffer)
val crc = crc32.value
// the CRC is known now - go back and fill in the header
buffer.position(headerPosition)
writeZipLocalFileHeader(path = path, size = size, crc32 = crc, buffer = buffer)
zipIndexWriter.writeCentralFileHeader(path = path, size = size, crc = crc, headerOffset = headerOffset)
channelPosition += headerAndDataSize
}
else {
// we have to compute CRC32 for a file, so, we cannot use `FileChannel.transferTo`
val fileData = source.map(FileChannel.MapMode.READ_ONLY, 0, size.toLong())
try {
uncompressedData(path = path, data = fileData, crc32 = crc32)
}
finally {
unmapBuffer(fileData)
}
}
}
/**
 * Adds the content of [file] as an entry named [path] without computing a checksum.
 *
 * @throws IOException if the file is larger than [Int.MAX_VALUE] bytes
 */
@Synchronized
fun fileWithoutCrc(path: ByteArray, file: Path) {
  val channel = FileChannel.open(file, READ_OPEN_OPTION)
  channel.use { source ->
    val fileSize = source.size()
    if (fileSize > Int.MAX_VALUE) {
      throw IOException("File sizes over 2 GB are not supported: $fileSize")
    }

    transferFromFileChannel(path = path, source = source, size = fileSize.toInt(), crc32 = null)
  }
}
/**
 * Writes an entry whose data size is known upfront.
 *
 * A buffer covering the local file header (`30 + path.size` bytes) plus [size] bytes of data is provided,
 * and `writeNioBuffer` is invoked on it with [task] and the archive offset of the entry.
 *
 * @param path entry name bytes
 * @param size number of data bytes
 * @param crc32 optional checksum calculator, passed through to `writeNioBuffer`
 * @param task passed through to `writeNioBuffer`; presumably it fills in the entry data
 */
@Synchronized
fun writeDataWithKnownSize(path: ByteArray, size: Int, crc32: CRC32? = null, task: (ByteBuffer) -> Unit) {
  val entrySize = 30 + path.size + size
  writeUsingNioBufferAndAllocateSeparateIfLargeData(headerAndDataSize = entrySize) { target, entryOffset ->
    writeNioBuffer(
      path = path,
      crc32 = crc32,
      localFileHeaderOffset = entryOffset,
      buffer = target,
      zipIndexWriter = zipIndexWriter,
      task = task,
    )
    // the whole reserved region is reported as written
    entrySize
  }
}
/**
 * Gives [task] a [ByteBuffer] requested for [headerAndDataSize] bytes together with the archive offset at which those bytes will be
 * placed; [task] returns the number of bytes it actually wrote.
 *
 * If [dataWriter] supports NIO buffers, the buffer comes from [dataWriter] after everything pending has been flushed.
 * Otherwise a slice of the staging buffer is used; if the request does not fit, the staging buffer is flushed first, and if the request
 * is larger than [INITIAL_BUFFER_CAPACITY], a separate temporary buffer is allocated, written out right away and released.
 */
private fun writeUsingNioBufferAndAllocateSeparateIfLargeData(headerAndDataSize: Int, task: (ByteBuffer, Long) -> Int) {
if (dataWriter.isNioBufferSupported) {
// threshold 0: after this call the staging buffer is empty, so the returned offset equals channelPosition
val localFileHeaderOffset = flushBufferIfNeeded(0)
val nioBuffer = dataWriter.asNioBuffer(headerAndDataSize, localFileHeaderOffset)!!
val size = task(nioBuffer, localFileHeaderOffset)
channelPosition += size
return
}
// local alias that may be re-pointed to a temporary buffer below; it shadows the field on purpose
var buffer = buffer
var releaseBuffer = false
val localFileHeaderOffset = if (buffer.writableBytes() < headerAndDataSize) {
// not enough room: empty the staging buffer first
if (buffer.isReadable) {
writeBuffer(buffer)
buffer.clear()
}
if (headerAndDataSize > INITIAL_BUFFER_CAPACITY) {
// instead of resizing the current buffer, it's preferable to obtain a buffer of the required size from a pool
buffer = byteBufferAllocator.directBuffer(headerAndDataSize)
releaseBuffer = true
}
// whichever buffer is used, it is empty at this point, so the data starts at channelPosition
channelPosition
}
else {
// the data is appended behind what is already pending in the staging buffer
channelPosition + buffer.readableBytes()
}
try {
val nioBuffer = buffer.internalNioBuffer(buffer.writerIndex(), headerAndDataSize)
// the task is given a little-endian view; the previous byte order is put back afterwards
// NOTE(review): the order is not restored if the task throws - confirm whether that matters for the shared internal NIO buffer
val oldOrder = nioBuffer.order()
nioBuffer.order(ByteOrder.LITTLE_ENDIAN)
val size = task(nioBuffer, localFileHeaderOffset)
nioBuffer.order(oldOrder)
// writing via the NIO view does not move the writer index, so advance it by the reported size
buffer.writerIndex(buffer.writerIndex() + size)
if (releaseBuffer) {
// a temporary buffer is written out immediately, it is never kept as staging buffer
writeBuffer(buffer)
}
}
finally {
if (releaseBuffer) {
buffer.release()
}
}
}
/**
 * Adds an entry named [path] with the content of [data], without compression.
 *
 * @param crc32 checksum calculator; if `null`, `0` is recorded as the CRC
 */
@Suppress("DuplicatedCode")
@Synchronized
fun uncompressedData(path: ByteArray, data: ByteArray, crc32: CRC32?) {
  val size = data.size
  val crc = crc32?.compute(data) ?: 0
  if (dataWriter.isNioBufferSupported) {
    putUncompressedDataToMappedBuffer(path = path, size = size, crc = crc) { target ->
      target.put(data)
    }
    return
  }

  val headerOffset = flushBufferIfNeeded()
  writeZipLocalFileHeader(path = path, size = size, crc32 = crc, buffer = buffer)
  zipIndexWriter.writeCentralFileHeader(path = path, size = size, crc = crc, headerOffset = headerOffset)

  val freeCapacity = buffer.writableBytes()
  if (size <= freeCapacity) {
    buffer.writeBytes(data)
    return
  }

  // the data does not fit: top up the staging buffer, flush it, then deal with the rest
  if (freeCapacity > 0) {
    buffer.writeBytes(data, 0, freeCapacity)
  }
  else {
    check(freeCapacity == 0) {
      "No writable bytes are expected, but the buffer still has free capacity: $freeCapacity"
    }
  }
  writeBuffer(buffer)
  buffer.clear()

  val restSize = size - freeCapacity
  if (restSize < buffer.writableBytes()) {
    buffer.writeBytes(data, freeCapacity, restSize)
  }
  else {
    // directly write large data to avoid unnecessary buffer resizing
    writeBuffer(ByteBuffer.wrap(data, freeCapacity, restSize))
  }
}
/**
 * Adds an entry named [path] with the remaining content of [data], without compression.
 *
 * @param crc32 checksum calculator; if `null`, `0` is recorded as the CRC
 */
@Suppress("DuplicatedCode")
@Synchronized
fun uncompressedData(path: ByteArray, data: ByteBuffer, crc32: CRC32?) {
  val size = data.remaining()
  val crc = crc32?.compute(data) ?: 0
  if (!dataWriter.isNioBufferSupported) {
    // header first, then the payload - both via the staging buffer
    val headerOffset = flushBufferIfNeeded()
    writeZipLocalFileHeader(path = path, size = size, crc32 = crc, buffer = buffer)
    zipIndexWriter.writeCentralFileHeader(path = path, size = size, crc = crc, headerOffset = headerOffset)
    writeData(data, size)
    return
  }

  putUncompressedDataToMappedBuffer(path = path, size = size, crc = crc) { target ->
    target.put(data)
  }
}
/**
 * Writes a local file header followed by the data produced by [task] into a buffer obtained from [dataWriter],
 * then registers the entry in [zipIndexWriter].
 *
 * @param size number of data bytes [task] is expected to put into the buffer
 * @param crc checksum to record for the entry
 */
private inline fun putUncompressedDataToMappedBuffer(path: ByteArray, size: Int, crc: Long, task: (ByteBuffer) -> Unit) {
  // pending data has to be flushed before dataWriter.asNioBuffer is called
  val entryOffset = flushBufferIfNeeded(0)
  val entrySize = 30 + path.size + size
  val target = dataWriter.asNioBuffer(entrySize, entryOffset)!!

  writeZipLocalFileHeader(path = path, size = size, crc32 = crc, buffer = target)
  task(target)

  channelPosition += entrySize
  zipIndexWriter.writeCentralFileHeader(path = path, size = size, crc = crc, headerOffset = entryOffset)
}
/**
 * Writes the readable bytes of [data] to [dataWriter] at [channelPosition] and advances [channelPosition] by their count.
 *
 * The caller is expected to pass a non-empty buffer (asserted).
 */
private fun writeBuffer(data: ByteBuf) {
  // taken before the write - presumably the write may consume the readable bytes
  val byteCount = data.readableBytes()
  assert(byteCount != 0)

  dataWriter.write(data, channelPosition)
  channelPosition += byteCount
}
/**
 * Writes the remaining bytes of [data] directly to [dataWriter] at [channelPosition], bypassing the staging buffer,
 * and advances [channelPosition] by their count.
 *
 * The staging buffer must be empty (asserted); otherwise its pending bytes would be overtaken by this write.
 */
private fun writeBuffer(data: ByteBuffer) {
  assert(!buffer.isReadable)

  // taken before the write - presumably the write may consume the remaining bytes
  val byteCount = data.remaining()
  dataWriter.write(data, channelPosition)
  channelPosition += byteCount
}
/**
 * Finishes the archive and releases all resources.
 *
 * If a package index builder is configured and at least one entry was written, the package index is written first.
 * Then the data produced by [ZipIndexWriter.finish] is appended, [dataWriter] is closed with the final position, and
 * the index writer and the staging buffer are released - the last three steps happen even if writing fails.
 *
 * The method is idempotent: the stream is marked as closed before any work is done, so a repeated call is a no-op
 * even if the first call failed. Resources are released on failure too, hence re-running the logic could
 * only operate on (and release again) already released buffers.
 */
@Synchronized
override fun close() {
  if (finished) {
    return
  }
  // mark as closed upfront - the `finally` blocks below release the buffers regardless of the outcome,
  // so a second attempt must never be made
  finished = true

  try {
    try {
      val packageIndexBuilder = zipIndexWriter.packageIndexBuilder
      val indexDataEnd = if (packageIndexBuilder == null || zipIndexWriter.isEmpty()) {
        -1
      }
      else {
        packageIndexBuilder.writePackageIndex {
          addDirEntries(it)
        }

        val indexWriter = packageIndexBuilder.indexWriter
        // ditto on macOS doesn't like arbitrary data in zip file - wrap into zip entry
        val indexDataSize = indexWriter.dataSize()
        // end of the index entry: start offset + data + local file header (30 bytes + file name)
        val indexEntryEnd = flushBufferIfNeeded() + indexDataSize + 30 + INDEX_FILENAME_BYTES.size
        writeIndex(indexWriter, indexDataSize, this)
        indexEntryEnd.toInt()
      }

      val unwrittenDataSize = buffer.readableBytes()
      // write central directory file header
      val zipIndexData = zipIndexWriter.finish(centralDirectoryOffset = channelPosition + unwrittenDataSize, indexDataEnd = indexDataEnd)
      if (unwrittenDataSize != 0 && buffer.writableBytes() >= zipIndexData.readableBytes()) {
        // the tail of the data and the central directory fit into the staging buffer - write both at once
        buffer.writeBytes(zipIndexData)
        writeBuffer(buffer)
      }
      else {
        if (unwrittenDataSize != 0) {
          writeBuffer(buffer)
        }
        writeBuffer(zipIndexData)
      }
    }
    finally {
      dataWriter.close(channelPosition)
    }
  }
  finally {
    try {
      zipIndexWriter.release()
    }
    finally {
      buffer.release()
    }
  }
}
}