in fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java [1240:1432]
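/**
 * Computes the scan range specs for all files in the partitions selected for this
 * scan (or in the table sample, if one was specified), populating scanRangeSpecs_
 * and the per-filesystem partition/file/byte counters. Scan range lengths are capped
 * by the MAX_SCAN_RANGE_LENGTH query option and, for filesystems without block
 * locations, by a synthetic block size.
 */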
protected void computeScanRangeLocations(Analyzer analyzer)
throws ImpalaRuntimeException {
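// The maximum length of a single scan range, taken from the MAX_SCAN_RANGE_LENGTH
// query option; 0 means no explicit limit.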
long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
.getMax_scan_range_length();
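// A per-table scan range limit may be registered in RuntimeEnv (typically only in
// tests); if it is tighter than the query option, it takes precedence.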
if (RuntimeEnv.INSTANCE.hasTableScanRangeLimit() && desc_.getTableName() != null) {
long testLimit = RuntimeEnv.INSTANCE.getTableScanRangeLimit(
desc_.getTableName().getDb(), desc_.getTableName().getTbl());
if (testLimit > 0
&& (scanRangeBytesLimit == 0 || scanRangeBytesLimit > testLimit)) {
scanRangeBytesLimit = testLimit;
}
}
// Initialize class fields related to scan ranges.
scanRangeSpecs_ = new TScanRangeSpec();
generatedScanRangeCount_ = 0;
largestScanRangeBytes_ = 0;
maxScanRangeNumRows_ = -1;
numScanRangesNoDiskIds_ = 0;
numFilesNoDiskIds_ = 0;
numPartitionsNoDiskIds_ = 0;
numPartitionsPerFs_ = new TreeMap<>();
totalFilesPerFs_ = new TreeMap<>();
totalBytesPerFs_ = new TreeMap<>();
totalFilesPerFsEC_ = new TreeMap<>();
totalBytesPerFsEC_ = new TreeMap<>();
Preconditions.checkState((sampleParams_ == null) == (sampledPartitions_ == null));
// Assume all table files are in Parquet format and all are in a columnar format
// until proven otherwise.
boolean allParquet = true;
boolean allColumnarFormat = true;
if (this instanceof IcebergScanNode && this.fileFormats_.isEmpty()) {
// Iceberg tables always have one partition, even if the scan node contains zero
// file descriptors. TODO: IMPALA-13267
allParquet = false;
allColumnarFormat = false;
} else {
// If the table has no partitions, then it is neither all Parquet nor all columnar.
int partitionsSize = getSampledOrRawPartitions().size();
allParquet = partitionsSize > 0;
allColumnarFormat = partitionsSize > 0;
}
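// Simple limit optimization: when OPTIMIZE_SIMPLE_LIMIT is set and the analyzer has
// flagged the statement as a simple 'SELECT ... LIMIT N', only enough files to cover
// the limit are scanned, conservatively assuming one row per file.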
long simpleLimitNumRows = 0; // only used for the simple limit case
boolean isSimpleLimit = sampleParams_ == null &&
(analyzer.getQueryCtx().client_request.getQuery_options()
.isOptimize_simple_limit()
&& analyzer.getSimpleLimitStatus() != null
&& analyzer.getSimpleLimitStatus().first);
// Save the last looked up FileSystem object. It is enough for the scheme and
// authority part of the URI to match to ensure that getFileSystem() would return the
// same file system. See Hadoop's filesystem caching implementation at
// https://github.com/apache/hadoop/blob/1046f9cf9888155c27923f3f56efa107d908ad5b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3867
// Note that in the Hadoop code the slow part is UserGroupInformation.getCurrentUser()
// which is not important here as the user is always the same in Impala.
String lastFsScheme = null;
String lastFsAuthority = null;
FileSystem lastFileSystem = null;
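// For each selected partition: resolve its FileSystem, choose the file descriptors
// to scan, and turn each file into scan range specs, accumulating per-filesystem
// statistics along the way.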
for (FeFsPartition partition : getSampledOrRawPartitions()) {
// Save the location to a local variable because getLocation() can be slow, as it
// needs to decompress the partition's location.
String partitionLocation = partition.getLocation();
Path partitionPath = new Path(partitionLocation);
String fsScheme = partitionPath.toUri().getScheme();
String fsAuthority = partitionPath.toUri().getAuthority();
FileSystemUtil.FsType fsType = FileSystemUtil.FsType.getFsType(fsScheme);
FileSystem partitionFs;
if (lastFileSystem != null &&
Objects.equals(lastFsScheme, fsScheme) &&
Objects.equals(lastFsAuthority, fsAuthority)) {
partitionFs = lastFileSystem;
} else {
try {
partitionFs = partitionPath.getFileSystem(CONF);
} catch (IOException e) {
throw new ImpalaRuntimeException("Error determining partition fs type", e);
}
lastFsScheme = fsScheme;
lastFsAuthority = fsAuthority;
lastFileSystem = partitionFs;
}
// Missing disk id accounting is only done for file systems that support the notion
// of disk/storage ids.
boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
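// In the simple limit case, only request enough file descriptors to cover the
// remaining row budget; -1 means no limit on the number of descriptors.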
List<FileDescriptor> fileDescs = getFileDescriptorsWithLimit(partition, fsHasBlocks,
isSimpleLimit ? analyzer.getSimpleLimitStatus().second - simpleLimitNumRows
: -1);
// Conservatively estimate 1 row per file.
simpleLimitNumRows += fileDescs.size();
if (sampledFiles_ != null) {
// If we are sampling, get files in the sample.
fileDescs = sampledFiles_.get(partition.getId());
}
long partitionNumRows = partition.getNumRows();
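// Mark the partition as referenced so it is included in the descriptor table.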
analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
if (!partition.getFileFormat().isParquetBased()) {
allParquet = false;
}
allColumnarFormat =
allColumnarFormat && VALID_COLUMNAR_FORMATS.contains(partition.getFileFormat());
if (!fsHasBlocks) {
// Limit the scan range length if generating scan ranges (and we're not
// short-circuiting the scan for a partition key scan).
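// Filesystems without block metadata get synthetic blocks, so cap the limit at the
// larger of the default block size (or the Parquet object-store split size for
// Parquet-based files) and FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE.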
long defaultBlockSize = (partition.getFileFormat().isParquetBased()) ?
analyzer.getQueryOptions().parquet_object_store_split_size :
partitionFs.getDefaultBlockSize(partitionPath);
long maxBlockSize =
Math.max(defaultBlockSize, FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
if (scanRangeBytesLimit > 0) {
scanRangeBytesLimit = Math.min(scanRangeBytesLimit, maxBlockSize);
} else {
scanRangeBytesLimit = maxBlockSize;
}
}
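// Per-partition accounting: total bytes, the largest scan range produced, whether
// any file is missing disk ids, and per-filesystem partition/file/byte totals.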
final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
long partitionMaxScanRangeBytes = 0;
boolean partitionMissingDiskIds = false;
totalBytesPerFs_.merge(fsType, partitionBytes, Long::sum);
totalFilesPerFs_.merge(fsType, (long) fileDescs.size(), Long::sum);
numPartitionsPerFs_.merge(fsType, 1L, Long::sum);
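// Walk every file in the partition: reject erasure-coded files if they are not
// allowed, track EC totals, and emit scan ranges either synthetically (no block
// metadata) or from the file's actual blocks.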
for (FileDescriptor fileDesc: fileDescs) {
if (!analyzer.getQueryOptions().isAllow_erasure_coded_files() &&
fileDesc.getIsEc()) {
throw new ImpalaRuntimeException(String
.format("Scanning of HDFS erasure-coded file (%s) is not supported",
fileDesc.getAbsolutePath(partitionLocation)));
}
// Accumulate the number of EC files and their total size.
if (fileDesc.getIsEc()) {
totalFilesPerFsEC_.merge(fsType, 1L, Long::sum);
totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), Long::sum);
}
// If parquet count star optimization is enabled, we only need the
// 'RowGroup.num_rows' in file metadata, thus only the scan range that contains
// a file footer is required.
// IMPALA-8834 introduced the optimization for partition key scan by generating
// one scan range for each HDFS file. With Parquet and ORC, we only need to get
// the scan range that contains a file footer for short-circuiting.
boolean isFooterOnly = countStarSlot_ != null
|| (isPartitionKeyScan_
&& (partition.getFileFormat().isParquetBased()
|| partition.getFileFormat() == HdfsFileFormat.ORC));
if (!fsHasBlocks) {
Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
generateScanRangeSpecs(
partition, partitionLocation, fileDesc, scanRangeBytesLimit, isFooterOnly);
} else {
// Skip files that have no associated blocks.
if (fileDesc.getNumFileBlocks() == 0) continue;
Pair<Boolean, Long> result =
transformBlocksToScanRanges(partition, partitionLocation, fsType, fileDesc,
fsHasBlocks, scanRangeBytesLimit, analyzer, isFooterOnly);
partitionMaxScanRangeBytes =
Math.max(partitionMaxScanRangeBytes, result.second);
if (result.first) partitionMissingDiskIds = true;
}
}
if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
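// Use the partition's row count and size to refine the estimate of the maximum
// number of rows a single scan range may produce.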
if (partitionMaxScanRangeBytes > 0 && partitionNumRows >= 0) {
updateMaxScanRangeNumRows(
partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
}
if (isSimpleLimit && simpleLimitNumRows ==
analyzer.getSimpleLimitStatus().second) {
// For the simple limit case, if the estimated row count has already reached the
// limit, there's no need to process more partitions.
break;
}
}
allParquet_ = allParquet;
allColumnarFormat_ = allColumnarFormat;
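// With no files to scan, no scan range can produce any rows.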
if (totalFilesPerFs_.isEmpty() || sumValues(totalFilesPerFs_) == 0) {
maxScanRangeNumRows_ = 0;
} else {
// Also estimate max rows per scan range based on table-level stats, in case some
// or all partition-level stats were missing.
long tableNumRows = tbl_.getNumRows();
if (tableNumRows >= 0) {
updateMaxScanRangeNumRows(
tableNumRows, sumValues(totalBytesPerFs_), largestScanRangeBytes_);
}
}
}