in common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java [232:408]
public void init() throws URISyntaxException, IOException {
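// Read the decimal-128 setting from the configuration, falling back to the Comet default.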
useDecimal128 =
conf.getBoolean(
CometConf.COMET_USE_DECIMAL_128().key(),
(Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
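// Limit the Parquet read to this split's byte range when the split provides a valid start and length.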
long start = file.start();
long length = file.length();
String filePath = file.filePath().toString();
long fileSize = file.fileSize();
ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));
if (start >= 0 && length >= 0) {
builder = builder.withRange(start, start + length);
}
ParquetReadOptions readOptions = builder.build();
// TODO: enable off-heap buffers when they are ready
ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
Path path = new Path(new URI(filePath));
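// Open the file through Comet's FileReader; try-with-resources guarantees it is closed when
// initialization finishes.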
try (FileReader fileReader =
new FileReader(
CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {
requestedSchema = footer.getFileMetaData().getSchema();
MessageType fileSchema = requestedSchema;
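// Derive the Spark schema from the Parquet schema when none was supplied; otherwise clip the
// Parquet schema to the requested Spark columns and verify the column counts match.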
if (sparkSchema == null) {
sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
} else {
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
if (requestedSchema.getFieldCount() != sparkSchema.size()) {
throw new IllegalArgumentException(
String.format(
"Spark schema has %d columns while Parquet schema has %d columns",
sparkSchema.size(), requestedSchema.getFieldCount()));
}
}
String timeZoneId = conf.get("spark.sql.session.timeZone");
// The native code always uses "UTC" as the time zone when converting the Spark schema to an Arrow schema.
Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
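// Both serialized Arrow schemas are later passed to Native.initRecordBatchReader: the
// (possibly clipped) requested schema and the full data schema.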
// Create Column readers
List<Type> fields = requestedSchema.getFields();
List<Type> fileFields = fileSchema.getFields();
int numColumns = fields.size();
if (partitionSchema != null) numColumns += partitionSchema.size();
columnReaders = new AbstractColumnReader[numColumns];
// Initialize missing columns and use null vectors for them
missingColumns = new boolean[numColumns];
// We do not need the returned column index of the row-index column, but this call has the
// desired side effect of throwing an exception if a column with the same name already
// exists in the schema (Spark unit tests explicitly check for this).
ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
StructField[] nonPartitionFields = sparkSchema.fields();
boolean hasRowIndexColumn = false;
// Row groups remaining after predicate filtering; they determine the byte ranges read by the
// native reader and, when a row-index column is requested, the generated row indexes.
List<BlockMetaData> blocks =
FileReader.filterRowGroups(readOptions, footer.getBlocks(), fileReader);
totalRowCount = fileReader.getFilteredRecordCount();
if (totalRowCount == 0) {
// all the data is filtered out.
isInitialized = true;
return;
}
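// Record the byte offset and compressed size of each remaining row group; these ranges are
// passed to the native reader below.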
long[] starts = new long[blocks.size()];
long[] lengths = new long[blocks.size()];
int blockIndex = 0;
for (BlockMetaData block : blocks) {
long blockStart = block.getStartingPos();
long blockLength = block.getCompressedSize();
starts[blockIndex] = blockStart;
lengths[blockIndex] = blockLength;
blockIndex++;
}
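// Match each requested field against the file schema: row-index columns are generated locally,
// missing optional primitive columns get constant readers, and the remaining columns are left
// to the native reader.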
for (int i = 0; i < fields.size(); i++) {
Type field = fields.get(i);
Optional<Type> optFileField =
fileFields.stream().filter(f -> f.getName().equals(field.getName())).findFirst();
if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
// Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
// generated row indexes, rather than read from the file.
// TODO(SPARK-40059): Allow users to include columns named
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
long[] rowIndices = FileReader.getRowIndices(blocks);
columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
hasRowIndexColumn = true;
missingColumns[i] = true;
} else if (optFileField.isPresent()) {
// The requested column may be a complex type, in which case we check that every field of the
// requested type is present in the file type with the same data type.
if (!isEqual(field, optFileField.get())) {
throw new UnsupportedOperationException("Schema evolution is not supported");
}
missingColumns[i] = false;
} else {
if (field.getRepetition() == Type.Repetition.REQUIRED) {
throw new IOException(
"Required column '" + field.getName() + "' is missing in data file " + filePath);
}
if (field.isPrimitive()) {
ConstantColumnReader reader =
new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
columnReaders[i] = reader;
missingColumns[i] = true;
} else {
// The requested column is not in the file, but the native reader can handle that and will
// return nulls for all requested rows.
missingColumns[i] = false;
}
}
}
// Initialize constant readers for partition columns
if (partitionSchema != null) {
StructField[] partitionFields = partitionSchema.fields();
for (int i = fields.size(); i < columnReaders.length; i++) {
int fieldIndex = i - fields.size();
StructField field = partitionFields[fieldIndex];
ConstantColumnReader reader =
new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
columnReaders[i] = reader;
}
}
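// Output vectors for every column, including partition columns, wrapped in a single ColumnarBatch.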
vectors = new CometVector[numColumns];
currentBatch = new ColumnarBatch(vectors);
// For testing purposes only.
// If the last external accumulator is a `NumRowGroupsAcc`, the number of row groups to read is
// added to it, so tests can verify whether row groups were filtered.
// Note that this reads the thread-local TaskContext; if this method is called from another
// thread, the accumulator is not updated.
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu = getTaskAccumulator(taskContext.taskMetrics());
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
@SuppressWarnings("unchecked")
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
intAccum.add(blocks.size());
}
}
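// Number of rows to produce per batch, taken from COMET_BATCH_SIZE.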
int batchSize =
conf.getInt(
CometConf.COMET_BATCH_SIZE().key(),
(Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
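// Create the native record batch reader for this split. The native filter is only passed when
// no row-index column is requested, presumably so the pre-generated row indexes stay aligned
// with the rows the native reader returns.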
this.handle =
Native.initRecordBatchReader(
filePath,
fileSize,
starts,
lengths,
hasRowIndexColumn ? null : nativeFilter,
serializedRequestedArrowSchema,
serializedDataArrowSchema,
timeZoneId,
batchSize);
}
isInitialized = true;
}