in pcap-fsnotify/main.go [158:245]
func exportPcapFile(
ctx context.Context,
wg *sync.WaitGroup,
pcapDotExt *regexp.Regexp,
srcFile *string,
compress, delete, flush bool,
) bool {
defer wg.Done()
if flush && isActive.Load() {
return false
}
rMatch := pcapDotExt.FindStringSubmatch(*srcFile)
if len(rMatch) == 0 || len(rMatch) < 3 {
return false
}
iface := fmt.Sprintf("%s:%s", rMatch[1], rMatch[2])
ext := rMatch[3]
key := strings.Join(rMatch[1:], "/")
lastPcapFileName, loaded := lastPcap.Get(key)
// `flushing` is the only thread-safe PCAP export operation.
if flush {
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("flushing PCAP file: [%s] (%s/%s) %s", key, ext, iface, *srcFile), PCAP_EXPORT, *srcFile, "" /* target PCAP file */, 0, nil)
tgtPcapFileName, pcapBytes, moveErr := movePcapToGcs(ctx, srcFile, compress, delete)
if moveErr != nil {
logger.LogFsEvent(zapcore.ErrorLevel,
fmt.Sprintf("failed to flush PCAP file: (%s/%s) %s", ext, iface, *srcFile), PCAP_FSNERR, *srcFile, *tgtPcapFileName /* target PCAP file */, 0, moveErr)
return false
}
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("flushed PCAP file: (%s/%s) %s", ext, iface, *tgtPcapFileName), PCAP_EXPORT, *srcFile, *tgtPcapFileName, *pcapBytes, nil)
return true
}
counter, _ := counters.GetOrCompute(key,
func() *atomic.Uint64 {
return new(atomic.Uint64)
})
iteration := (*counter).Add(1)
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("new PCAP file detected: [%s] (%s/%s/%d) %s", key, ext, iface, iteration, *srcFile), PCAP_CREATE, *srcFile, "" /* target PCAP file */, 0, nil)
// Skip 1st PCAP, start moving PCAPs as soon as TCPDUMP rolls over into the 2nd file.
// The outcome of this implementation is that the directory in which TCPDUMP writes
// PCAP files will contain at most 2 files, the current one, and the one being moved
// into the destination directory ( `gcs_dir` ). Otherwise it will contain all PCAPs.
if iteration == 1 {
lastPcap.Set(key, *srcFile)
return false
}
if !loaded || lastPcapFileName == "" {
lastPcap.Set(key, *srcFile)
logger.LogFsEvent(zapcore.ErrorLevel, fmt.Sprintf("PCAP file [%s] (%s/%s/%d) unavailable", key, ext, iface, iteration), PCAP_EXPORT, "" /* source PCAP File */, *srcFile /* target PCAP file */, 0, nil)
return false
}
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("exporting PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *srcFile), PCAP_EXPORT, lastPcapFileName, "" /* target PCAP file */, 0, nil)
// move non-current PCAP file into `gcs_dir` which means that:
// 1. the GCS Bucket should have already been mounted
// 2. the directory hierarchy to store PCAP files already exists
tgtPcapFileName, pcapBytes, moveErr := movePcapToGcs(ctx, &lastPcapFileName, compress, delete)
if moveErr == nil {
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("exported PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *tgtPcapFileName), PCAP_EXPORT, lastPcapFileName, *tgtPcapFileName, *pcapBytes, nil)
} else {
logger.LogFsEvent(zapcore.ErrorLevel,
fmt.Sprintf("failed to export PCAP file: (%s/%s/%d) %s", ext, iface, iteration, lastPcapFileName), PCAP_EXPORT, lastPcapFileName, *tgtPcapFileName /* target PCAP file */, 0, moveErr)
}
// current PCAP file is the next one to be moved
if !lastPcap.CompareAndSwap(key, lastPcapFileName, *srcFile) {
logger.LogFsEvent(zapcore.ErrorLevel,
fmt.Sprintf("leaked PCAP file: [%s] (%s/%s/%d) %s", key, ext, iface, iteration, *srcFile), PCAP_FSNERR, *srcFile, "" /* target PCAP file */, 0, nil)
lastPcap.Set(key, *srcFile)
}
logger.LogFsEvent(zapcore.InfoLevel,
fmt.Sprintf("queued PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *srcFile), PCAP_QUEUED, *srcFile, "" /* target PCAP file */, 0, nil)
return moveErr == nil
}