in paimon-format/src/main/java/org/apache/orc/OrcFile.java [1240:1344]
public static List<Path> mergeFiles(
        Path outputPath, WriterOptions options, List<Path> inputFiles) throws IOException {
    // Copies the serialized stripes of every mergeable input file into a single ORC file
    // at outputPath and returns the inputs that were actually merged. Inputs rejected by
    // understandFormat(...) or readerIsCompatible(...) are skipped, so the returned list
    // may be shorter than inputFiles. The first accepted file dictates the writer settings
    // (buffer size, versions, compression, row index stride, schema, key versions), which
    // are written into the caller-supplied options object. On any failure the partially
    // written output is closed and deleted, and the cause is rethrown wrapped in an
    // IOException.
    Writer output = null;
    // Reader whose underlying stream has not been handed over to the copy loop yet.
    // It is tracked so that the failure path can release its file handle.
    Reader pendingReader = null;
    final Configuration conf = options.getConfiguration();
    KeyProvider keyProvider = options.getKeyProvider();
    try {
        // Scratch buffer reused across stripes; it only ever grows.
        byte[] buffer = new byte[0];
        Reader firstFile = null;
        List<Path> result = new ArrayList<>(inputFiles.size());
        Map<String, ByteBuffer> userMetadata = new HashMap<>();
        int bufferSize = 0;
        for (Path input : inputFiles) {
            FileSystem fs = input.getFileSystem(conf);
            Reader reader =
                    createReader(
                            input,
                            readerOptions(options.getConfiguration())
                                    .filesystem(fs)
                                    .setKeyProvider(keyProvider));
            pendingReader = reader;
            if (!understandFormat(input, reader)) {
                // Skipped input: release the reader instead of leaking its file handle.
                pendingReader = null;
                reader.close();
                continue;
            } else if (firstFile == null) {
                // if this is the first file that we are including, grab the values
                firstFile = reader;
                bufferSize = reader.getCompressionSize();
                CompressionKind compression = reader.getCompressionKind();
                options.bufferSize(bufferSize)
                        .version(reader.getFileVersion())
                        .writerVersion(reader.getWriterVersion())
                        .compress(compression)
                        .rowIndexStride(reader.getRowIndexStride())
                        .setSchema(reader.getSchema());
                if (compression != CompressionKind.NONE) {
                    options.enforceBufferSize().bufferSize(bufferSize);
                }
                mergeMetadata(userMetadata, reader);
                // ensure that the merged file uses the same key versions
                for (EncryptionKey key : reader.getColumnEncryptionKeys()) {
                    options.setKeyVersion(
                            key.getKeyName(), key.getKeyVersion(), key.getAlgorithm());
                }
                output = createWriter(outputPath, options);
            } else if (!readerIsCompatible(firstFile, userMetadata, input, reader)) {
                // Skipped input: release the reader instead of leaking its file handle.
                pendingReader = null;
                reader.close();
                continue;
            } else {
                mergeMetadata(userMetadata, reader);
                // The output must be able to hold the largest compression chunk of any input.
                if (bufferSize < reader.getCompressionSize()) {
                    bufferSize = reader.getCompressionSize();
                    ((WriterInternal) output).increaseCompressionSize(bufferSize);
                }
            }
            // One statistics list per encryption variant, plus a final slot for the
            // statistics requested with a null variant.
            EncryptionVariant[] variants = reader.getEncryptionVariants();
            // Generic array creation is unavoidable here; only List<StripeStatistics>
            // values are ever stored in it.
            @SuppressWarnings({"unchecked", "rawtypes"})
            List<StripeStatistics>[] completeList = new List[variants.length + 1];
            for (int v = 0; v < variants.length; ++v) {
                completeList[v] = reader.getVariantStripeStatistics(variants[v]);
            }
            completeList[completeList.length - 1] = reader.getVariantStripeStatistics(null);
            StripeStatistics[] stripeStats = new StripeStatistics[completeList.length];
            try (FSDataInputStream inputStream = ((ReaderImpl) reader).takeFile()) {
                // The stream is now owned (and closed) by this try-with-resources block.
                pendingReader = null;
                result.add(input);
                for (StripeInformation stripe : reader.getStripes()) {
                    long stripeLength = stripe.getLength();
                    // A stripe is copied through a single byte[]; fail clearly rather than
                    // silently truncating the length when narrowing it to an int.
                    if (stripeLength > Integer.MAX_VALUE) {
                        throw new IOException(
                                "Stripe at offset "
                                        + stripe.getOffset()
                                        + " of "
                                        + input
                                        + " is too large to merge: "
                                        + stripeLength
                                        + " bytes");
                    }
                    int length = (int) stripeLength;
                    if (buffer.length < length) {
                        buffer = new byte[length];
                    }
                    long offset = stripe.getOffset();
                    inputStream.readFully(offset, buffer, 0, length);
                    int stripeId = (int) stripe.getStripeId();
                    for (int v = 0; v < completeList.length; ++v) {
                        stripeStats[v] = completeList[v].get(stripeId);
                    }
                    output.appendStripe(buffer, 0, length, stripe, stripeStats);
                }
            }
        }
        // output stays null when no input file was accepted; nothing is written then.
        if (output != null) {
            for (Map.Entry<String, ByteBuffer> entry : userMetadata.entrySet()) {
                output.addUserMetadata(entry.getKey(), entry.getValue());
            }
            output.close();
        }
        return result;
    } catch (Throwable t) {
        // Cleanup below is best-effort: secondary failures are attached to the original
        // cause as suppressed exceptions instead of replacing it.
        if (pendingReader != null) {
            try {
                pendingReader.close();
            } catch (Throwable closeFailure) {
                addSuppressedFailure(t, closeFailure);
            }
        }
        if (output != null) {
            try {
                output.close();
            } catch (Throwable closeFailure) {
                addSuppressedFailure(t, closeFailure);
            }
            try {
                FileSystem fs =
                        options.getFileSystem() == null
                                ? outputPath.getFileSystem(conf)
                                : options.getFileSystem();
                // Remove the partially written output so callers never see a corrupt file.
                fs.delete(outputPath, false);
            } catch (Throwable deleteFailure) {
                addSuppressedFailure(t, deleteFailure);
            }
        }
        throw new IOException("Problem merging files into " + outputPath, t);
    }
}

/**
 * Attaches a cleanup failure to the primary failure as a suppressed exception.
 *
 * @param primary the failure that is going to be reported to the caller
 * @param secondary the failure raised while cleaning up after {@code primary}
 */
private static void addSuppressedFailure(Throwable primary, Throwable secondary) {
    // Throwable.addSuppressed rejects self-suppression, so skip the identical instance.
    if (secondary != primary) {
        primary.addSuppressed(secondary);
    }
}