public void readEncodedColumns()

in ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java [374:655]


  public void readEncodedColumns(int stripeIx, StripeInformation stripe,
      OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings,
      List<OrcProto.Stream> streamList, boolean[] physicalFileIncludes, boolean[] rgs,
      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
    // Note: for now we don't have to setError here, caller will setError if we throw.
    // We are also not supposed to call setDone, since we are only part of the operation.
    long stripeOffset = stripe.getOffset();
    // 1. Figure out what we have to read.
    long offset = 0; // Stream offset in relation to the stripe.
    // 1.1. Figure out which columns have a present stream
    boolean[] hasNull = findPresentStreamsByColumn(streamList, types);
    if (isTracingEnabled) {
      LOG.trace("The following columns have PRESENT streams: " + arrayToString(hasNull));
    }

    // We assume the stream list is sorted by column and that non-data
    // streams do not interleave with data streams for the same column.
    // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
    ColumnReadContext[] colCtxs = new ColumnReadContext[physicalFileIncludes.length];
    int colRgIx = -1;
    // Don't create a context for column 0 (the root struct column).
    for (int i = 1; i < physicalFileIncludes.length; ++i) {
      if (!physicalFileIncludes[i]) continue;
      ColumnEncoding enc = encodings.get(i);
      colCtxs[i] = new ColumnReadContext(i, enc, indexes[i], ++colRgIx);
      if (isTracingEnabled) {
        LOG.trace("Creating context: " + colCtxs[i].toString());
      }
      trace.logColumnRead(i, colRgIx, enc.getKind());
    }
    CreateHelper listToRead = new CreateHelper();
    boolean hasIndexOnlyCols = false, hasAnyNonData = false;
    for (OrcProto.Stream stream : streamList) {
      long length = stream.getLength();
      int colIx = stream.getColumn();
      OrcProto.Stream.Kind streamKind = stream.getKind();
      boolean isIndexCol = StreamName.getArea(streamKind) != StreamName.Area.DATA;
      hasAnyNonData = hasAnyNonData || isIndexCol;
      // We have a stream for an included column, but in the future such a column might have
      // no data streams at all. Strictly, this flag means "at least one included column has
      // an index stream".
      hasIndexOnlyCols = hasIndexOnlyCols || (isIndexCol && physicalFileIncludes[colIx]);
      if (!physicalFileIncludes[colIx] || isIndexCol) {
        if (isTracingEnabled) {
          LOG.trace("Skipping stream for column " + colIx + ": "
              + streamKind + " at " + offset + ", " + length);
        }
        trace.logSkipStream(colIx, streamKind, offset, length);
        offset += length;
        continue;
      }
      ColumnReadContext ctx = colCtxs[colIx];
      assert ctx != null;
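      // indexIx is where this stream's positions begin within each RowIndexEntry's positions
      // list; it depends on compression, the column's encoding, and whether it has a PRESENT stream.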
      int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
          fileSchema.findSubtype(colIx).getCategory(), streamKind, isCompressed, hasNull[colIx]);
      ctx.addStream(offset, stream, indexIx);
      if (isTracingEnabled) {
        LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
            + ", " + length + ", index position " + indexIx);
      }
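      // When there is no RG filter, or when the stream is a dictionary (which applies to the
      // whole stripe and so cannot be filtered by RG), read the entire stream.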
      if (rgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
        trace.logAddStream(colIx, streamKind, offset, length, indexIx, true);
        addEntireStreamToRanges(offset, length, listToRead, true);
        if (isTracingEnabled) {
          LOG.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
        }
      } else {
        trace.logAddStream(colIx, streamKind, offset, length, indexIx, false);
        addRgFilteredStreamToRanges(stream, rgs,
            isCompressed, indexes[colIx], encodings.get(colIx), fileSchema.findSubtype(colIx).getCategory(),
            bufferSize, hasNull[colIx], offset, length, listToRead, true);
      }
      offset += length;
    }

    boolean hasFileId = this.fileKey != null;
    if (listToRead.get() == null) {
      // No data to read for this stripe. Check if we have some included index-only columns.
      // For example, count(1) would include the root column, which has no data stream.
      // It may also happen that a column is included with no streams whatsoever; that
      // should only be possible if the file has no index streams.
      boolean hasAnyIncludes = false;
      if (!hasIndexOnlyCols) {
        for (int i = 0; i < physicalFileIncludes.length; ++i) {
          if (!physicalFileIncludes[i]) continue;
          hasAnyIncludes = true;
          break;
        }
      }
      boolean nonProjectionRead = hasIndexOnlyCols || (!hasAnyNonData && hasAnyIncludes);

      // TODO: Could there be partial RG filtering w/no projection?
      //       We should probably just disable filtering for such cases if they exist.
      if (nonProjectionRead && (rgs == SargApplier.READ_ALL_RGS)) {
        OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
        ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, physicalFileIncludes.length);
        try {
          consumer.consumeData(ecb);
        } catch (InterruptedException e) {
          LOG.error("IO thread interrupted while queueing data");
          throw new IOException(e);
        }
      } else {
        LOG.warn("Nothing to read for stripe [" + stripe + "]");
      }
      return;
    }

    // 2. Now, read all of the ranges from cache or disk.
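    // toRelease tracks buffers obtained during the read that this method is responsible for
    // releasing; they are released in the cleanup at the end (see also the toRelease comment).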
    IdentityHashMap<ByteBuffer, Boolean> toRelease = new IdentityHashMap<>();
    MutateHelper toRead = getDataFromCacheAndDisk(
        listToRead.get(), stripeOffset, hasFileId, toRelease);
    // 3. For the uncompressed case, we need some special processing before read.
    //    Basically, we are trying to create artificial, consistent ranges to cache, as there are
    //    no compression blocks (CBs) in an uncompressed file. At the end of this processing, the
    //    list contains either cache buffers, or buffers allocated by us and not cached (if we are
    //    only reading parts of the data for some ranges and don't want to cache it). Both are
    //    represented by CacheChunks, so the list is just CacheChunk-s from that point on.
    DiskRangeList iter = preReadUncompressedStreams(stripeOffset, colCtxs, toRead, toRelease);

    // 4. Finally, decompress data, map per RG, and return to caller.
    // We go by RG and not by column because that is how data is processed.
    boolean hasError = true;
    try {
      int rgCount = rowIndexStride == 0 ? 1 : (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
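      // E.g., rowIndexStride = 10000 and 25000 rows in the stripe gives rgCount = ceil(2.5) = 3.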
      for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
        if (rgs != null && !rgs[rgIx]) {
          continue; // RG filtered.
        }
        boolean isLastRg = rgIx == rgCount - 1;
        // Create the batch we will use to return data for this RG.
        OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
        trace.logStartRg(rgIx);
        boolean hasErrorForEcb = true;
        try {
          ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length);
          for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
            ColumnReadContext ctx = colCtxs[colIx];
            if (ctx == null) continue; // This column is not included

            OrcProto.RowIndexEntry index;
            OrcProto.RowIndexEntry nextIndex;
            // The row index may be null if indexes were disabled when the file was written.
            if (ctx.rowIndex == null) {
              if (isTracingEnabled) {
                LOG.trace("Row index is null. Likely reading a file with indexes disabled.");
              }
              index = null;
              nextIndex = null;
            } else {
              index = ctx.rowIndex.getEntry(rgIx);
              nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
            }
            if (isTracingEnabled) {
              LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, isLastRg, rgCount);
            }
            ecb.initOrcColumn(ctx.colIx);
            trace.logStartCol(ctx.colIx);
            for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
              StreamContext sctx = ctx.streams[streamIx];
              ColumnStreamData cb = null;
              try {
                if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
                  // This stream is for the entire stripe and is needed for every RG; uncompress once and reuse.
                  if (sctx.stripeLevelStream == null) {
                    if (isTracingEnabled) {
                      LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
                          + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
                    }
                    trace.logStartStripeStream(sctx.kind);
                    sctx.stripeLevelStream = POOLS.csdPool.take();
                    // We will be using this for each RG while also sending RGs to processing.
                    // To avoid the buffers being unlocked prematurely, run the refcount one ahead;
                    // each RG's processing will decref once, and the last one will unlock the buffers.
                    sctx.stripeLevelStream.incRef();
                    // For stripe-level streams we don't need the extra refcount on the block.
                    // See class comment about refcounts.
                    long unlockUntilCOffset = sctx.offset + sctx.length;
                    DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
                        sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
                        unlockUntilCOffset, sctx.offset, toRelease);
                    if (lastCached != null) {
                      iter = lastCached;
                    }
                  }
                  sctx.stripeLevelStream.incRef();
                  cb = sctx.stripeLevelStream;
                } else {
                  // This stream can be separated by RG using index. Let's do that.
                  // Offset to where this RG begins.
                  long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset);
                  // Offset, relative to the beginning of the stream, at which this RG ends.
                  long nextCOffsetRel = isLastRg ? sctx.length
                      : nextIndex.getPositions(sctx.streamIndexOffset);
                  // Offset before which this RG is guaranteed to end. Can only be estimated.
                  // We estimate the same way for compressed and uncompressed for now.
                  long endCOffset = sctx.offset + estimateRgEndOffset(
                      isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
                  // As we read, we can unlock initial refcounts for the buffers that end before
                  // the data that we need for this RG.
                  long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
                  cb = createRgColumnStreamData(rgIx, isLastRg, ctx.colIx, sctx,
                      cOffset, endCOffset, isCompressed, unlockUntilCOffset);
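                  // On the first read of this stream, start from the shared range iterator;
                  // afterwards, resume from where this stream's own iterator left off.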
                  boolean isStartOfStream = sctx.bufferIter == null;
                  DiskRangeList lastCached = readEncodedStream(stripeOffset,
                      (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
                      unlockUntilCOffset, sctx.offset, toRelease);
                  if (lastCached != null) {
                    sctx.bufferIter = iter = lastCached;
                  }
                }
              } catch (Exception ex) {
                DiskRangeList drl = toRead == null ? null : toRead.next;
                LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for"
                    + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", "
                    + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex);
                throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
              } finally {
                // Always add stream data to ecb; releaseEcbRefCountsOnError relies on it.
                // Otherwise, we won't release consumer refcounts for a partially read stream.
                if (cb != null) {
                  ecb.setStreamData(ctx.colIx, sctx.kind.getNumber(), cb);
                }
              }
            }
          }
          hasErrorForEcb = false;
        } finally {
          if (hasErrorForEcb) {
            releaseEcbRefCountsOnError(ecb);
          }
        }
        try {
          consumer.consumeData(ecb);
          // After this, the non-initial refcounts are the responsibility of the consumer.
        } catch (InterruptedException e) {
          LOG.error("IO thread interrupted while queueing data");
          releaseEcbRefCountsOnError(ecb);
          throw new IOException(e);
        }
      }

      if (isTracingEnabled) {
        LOG.trace("Disk ranges after preparing all the data "
            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
      }
      trace.logRanges(fileKey, stripeOffset, toRead.next, RangesSrc.PREREAD);
      hasError = false;
    } finally {
      try {
        // Release the unreleased stripe-level buffers. See class comment about refcounts.
        for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
          ColumnReadContext ctx = colCtxs[colIx];
          if (ctx == null) continue; // This column is not included.
          for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
            StreamContext sctx = ctx.streams[streamIx];
            if (sctx == null || sctx.stripeLevelStream == null) continue;
            if (0 != sctx.stripeLevelStream.decRef()) continue;
            // Note - this is a little bit confusing; the special treatment of stripe-level buffers
            // is because we run the ColumnStreamData refcount one ahead (as specified above). It
            // may look like this would release the buffers too many times (one release from the
            // consumer, one from releaseInitialRefcounts below, and one here); however, this is
            // merely handling a special case where all the batches that are sharing the stripe-
            // level stream have been processed before we got here; they have all decRef-ed the CSD,
            // but have not released the buffers because of that extra refCount. So, this is
            // essentially the "consumer" refcount being released here.
            for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
              LOG.trace("Unlocking {} at the end of processing", buf);
              cacheWrapper.releaseBuffer(buf);
            }
          }
        }
        releaseInitialRefcounts(toRead.next);
        // Release buffers as we are done with all the streams... also see toRelease comment.
        releaseBuffers(toRelease.keySet(), true);
      } catch (Throwable t) {
        if (!hasError) throw new IOException(t);
        LOG.error("Error during the cleanup after another error; ignoring", t);
      }
    }
  }
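
For context, the consumer passed into readEncodedColumns() only has consumeData() invoked here; per the note at the top of the method, setDone() and setError() are left to the caller. Below is a minimal, hypothetical sketch of such a consumer that merely counts batches. Only consumeData(OrcEncodedColumnBatch) throws InterruptedException is visible in the method above; the setDone()/setError(Throwable) signatures and the exact shape of the Consumer interface are assumptions.

// Hypothetical sketch, not part of EncodedReaderImpl. Assumes the Consumer and
// OrcEncodedColumnBatch types from org.apache.hadoop.hive.ql.io.orc.encoded are imported.
class CountingConsumer implements Consumer<OrcEncodedColumnBatch> {
  private int batchCount = 0;

  @Override
  public void consumeData(OrcEncodedColumnBatch batch) throws InterruptedException {
    // A real consumer would hand the batch off for decoding and take over its refcounts.
    ++batchCount;
  }

  @Override
  public void setDone() {
    // Per the note in readEncodedColumns(), the caller (not that method) signals completion.
  }

  @Override
  public void setError(Throwable t) {
    // Likewise, the caller reports errors if readEncodedColumns() throws.
  }

  public int getBatchCount() {
    return batchCount;
  }
}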