katran/lib/PcapWriter.cpp (186 lines of code) (raw):

/* Copyright (C) 2018-present, Facebook, Inc. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; version 2 of the License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "katran/lib/PcapWriter.h" #include <chrono> #include "katran/lib/PcapStructs.h" using Guard = std::lock_guard<std::mutex>; namespace katran { namespace { constexpr uint32_t kPcapWriterMagic = 0xa1b2c3d4; constexpr uint16_t kVersionMajor = 2; constexpr uint16_t kVersionMinor = 4; constexpr int32_t kGmt = 0; constexpr uint32_t kAccuracy = 0; constexpr uint32_t kMaxSnapLen = 0xFFFF; // 65535 constexpr uint32_t kEthernet = 1; using EventId = monitoring::EventId; constexpr EventId kDefaultWriter = EventId::TCP_NONSYN_LRUMISS; } // namespace PcapWriter::PcapWriter( std::shared_ptr<DataWriter> dataWriter, uint32_t packetLimit, uint32_t snaplen) : packetLimit_(packetLimit), snaplen_(snaplen) { dataWriters_.insert({kDefaultWriter, dataWriter}); } PcapWriter::PcapWriter( std::unordered_map<EventId, std::shared_ptr<DataWriter>>& dataWriters, uint32_t packetLimit, uint32_t snaplen) : dataWriters_(dataWriters), packetLimit_(packetLimit), snaplen_(snaplen) {} void PcapWriter::writePacket(const PcapMsg& msg, EventId writerId) { auto unix_usec = std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::high_resolution_clock::now().time_since_epoch()) .count(); // 1sec = 1mil usec const uint32_t now_sec = unix_usec / 1000000; // in pcap format ts_usec is a offset in msec after ts_sec. const uint32_t now_usec = unix_usec - now_sec * 1000000; pcaprec_hdr_s rec_hdr{ .ts_sec = now_sec, .ts_usec = now_usec, }; rec_hdr.incl_len = msg.getCapturedLen(); rec_hdr.orig_len = msg.getOrigLen(); auto writerIt = dataWriters_.find(writerId); if (writerIt == dataWriters_.end()) { LOG(ERROR) << "no writer w/ specified ID: " << writerId; return; } writerIt->second->writeData(&rec_hdr, sizeof(rec_hdr)); writerIt->second->writeData(msg.getRawBuffer(), msg.getCapturedLen()); } bool PcapWriter::writePcapHeader(EventId writerId) { if (headerExists_.find(writerId) != headerExists_.end()) { VLOG(4) << "header already exists"; return true; } auto writerIt = dataWriters_.find(writerId); if (writerIt == dataWriters_.end()) { LOG(ERROR) << "No writer w/ specified ID: " << writerId; return false; } if (!writerIt->second->available(sizeof(struct pcap_hdr_s))) { LOG(ERROR) << "DataWriter failed to write a header. Not enough space."; return false; } struct pcap_hdr_s hdr { .magic_number = kPcapWriterMagic, .version_major = kVersionMajor, .version_minor = kVersionMinor, .thiszone = kGmt, .sigfigs = kAccuracy, .snaplen = snaplen_ ?: kMaxSnapLen, .network = kEthernet }; writerIt->second->writeHeader(&hdr, sizeof(hdr)); headerExists_.insert(writerId); return true; } void PcapWriter::run(std::shared_ptr<folly::MPMCQueue<PcapMsg>> queue) { auto snaplen = snaplen_ ?: kMaxSnapLen; if (!writePcapHeader(kDefaultWriter)) { LOG(ERROR) << "DataWriter failed to write a header"; return; } PcapMsg msg(nullptr, 0, 0); while (packetLimit_ == 0 || packetAmount_ < packetLimit_) { queue->blockingRead(msg); Guard lock(cntrLock_); msg.trim(snaplen); if (msg.emptyMsg()) { LOG(INFO) << "Empty message was received. Writer thread is stopping."; break; } auto writerIt = dataWriters_.find(kDefaultWriter); if (writerIt == dataWriters_.end()) { LOG(ERROR) << "No writer w/ specified Id: " << kDefaultWriter; } if (!writerIt->second->available( msg.getCapturedLen() + sizeof(pcaprec_hdr_s))) { ++bufferFull_; break; } writePacket(msg, kDefaultWriter); ++packetAmount_; } } PcapWriterStats PcapWriter::getStats() { PcapWriterStats stats; Guard lock(cntrLock_); stats.limit = packetLimit_; stats.amount = packetAmount_; stats.bufferFull = bufferFull_; return stats; } void PcapWriter::restartWriters(uint32_t packetLimit) { // as we are going to overrite all data writers. we would need to rewrite // headers headerExists_.clear(); for (auto& eventAndWriter : dataWriters_) { eventAndWriter.second->restart(); } packetLimit_ = packetLimit; packetAmount_ = 0; } void PcapWriter::stopWriters() { for (auto& eventAndWriter : dataWriters_) { eventAndWriter.second->stop(); } packetLimit_ = 0; packetAmount_ = packetLimit_; } void PcapWriter::resetWriters( std::unordered_map<monitoring::EventId, std::shared_ptr<DataWriter>>&& newDataWriters) { // Writers should have been stopped Guard lock(cntrLock_); // Gracefully stop for (auto& eventAndWriter : dataWriters_) { eventAndWriter.second->stop(); } dataWriters_.clear(); dataWriters_ = std::move(newDataWriters); } void PcapWriter::runMulti( std::shared_ptr<folly::MPMCQueue<PcapMsgMeta>> queue) { auto snaplen = snaplen_ ?: kMaxSnapLen; PcapMsgMeta msg; for (;;) { VLOG(4) << __func__ << " blockingRead msg"; queue->blockingRead(msg); Guard lock(cntrLock_); if (msg.isControl()) { if (msg.isShutdown()) { VLOG(4) << "Shutdown message was received. Stopping."; break; } else if (msg.isRestart()) { VLOG(4) << "Restart message was received. Restarting."; restartWriters(msg.getLimit()); } else if (msg.isStop()) { VLOG(4) << "Stop message was received. Stopping."; stopWriters(); } continue; } if (!packetLimitOverride_ && packetAmount_ >= packetLimit_) { VLOG(4) << "No packetLimitOverride and packetAmount is greater than packetLimit. Skipping"; continue; } auto eventId = msg.getEventId(); if (enabledEvents_.find(eventId) == enabledEvents_.end()) { LOG(INFO) << "event " << eventId << " is not enabled, skipping"; continue; } if (!writePcapHeader(eventId)) { LOG(ERROR) << "DataWriter failed to write a header"; continue; } msg.getPcapMsg().trim(snaplen); auto writerIt = dataWriters_.find(eventId); if (writerIt == dataWriters_.end()) { LOG(ERROR) << "No writer w/ specified Id: " << eventId; continue; } if (!writerIt->second->available( msg.getPcapMsg().getCapturedLen() + sizeof(pcaprec_hdr_s))) { VLOG(4) << "Writer buffer is full. Skipping"; ++bufferFull_; continue; } VLOG(4) << __func__ << " write packet for event: " << eventId; writePacket(msg.getPcapMsg(), eventId); ++packetAmount_; } } } // namespace katran