in katran/lib/PcapWriter.cpp [172:223]
// Consumer loop for the multi-event capture mode.
//
// Repeatedly blocks on `queue` for the next PcapMsgMeta and then, while
// holding a Guard on cntrLock_, either:
//   * handles a control message (shutdown ends the loop; restart calls
//     restartWriters() with the limit carried by the message; stop calls
//     stopWriters()), or
//   * treats the message as a captured packet: it is dropped if the packet
//     limit has been reached (and no override is set), if its event id is not
//     in enabledEvents_, if the pcap header cannot be written, if there is no
//     writer registered for the event id, or if that writer reports that it
//     has no room for the record. Otherwise the packet is trimmed to the
//     snap length, written, and packetAmount_ is incremented.
//
// The function returns only after a shutdown control message is read.
void PcapWriter::runMulti(
    std::shared_ptr<folly::MPMCQueue<PcapMsgMeta>> queue) {
  // A zero snaplen_ means "not configured": fall back to kMaxSnapLen.
  auto captureLimit = snaplen_ ? snaplen_ : kMaxSnapLen;
  PcapMsgMeta msg;

  while (true) {
    VLOG(4) << __func__ << " blockingRead msg";
    queue->blockingRead(msg);

    // Acquired only after the blocking read has returned; the Guard object is
    // scoped to this single iteration (presumably a scoped lock -- confirm
    // against the Guard alias in the header).
    Guard lock(cntrLock_);

    if (msg.isControl()) {
      if (msg.isShutdown()) {
        VLOG(4) << "Shutdown message was received. Stopping.";
        return;
      }
      if (msg.isRestart()) {
        VLOG(4) << "Restart message was received. Restarting.";
        restartWriters(msg.getLimit());
      } else if (msg.isStop()) {
        VLOG(4) << "Stop message was received. Stopping.";
        stopWriters();
      }
      // Control messages never carry packet data to be written.
      continue;
    }

    const bool limitEnforced = !packetLimitOverride_;
    if (limitEnforced && packetAmount_ >= packetLimit_) {
      VLOG(4)
          << "No packetLimitOverride and packetAmount is greater than packetLimit. Skipping";
      continue;
    }

    const auto evId = msg.getEventId();
    if (enabledEvents_.count(evId) == 0) {
      LOG(INFO) << "event " << evId << " is not enabled, skipping";
      continue;
    }

    if (!writePcapHeader(evId)) {
      LOG(ERROR) << "DataWriter failed to write a header";
      continue;
    }

    // Trim first so that the size check below sees the post-trim length.
    msg.getPcapMsg().trim(captureLimit);

    auto writerEntry = dataWriters_.find(evId);
    if (writerEntry == dataWriters_.end()) {
      LOG(ERROR) << "No writer w/ specified Id: " << evId;
      continue;
    }

    // Space needed for one record: per-packet pcap header plus captured bytes.
    const auto recordSize =
        msg.getPcapMsg().getCapturedLen() + sizeof(pcaprec_hdr_s);
    if (!writerEntry->second->available(recordSize)) {
      VLOG(4) << "Writer buffer is full. Skipping";
      ++bufferFull_;
      continue;
    }

    VLOG(4) << __func__ << " write packet for event: " << evId;
    writePacket(msg.getPcapMsg(), evId);
    ++packetAmount_;
  }
}