protected void computeScanRangeLocations(Analyzer analyzer)

in fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java [1240:1432]


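  // Computes the scan range specs (scanRangeSpecs_) for every file in the scanned (or
  // sampled) partitions. Applies the MAX_SCAN_RANGE_LENGTH query option as an upper
  // bound on scan range length and populates the per-filesystem accounting fields
  // (numPartitionsPerFs_, totalFilesPerFs_, totalBytesPerFs_, ...) as well as the
  // max-rows-per-scan-range estimate (maxScanRangeNumRows_).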
  protected void computeScanRangeLocations(Analyzer analyzer)
      throws ImpalaRuntimeException {
    long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
        .getMax_scan_range_length();
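    // RuntimeEnv may impose a tighter per-table scan range limit (typically set in
    // tests); when both limits are set, use the smaller one.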
    if (RuntimeEnv.INSTANCE.hasTableScanRangeLimit() && desc_.getTableName() != null) {
      long testLimit = RuntimeEnv.INSTANCE.getTableScanRangeLimit(
          desc_.getTableName().getDb(), desc_.getTableName().getTbl());
      if (testLimit > 0
          && (scanRangeBytesLimit == 0 || scanRangeBytesLimit > testLimit)) {
        scanRangeBytesLimit = testLimit;
      }
    }

    // Initialize class fields related to scan ranges.
    scanRangeSpecs_ = new TScanRangeSpec();
    generatedScanRangeCount_ = 0;
    largestScanRangeBytes_ = 0;
    maxScanRangeNumRows_ = -1;
    numScanRangesNoDiskIds_ = 0;
    numFilesNoDiskIds_ = 0;
    numPartitionsNoDiskIds_ = 0;
    numPartitionsPerFs_ = new TreeMap<>();
    totalFilesPerFs_ = new TreeMap<>();
    totalBytesPerFs_ = new TreeMap<>();
    totalFilesPerFsEC_ = new TreeMap<>();
    totalBytesPerFsEC_ = new TreeMap<>();

    Preconditions.checkState((sampleParams_ == null) == (sampledPartitions_ == null));
    // Assume all table files are in Parquet format and all are in a columnar format
    // until proven otherwise.
    boolean allParquet = true;
    boolean allColumnarFormat = true;

    if (this instanceof IcebergScanNode && this.fileFormats_.isEmpty()) {
      // Iceberg tables always have one partition, even if the scan node contains zero
      // file descriptors. TODO: IMPALA-13267
      allParquet = false;
      allColumnarFormat = false;
    } else {
      // If the table has no partitions, then it is neither all Parquet nor all columnar.
      int partitionsSize = getSampledOrRawPartitions().size();
      allParquet = partitionsSize > 0;
      allColumnarFormat = partitionsSize > 0;
    }
    long simpleLimitNumRows = 0; // only used for the simple limit case
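    // The simple limit optimization applies when OPTIMIZE_SIMPLE_LIMIT is enabled and
    // the analyzer determined the query has a simple LIMIT; it lets us stop generating
    // scan ranges once the conservative row estimate (one row per file) reaches the
    // limit.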
    boolean isSimpleLimit = sampleParams_ == null &&
        (analyzer.getQueryCtx().client_request.getQuery_options()
            .isOptimize_simple_limit()
        && analyzer.getSimpleLimitStatus() != null
        && analyzer.getSimpleLimitStatus().first);

    // Save the last looked up FileSystem object. It is enough for the scheme and
    // authority part of the URI to match to ensure that getFileSystem() would return the
    // same file system. See Hadoop's filesystem caching implementation at
    // https://github.com/apache/hadoop/blob/1046f9cf9888155c27923f3f56efa107d908ad5b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3867
    // Note that in the Hadoop code the slow part is UserGroupInformation.getCurrentUser(),
    // which is not important here as the user is always the same in Impala.
    String lastFsScheme = null;
    String lastFsAuthority = null;
    FileSystem lastFileSystem = null;
    for (FeFsPartition partition : getSampledOrRawPartitions()) {
      // Save the location to a local variable because getLocation() can be slow, as it
      // needs to decompress the partition's location.
      String partitionLocation = partition.getLocation();
      Path partitionPath = new Path(partitionLocation);
      String fsScheme = partitionPath.toUri().getScheme();
      String fsAuthority = partitionPath.toUri().getAuthority();
      FileSystemUtil.FsType fsType = FileSystemUtil.FsType.getFsType(fsScheme);

      FileSystem partitionFs;
      if (lastFileSystem != null &&
         Objects.equals(lastFsScheme, fsScheme) &&
         Objects.equals(lastFsAuthority, fsAuthority)) {
        partitionFs = lastFileSystem;
      } else {
        try {
          partitionFs = partitionPath.getFileSystem(CONF);
        } catch (IOException e) {
          throw new ImpalaRuntimeException("Error determining partition fs type", e);
        }
        lastFsScheme = fsScheme;
        lastFsAuthority = fsAuthority;
        lastFileSystem = partitionFs;
      }

      // Missing disk id accounting is only done for file systems that support the notion
      // of disk/storage ids.
      boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
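      // For a simple limit query, only fetch enough file descriptors to cover the rows
      // still needed (estimated at one row per file); -1 means no limit.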
      List<FileDescriptor> fileDescs = getFileDescriptorsWithLimit(partition, fsHasBlocks,
          isSimpleLimit ? analyzer.getSimpleLimitStatus().second - simpleLimitNumRows
                        : -1);
      // conservatively estimate 1 row per file
      simpleLimitNumRows += fileDescs.size();
      if (sampledFiles_ != null) {
        // If we are sampling, get files in the sample.
        fileDescs = sampledFiles_.get(partition.getId());
      }

      long partitionNumRows = partition.getNumRows();
      analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
      if (!partition.getFileFormat().isParquetBased()) {
        allParquet = false;
      }
      allColumnarFormat =
          allColumnarFormat && VALID_COLUMNAR_FORMATS.contains(partition.getFileFormat());

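      // File systems without block locations get synthetic file blocks; cap the scan
      // range length at the larger of the default block size (or, for Parquet, the
      // object-store split size) and the minimum synthetic block size.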
      if (!fsHasBlocks) {
        // Limit the scan range length if generating scan ranges (and we're not
        // short-circuiting the scan for a partition key scan).
        long defaultBlockSize = (partition.getFileFormat().isParquetBased()) ?
            analyzer.getQueryOptions().parquet_object_store_split_size :
            partitionFs.getDefaultBlockSize(partitionPath);
        long maxBlockSize =
            Math.max(defaultBlockSize, FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
        if (scanRangeBytesLimit > 0) {
          scanRangeBytesLimit = Math.min(scanRangeBytesLimit, maxBlockSize);
        } else {
          scanRangeBytesLimit = maxBlockSize;
        }
      }
      final long partitionBytes = FileDescriptor.computeTotalFileLength(fileDescs);
      long partitionMaxScanRangeBytes = 0;
      boolean partitionMissingDiskIds = false;
      totalBytesPerFs_.merge(fsType, partitionBytes, Long::sum);
      totalFilesPerFs_.merge(fsType, (long) fileDescs.size(), Long::sum);
      numPartitionsPerFs_.merge(fsType, 1L, Long::sum);

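      // Walk every file in this partition: reject erasure-coded files if they are
      // disallowed, track EC stats, and turn each file (or its blocks) into scan ranges.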
      for (FileDescriptor fileDesc: fileDescs) {
        if (!analyzer.getQueryOptions().isAllow_erasure_coded_files() &&
            fileDesc.getIsEc()) {
          throw new ImpalaRuntimeException(String
              .format("Scanning of HDFS erasure-coded file (%s) is not supported",
                  fileDesc.getAbsolutePath(partitionLocation)));
        }

        // Accumulate the number of EC files and the total size of such files.
        if (fileDesc.getIsEc()) {
          totalFilesPerFsEC_.merge(fsType, 1L, Long::sum);
          totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), Long::sum);
        }

        // If the Parquet count-star optimization is enabled, we only need the
        // 'RowGroup.num_rows' in file metadata, thus only the scan range that contains
        // a file footer is required.
        // IMPALA-8834 introduced the optimization for partition key scan by generating
        // one scan range for each HDFS file. With Parquet and ORC, we only need to get
        // the scan range that contains a file footer for short-circuiting.
        boolean isFooterOnly = countStarSlot_ != null
            || (isPartitionKeyScan_
                && (partition.getFileFormat().isParquetBased()
                    || partition.getFileFormat() == HdfsFileFormat.ORC));

        if (!fsHasBlocks) {
          Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
          generateScanRangeSpecs(
              partition, partitionLocation, fileDesc, scanRangeBytesLimit, isFooterOnly);
        } else {
          // Skip files that have no associated blocks.
          if (fileDesc.getNumFileBlocks() == 0) continue;
          Pair<Boolean, Long> result =
              transformBlocksToScanRanges(partition, partitionLocation, fsType, fileDesc,
                  fsHasBlocks, scanRangeBytesLimit, analyzer, isFooterOnly);
          partitionMaxScanRangeBytes =
              Math.max(partitionMaxScanRangeBytes, result.second);
          if (result.first) partitionMissingDiskIds = true;
        }
      }
      if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
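      // Refine the estimate of the maximum rows per scan range with this partition's
      // row count, when partition-level stats are available.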
      if (partitionMaxScanRangeBytes > 0 && partitionNumRows >= 0) {
        updateMaxScanRangeNumRows(
            partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
      }
      if (isSimpleLimit && simpleLimitNumRows ==
          analyzer.getSimpleLimitStatus().second) {
        // For the simple limit case: if the estimated row count has already reached
        // the limit, there's no need to process more partitions.
        break;
      }
    }
    allParquet_ = allParquet;
    allColumnarFormat_ = allColumnarFormat;
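    // With no files to scan, no scan range can produce any rows.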
    if (totalFilesPerFs_.isEmpty() || sumValues(totalFilesPerFs_) == 0) {
      maxScanRangeNumRows_ = 0;
    } else {
      // Also estimate max rows per scan range based on table-level stats, in case some
      // or all partition-level stats were missing.
      long tableNumRows = tbl_.getNumRows();
      if (tableNumRows >= 0) {
        updateMaxScanRangeNumRows(
            tableNumRows, sumValues(totalBytesPerFs_), largestScanRangeBytes_);
      }
    }
  }
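
For reference, a minimal standalone sketch (not Impala code; the class name and values
are illustrative) of the TreeMap.merge()/Long::sum idiom used above to accumulate the
per-filesystem counters such as totalBytesPerFs_ and totalFilesPerFs_:

import java.util.TreeMap;

public class PerFsAccumulationSketch {
  public static void main(String[] args) {
    // merge(key, delta, Long::sum) inserts the delta when the key is absent,
    // otherwise adds it to the existing value.
    TreeMap<String, Long> totalBytesPerFs = new TreeMap<>();
    TreeMap<String, Long> totalFilesPerFs = new TreeMap<>();

    long[] hdfsFileLengths = {128L << 20, 64L << 20}; // hypothetical file sizes
    for (long len : hdfsFileLengths) {
      totalBytesPerFs.merge("HDFS", len, Long::sum);
      totalFilesPerFs.merge("HDFS", 1L, Long::sum);
    }
    totalBytesPerFs.merge("S3", 256L << 20, Long::sum);
    totalFilesPerFs.merge("S3", 1L, Long::sum);

    System.out.println(totalBytesPerFs); // {HDFS=201326592, S3=268435456}
    System.out.println(totalFilesPerFs); // {HDFS=2, S3=1}
  }
}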