in common/src/main/java/org/apache/comet/parquet/BatchReader.java [231:378]
public void init() throws URISyntaxException, IOException {
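    // Resolve reader options (decimal128 representation, lazy materialization) from the Hadoop
    // configuration, falling back to the Comet defaults.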
useDecimal128 =
conf.getBoolean(
CometConf.COMET_USE_DECIMAL_128().key(),
(Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
useLazyMaterialization =
conf.getBoolean(
CometConf.COMET_USE_LAZY_MATERIALIZATION().key(),
(Boolean) CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get());
long start = file.start();
long length = file.length();
String filePath = file.filePath().toString();
ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));
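    // Restrict the read to this task's split range [start, start + length) when both start and
    // length are non-negative.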
if (start >= 0 && length >= 0) {
builder = builder.withRange(start, start + length);
}
ParquetReadOptions readOptions = builder.build();
    // TODO: enable off-heap buffers when they are ready
ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
Path path = new Path(new URI(filePath));
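    // Open the Parquet file through Comet's FileReader, reusing the provided footer and applying
    // both the Parquet and Comet read options.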
fileReader =
new FileReader(
CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics);
requestedSchema = fileReader.getFileMetaData().getSchema();
MessageType fileSchema = requestedSchema;
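    // If no Spark read schema was given, derive one from the Parquet file schema; otherwise clip
    // the Parquet schema down to the columns requested by Spark, honoring the case sensitivity
    // and field-id matching options.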
if (sparkSchema == null) {
sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
} else {
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
      if (requestedSchema.getFieldCount() != sparkSchema.size()) {
        throw new IllegalArgumentException(
            String.format(
                "Spark schema has %d columns while Parquet schema has %d columns",
                sparkSchema.size(), requestedSchema.getFieldCount()));
      }
}
totalRowCount = fileReader.getRecordCount();
List<ColumnDescriptor> columns = requestedSchema.getColumns();
int numColumns = columns.size();
if (partitionSchema != null) numColumns += partitionSchema.size();
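    // One column reader per requested Parquet column, plus one per partition column.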
columnReaders = new AbstractColumnReader[numColumns];
    // Track which requested columns are missing from the file; null vectors are used for them
missingColumns = new boolean[columns.size()];
List<String[]> paths = requestedSchema.getPaths();
    // We do not need the column index of the row index column, but this method has the side
    // effect of throwing an exception if a column with the same name is found, which we do want
    // (Spark unit tests explicitly check for that).
ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
StructField[] nonPartitionFields = sparkSchema.fields();
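    // Classify each requested column: row-index columns are generated rather than read, columns
    // present in the file are read normally, missing optional columns fall back to constant
    // readers (null vectors), and missing required columns raise an error.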
for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
Type t = requestedSchema.getFields().get(i);
Preconditions.checkState(
t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
"Complex type is not supported");
String[] colPath = paths.get(i);
if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
// Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
// generated row indexes, rather than read from the file.
// TODO(SPARK-40059): Allow users to include columns named
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
long[] rowIndices = fileReader.getRowIndices();
columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
missingColumns[i] = true;
} else if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
if (!fd.equals(columns.get(i))) {
throw new UnsupportedOperationException("Schema evolution is not supported");
}
missingColumns[i] = false;
} else {
if (columns.get(i).getMaxDefinitionLevel() == 0) {
throw new IOException(
"Required column '"
+ Arrays.toString(colPath)
+ "' is missing"
+ " in data file "
+ filePath);
}
ConstantColumnReader reader =
new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
columnReaders[i] = reader;
missingColumns[i] = true;
}
}
// Initialize constant readers for partition columns
if (partitionSchema != null) {
StructField[] partitionFields = partitionSchema.fields();
for (int i = columns.size(); i < columnReaders.length; i++) {
int fieldIndex = i - columns.size();
StructField field = partitionFields[fieldIndex];
ConstantColumnReader reader =
new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
columnReaders[i] = reader;
}
}
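    // Allocate the slots for the output vectors and the columnar batch that wraps them.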
vectors = new CometVector[numColumns];
currentBatch = new ColumnarBatch(vectors);
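    // Tell the file reader which leaf columns will actually be read.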
fileReader.setRequestedSchema(requestedSchema.getColumns());
    // For test purposes only.
    // If the last external accumulator is a `NumRowGroupsAcc`, the number of row groups to be
    // read is added to that accumulator, so tests can check whether row groups were filtered.
    // Note that this relies on the thread-local TaskContext object; if this method is called
    // from another thread, the accumulator will not be updated.
if (taskContext != null) {
Option<AccumulatorV2<?, ?>> accu =
ShimTaskMetrics.getTaskAccumulator(taskContext.taskMetrics());
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
@SuppressWarnings("unchecked")
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
intAccum.add(fileReader.getRowGroups().size());
}
}
// Pre-fetching
boolean preFetchEnabled =
conf.getBoolean(
CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
(boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());
if (preFetchEnabled) {
LOG.info("Prefetch enabled for BatchReader.");
this.prefetchQueue = new LinkedBlockingQueue<>();
}
isInitialized = true;
synchronized (this) {
      // If prefetch is enabled, `init()` is called in a separate thread. When
      // `BatchReader.nextBatch()` is called asynchronously, it is possible that `init()` has not
      // been called or has not finished. `nextBatch` must wait until initialization of
      // `BatchReader` is done. Once initialization is about to finish, we notify the thread
      // waiting in `nextBatch` so it can continue.
notifyAll();
}
}