in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java [1033:1115]
public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
    boolean dropColumns) throws IOException {
  startBlock(rowGroup.getRowCount());

  Map<String, ColumnChunkMetaData> columnsToCopy =
      new HashMap<String, ColumnChunkMetaData>();
  for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
    columnsToCopy.put(chunk.getPath().toDotString(), chunk);
  }

  List<ColumnChunkMetaData> columnsInOrder =
      new ArrayList<ColumnChunkMetaData>();

  for (ColumnDescriptor descriptor : schema.getColumns()) {
    String path = ColumnPath.get(descriptor.getPath()).toDotString();
    ColumnChunkMetaData chunk = columnsToCopy.remove(path);
    if (chunk != null) {
      columnsInOrder.add(chunk);
    } else {
      throw new IllegalArgumentException(String.format(
          "Missing column '%s', cannot copy row group: %s", path, rowGroup));
    }
  }

  // complain if some columns would be dropped and that's not okay
  if (!dropColumns && !columnsToCopy.isEmpty()) {
    throw new IllegalArgumentException(String.format(
        "Columns cannot be copied (missing from target schema): %s",
        String.join(", ", columnsToCopy.keySet())));
  }

  // copy the data for all chunks
  long start = -1;
  long length = 0;
  long blockUncompressedSize = 0L;
  for (int i = 0; i < columnsInOrder.size(); i += 1) {
    ColumnChunkMetaData chunk = columnsInOrder.get(i);

    // get this chunk's start position in the new file
    long newChunkStart = out.getPos() + length;

    // add this chunk to be copied with any previous chunks
    if (start < 0) {
      // no previous chunk included, start at this chunk's starting pos
      start = chunk.getStartingPos();
    }
    length += chunk.getTotalSize();

    if ((i + 1) == columnsInOrder.size() ||
        columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
      // not contiguous. do the copy now.
      copy(from, out, start, length);

      // reset to start at the next column chunk
      start = -1;
      length = 0;
    }
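    // In effect, runs of chunks that are contiguous in the source file are
    // coalesced: e.g., chunks occupying bytes [100, 200) and [200, 350)
    // are moved with a single copy(from, out, 100, 250) call instead of
    // two separate transfers.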
    // TODO: column/offset indexes are not copied
    // (it would require seeking to the end of the file for each row group)
    currentColumnIndexes.add(null);
    currentOffsetIndexes.add(null);

    // rebuild the chunk metadata with its page offsets translated from the
    // source file to the chunk's position in this file
    Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
    currentBlock.addColumn(ColumnChunkMetaData.get(
        chunk.getPath(),
        chunk.getPrimitiveType(),
        chunk.getCodec(),
        chunk.getEncodingStats(),
        chunk.getEncodings(),
        chunk.getStatistics(),
        offsets.firstDataPageOffset,
        offsets.dictionaryPageOffset,
        chunk.getValueCount(),
        chunk.getTotalSize(),
        chunk.getTotalUncompressedSize()));

    blockUncompressedSize += chunk.getTotalUncompressedSize();
  }

  currentBlock.setTotalByteSize(blockUncompressedSize);

  endBlock();
}
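
For context, a minimal sketch of driving appendRowGroup from the public API to concatenate one file's row groups into a new file, assuming a parquet-hadoop version that ships this method (1.12+). The class name AppendRowGroupsExample and the command-line arguments are placeholders, not part of the Parquet API.

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;

public class AppendRowGroupsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HadoopInputFile in = HadoopInputFile.fromPath(new Path(args[0]), conf);
    HadoopOutputFile out = HadoopOutputFile.fromPath(new Path(args[1]), conf);

    // read the source footer to get the schema and row-group metadata
    ParquetMetadata footer;
    try (ParquetFileReader reader = ParquetFileReader.open(in)) {
      footer = reader.getFooter();
    }
    MessageType schema = footer.getFileMetaData().getSchema();

    ParquetFileWriter writer = new ParquetFileWriter(
        out, schema, ParquetFileWriter.Mode.CREATE,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
    writer.start();
    try (SeekableInputStream from = in.newStream()) {
      for (BlockMetaData rowGroup : footer.getBlocks()) {
        // dropColumns=false: fail rather than silently drop source columns
        // that are missing from the target schema
        writer.appendRowGroup(from, rowGroup, false);
      }
    }
    writer.end(Collections.<String, String>emptyMap());
  }
}

Note that ParquetFileWriter also exposes appendFile(InputFile), which runs this append loop internally (via ParquetFileReader.appendTo), so most callers never need to invoke appendRowGroup directly.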