in velox/dwio/dwrf/writer/WriterShared.cpp [286:454]
void WriterShared::flushStripe(bool close) {
auto& context = getContext();
auto preFlushTotalMemoryUsage = getTotalMemoryUsage(context);
int64_t preFlushStreamMemoryUsage =
context.getMemoryUsage(MemoryUsageCategory::OUTPUT_STREAM)
.getCurrentBytes();
if (context.stripeRowCount == 0) {
return;
}
dwio::common::MetricsLog::StripeFlushMetrics metrics;
metrics.writerVersion =
writerVersionToString(context.getConfig(Config::WRITER_VERSION));
metrics.outputStreamMemoryEstimate = context.getEstimatedOutputStreamSize();
metrics.stripeSizeEstimate =
context.getEstimatedStripeSize(context.stripeRawSize);
if (context.isIndexEnabled && context.indexRowCount > 0) {
createRowIndexEntry();
}
auto& handler = context.getEncryptionHandler();
EncodingManager encodingManager{handler};
flushImpl([&](uint32_t nodeId) -> proto::ColumnEncoding& {
return encodingManager.addEncodingToFooter(nodeId);
});
// Collects the memory increment from flushing data to output streams.
auto flushOverhead =
context.getMemoryUsage(MemoryUsageCategory::OUTPUT_STREAM)
.getCurrentBytes() -
preFlushStreamMemoryUsage;
context.recordFlushOverhead(flushOverhead);
metrics.flushOverhead = flushOverhead;
auto& sink = getSink();
auto stripeOffset = sink.size();
uint32_t lastIndex = 0;
uint64_t offset = 0;
auto addStream = [&](const auto& stream, const auto& out) {
proto::Stream* s;
uint32_t currentIndex;
auto nodeId = stream.node;
s = encodingManager.addStreamToFooter(nodeId, currentIndex);
// set offset only when needed, ie. when offset of current stream cannot be
// calculated based on offset and length of previous stream. In that case,
// it must be that current stream and previous stream doesn't belong to same
// encryption group or neither are encrypted. So the logic is simplified to
// check if group index are the same for current and previous stream
if (offset > 0 && lastIndex != currentIndex) {
s->set_offset(offset);
}
lastIndex = currentIndex;
// Jolly/Presto readers can't read streams bigger than 2GB.
validateStreamSize(stream, out.size());
s->set_kind(static_cast<proto::Stream_Kind>(stream.kind));
s->set_node(nodeId);
s->set_column(stream.column);
s->set_sequence(stream.sequence);
s->set_length(out.size());
s->set_usevints(context.getConfig(Config::USE_VINTS));
offset += out.size();
context.incrementNodeSize(nodeId, out.size());
};
// TODO: T45025996 Discard all empty streams at flush time.
// deals with streams
uint64_t indexLength = 0;
sink.setMode(WriterSink::Mode::Index);
auto planner = layoutPlannerFactory_(getStreamList(context), encodingManager);
planner->plan();
planner->iterateIndexStreams([&](auto& streamId, auto& content) {
DWIO_ENSURE_EQ(
streamId.kind,
StreamKind::StreamKind_ROW_INDEX,
"unexpected stream kind ",
streamId.kind);
indexLength += content.size();
addStream(streamId, content);
sink.addBuffers(content);
});
uint64_t dataLength = 0;
sink.setMode(WriterSink::Mode::Data);
planner->iterateDataStreams([&](auto& streamId, auto& content) {
DWIO_ENSURE_NE(
streamId.kind,
StreamKind::StreamKind_ROW_INDEX,
"unexpected stream kind ",
streamId.kind);
dataLength += content.size();
addStream(streamId, content);
sink.addBuffers(content);
});
DWIO_ENSURE_GT(dataLength, 0);
metrics.stripeSize = dataLength;
if (handler.isEncrypted()) {
// fill encryption metadata
for (uint32_t i = 0; i < handler.getEncryptionGroupCount(); ++i) {
auto group = encodingManager.addEncryptionGroupToFooter();
writeProtoAsString(
*group,
encodingManager.getEncryptionGroup(i),
std::addressof(handler.getEncryptionProviderByIndex(i)));
}
}
// flush footer
uint64_t footerOffset = sink.size();
DWIO_ENSURE_EQ(footerOffset, stripeOffset + dataLength + indexLength);
sink.setMode(WriterSink::Mode::Footer);
writeProto(encodingManager.getFooter());
sink.setMode(WriterSink::Mode::None);
auto& stripe = addStripeInfo();
stripe.set_offset(stripeOffset);
stripe.set_indexlength(indexLength);
stripe.set_datalength(dataLength);
stripe.set_footerlength(sink.size() - footerOffset);
// set encryption key metadata
if (handler.isEncrypted() && context.stripeIndex == 0) {
for (uint32_t i = 0; i < handler.getEncryptionGroupCount(); ++i) {
*stripe.add_keymetadata() =
handler.getEncryptionProviderByIndex(i).getKey();
}
}
context.recordAverageRowSize();
context.recordCompressionRatio(dataLength);
auto totalMemoryUsage = getTotalMemoryUsage(context);
metrics.limit = totalMemoryUsage;
metrics.availableMemory = context.getMemoryBudget() - totalMemoryUsage;
auto& dictionaryDataMemoryUsage =
context.getMemoryUsage(MemoryUsageCategory::DICTIONARY);
metrics.dictionaryMemory = dictionaryDataMemoryUsage.getCurrentBytes();
// TODO: what does this try to capture?
metrics.maxDictSize = dictionaryDataMemoryUsage.getMaxBytes();
metrics.stripeIndex = context.stripeIndex;
metrics.rawStripeSize = context.stripeRawSize;
metrics.rowsInStripe = context.stripeRowCount;
metrics.compressionRatio = context.getCompressionRatio();
metrics.flushOverheadRatio = context.getFlushOverheadRatio();
metrics.averageRowSize = context.getAverageRowSize();
metrics.groupSize = 0;
metrics.close = close;
LOG(INFO) << fmt::format(
"Flush overhead = {}, data length = {}",
metrics.flushOverhead,
metrics.stripeSize);
// Add flush overhead and other ratio logging.
context.metricLogger->logStripeFlush(metrics);
// prepare for next stripe
context.nextStripe();
resetImpl();
}