void WriterShared::flushStripe()

in velox/dwio/dwrf/writer/WriterShared.cpp [286:454]


void WriterShared::flushStripe(bool close) {
  auto& context = getContext();
  auto preFlushTotalMemoryUsage = getTotalMemoryUsage(context);
  int64_t preFlushStreamMemoryUsage =
      context.getMemoryUsage(MemoryUsageCategory::OUTPUT_STREAM)
          .getCurrentBytes();
  if (context.stripeRowCount == 0) {
    return;
  }

  dwio::common::MetricsLog::StripeFlushMetrics metrics;
  metrics.writerVersion =
      writerVersionToString(context.getConfig(Config::WRITER_VERSION));
  metrics.outputStreamMemoryEstimate = context.getEstimatedOutputStreamSize();
  metrics.stripeSizeEstimate =
      context.getEstimatedStripeSize(context.stripeRawSize);

  if (context.isIndexEnabled && context.indexRowCount > 0) {
    createRowIndexEntry();
  }

  auto& handler = context.getEncryptionHandler();
  EncodingManager encodingManager{handler};

  flushImpl([&](uint32_t nodeId) -> proto::ColumnEncoding& {
    return encodingManager.addEncodingToFooter(nodeId);
  });

  // Collects the memory increment from flushing data to output streams.
  auto flushOverhead =
      context.getMemoryUsage(MemoryUsageCategory::OUTPUT_STREAM)
          .getCurrentBytes() -
      preFlushStreamMemoryUsage;
  context.recordFlushOverhead(flushOverhead);
  metrics.flushOverhead = flushOverhead;

  auto& sink = getSink();
  auto stripeOffset = sink.size();

  uint32_t lastIndex = 0;
  uint64_t offset = 0;
  auto addStream = [&](const auto& stream, const auto& out) {
    proto::Stream* s;
    uint32_t currentIndex;
    auto nodeId = stream.node;
    s = encodingManager.addStreamToFooter(nodeId, currentIndex);

    // set offset only when needed, ie. when offset of current stream cannot be
    // calculated based on offset and length of previous stream. In that case,
    // it must be that current stream and previous stream doesn't belong to same
    // encryption group or neither are encrypted. So the logic is simplified to
    // check if group index are the same for current and previous stream
    if (offset > 0 && lastIndex != currentIndex) {
      s->set_offset(offset);
    }
    lastIndex = currentIndex;

    // Jolly/Presto readers can't read streams bigger than 2GB.
    validateStreamSize(stream, out.size());

    s->set_kind(static_cast<proto::Stream_Kind>(stream.kind));
    s->set_node(nodeId);
    s->set_column(stream.column);
    s->set_sequence(stream.sequence);
    s->set_length(out.size());
    s->set_usevints(context.getConfig(Config::USE_VINTS));
    offset += out.size();

    context.incrementNodeSize(nodeId, out.size());
  };

  // TODO: T45025996 Discard all empty streams at flush time.
  // deals with streams
  uint64_t indexLength = 0;
  sink.setMode(WriterSink::Mode::Index);
  auto planner = layoutPlannerFactory_(getStreamList(context), encodingManager);
  planner->plan();
  planner->iterateIndexStreams([&](auto& streamId, auto& content) {
    DWIO_ENSURE_EQ(
        streamId.kind,
        StreamKind::StreamKind_ROW_INDEX,
        "unexpected stream kind ",
        streamId.kind);
    indexLength += content.size();
    addStream(streamId, content);
    sink.addBuffers(content);
  });

  uint64_t dataLength = 0;
  sink.setMode(WriterSink::Mode::Data);
  planner->iterateDataStreams([&](auto& streamId, auto& content) {
    DWIO_ENSURE_NE(
        streamId.kind,
        StreamKind::StreamKind_ROW_INDEX,
        "unexpected stream kind ",
        streamId.kind);
    dataLength += content.size();
    addStream(streamId, content);
    sink.addBuffers(content);
  });
  DWIO_ENSURE_GT(dataLength, 0);
  metrics.stripeSize = dataLength;

  if (handler.isEncrypted()) {
    // fill encryption metadata
    for (uint32_t i = 0; i < handler.getEncryptionGroupCount(); ++i) {
      auto group = encodingManager.addEncryptionGroupToFooter();
      writeProtoAsString(
          *group,
          encodingManager.getEncryptionGroup(i),
          std::addressof(handler.getEncryptionProviderByIndex(i)));
    }
  }

  // flush footer
  uint64_t footerOffset = sink.size();
  DWIO_ENSURE_EQ(footerOffset, stripeOffset + dataLength + indexLength);

  sink.setMode(WriterSink::Mode::Footer);
  writeProto(encodingManager.getFooter());
  sink.setMode(WriterSink::Mode::None);

  auto& stripe = addStripeInfo();
  stripe.set_offset(stripeOffset);
  stripe.set_indexlength(indexLength);
  stripe.set_datalength(dataLength);
  stripe.set_footerlength(sink.size() - footerOffset);

  // set encryption key metadata
  if (handler.isEncrypted() && context.stripeIndex == 0) {
    for (uint32_t i = 0; i < handler.getEncryptionGroupCount(); ++i) {
      *stripe.add_keymetadata() =
          handler.getEncryptionProviderByIndex(i).getKey();
    }
  }

  context.recordAverageRowSize();
  context.recordCompressionRatio(dataLength);

  auto totalMemoryUsage = getTotalMemoryUsage(context);
  metrics.limit = totalMemoryUsage;
  metrics.availableMemory = context.getMemoryBudget() - totalMemoryUsage;

  auto& dictionaryDataMemoryUsage =
      context.getMemoryUsage(MemoryUsageCategory::DICTIONARY);
  metrics.dictionaryMemory = dictionaryDataMemoryUsage.getCurrentBytes();
  // TODO: what does this try to capture?
  metrics.maxDictSize = dictionaryDataMemoryUsage.getMaxBytes();

  metrics.stripeIndex = context.stripeIndex;
  metrics.rawStripeSize = context.stripeRawSize;
  metrics.rowsInStripe = context.stripeRowCount;
  metrics.compressionRatio = context.getCompressionRatio();
  metrics.flushOverheadRatio = context.getFlushOverheadRatio();
  metrics.averageRowSize = context.getAverageRowSize();
  metrics.groupSize = 0;
  metrics.close = close;

  LOG(INFO) << fmt::format(
      "Flush overhead = {}, data length = {}",
      metrics.flushOverhead,
      metrics.stripeSize);
  // Add flush overhead and other ratio logging.
  context.metricLogger->logStripeFlush(metrics);

  // prepare for next stripe
  context.nextStripe();
  resetImpl();
}