// PcapWriter::runMulti()
//
// in katran/lib/PcapWriter.cpp [172:223]


// Message loop for the multi-writer mode.
//
// Repeatedly blocks on `queue` for the next PcapMsgMeta and handles it:
//   * control messages: shutdown ends the loop, restart calls
//     restartWriters() with the limit carried by the message, stop calls
//     stopWriters(); any other control message is ignored;
//   * data messages: dropped when the packet limit is reached (and no
//     override is set), when the event is not in enabledEvents_, when the
//     pcap header cannot be written, when no writer is registered for the
//     event, or when the writer reports insufficient space (counted in
//     bufferFull_); otherwise the packet is trimmed to the snap length,
//     written, and packetAmount_ is incremented.
//
// A Guard on cntrLock_ is constructed after every read and stays alive for
// the remainder of that iteration.
void PcapWriter::runMulti(
    std::shared_ptr<folly::MPMCQueue<PcapMsgMeta>> queue) {
  // A zero snaplen_ falls back to kMaxSnapLen.
  auto captureLimit = snaplen_ ? snaplen_ : kMaxSnapLen;
  PcapMsgMeta msg;
  while (true) {
    VLOG(4) << __func__ << " blockingRead msg";
    queue->blockingRead(msg);
    Guard lock(cntrLock_);

    if (msg.isControl()) {
      if (msg.isShutdown()) {
        VLOG(4) << "Shutdown message was received. Stopping.";
        // Nothing follows the loop, so returning here is the loop exit.
        return;
      }
      if (msg.isRestart()) {
        VLOG(4) << "Restart message was received. Restarting.";
        restartWriters(msg.getLimit());
      } else if (msg.isStop()) {
        VLOG(4) << "Stop message was received. Stopping.";
        stopWriters();
      }
      continue;
    }

    const bool limitReached =
        !packetLimitOverride_ && packetAmount_ >= packetLimit_;
    if (limitReached) {
      VLOG(4)
          << "No packetLimitOverride and packetAmount is greater than packetLimit. Skipping";
      continue;
    }

    auto eventId = msg.getEventId();
    const bool eventEnabled =
        enabledEvents_.find(eventId) != enabledEvents_.end();
    if (!eventEnabled) {
      LOG(INFO) << "event " << eventId << " is not enabled, skipping";
      continue;
    }

    const bool headerReady = writePcapHeader(eventId);
    if (!headerReady) {
      LOG(ERROR) << "DataWriter failed to write a header";
      continue;
    }

    // Trim before the space check so the check sees the trimmed length.
    msg.getPcapMsg().trim(captureLimit);

    auto writerEntry = dataWriters_.find(eventId);
    if (writerEntry == dataWriters_.end()) {
      LOG(ERROR) << "No writer w/ specified Id: " << eventId;
      continue;
    }

    // Space needed: captured bytes plus one per-record pcap header.
    const auto bytesNeeded =
        msg.getPcapMsg().getCapturedLen() + sizeof(pcaprec_hdr_s);
    if (!writerEntry->second->available(bytesNeeded)) {
      VLOG(4) << "Writer buffer is full. Skipping";
      ++bufferFull_;
      continue;
    }

    VLOG(4) << __func__ << " write packet for event: " << eventId;
    writePacket(msg.getPcapMsg(), eventId);
    ++packetAmount_;
  }
}