in pcap-fsnotify/main.go [276:501]
func main() {
isActive.Store(false)
flag.Parse()
defer logger.Sync()
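// lock-free maps used by the export pipeline: per-key export counters, and the most recently exported PCAP file per key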
counters = haxmap.New[string, *atomic.Uint64]()
lastPcap = haxmap.New[string, string]()
isGAE, isGAEerr := strconv.ParseBool(gcpGAE)
isGAE = (isGAEerr == nil && isGAE) || *gcp_gae
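// rotated PCAP files under `src_dir` are expected to match: part__<N>_<name>__<YYYYMMDD>T<hhmmss>.<ext>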
ext := strings.Join(strings.Split(*pcap_ext, ","), "|")
pcapDotExt := regexp.MustCompile(`^` + regexp.QuoteMeta(*src_dir) + `/part__(\d+?)_(.+?)__\d{8}T\d{6}\.(` + ext + `)$`)
tcpdumpwExitSignal := regexp.MustCompile(`^` + regexp.QuoteMeta(*src_dir) + `/TCPDUMPW_EXITED$`)
// `interval` must match the value of `PCAP_ROTATE_SECS`
watchdogInterval := time.Duration(*interval) * time.Second
args := map[string]any{
"src_dir": *src_dir,
"gcs_dir": *gcs_dir,
"gcs_export": *gcs_export,
"gcs_fuse": *gcs_fuse,
"gcs_bucket": *gcs_bucket,
"pcap_ext": pcapDotExt.String(),
"interval": watchdogInterval.String(),
"gzip": *gzip_pcaps,
"rt_env": *rt_env,
"pcap_debug": *pcap_debug,
}
logger.LogEvent(zapcore.InfoLevel, "starting PCAP filesystem watcher", PCAP_FSNINI, args, nil)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
// Create a new buffered watcher; the buffer absorbs bursts of FS events during PCAP rotation.
watcher, err := fsnotify.NewBufferedWatcher(100)
if err != nil {
logger.LogEvent(zapcore.FatalLevel, fmt.Sprintf("failed to create FS watcher: %v", err), PCAP_FSNINI, nil, nil)
os.Exit(1)
}
defer watcher.Close()
ctx, cancel := context.WithCancel(context.Background())
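// cancelling this context triggers the final PCAP flush; it is invoked either by the
// `tcpdumpw` termination signal or by the OS signal handler below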
if *gcs_export {
// when GCS export is disabled, the PCAP `exporter` was already initialized via `NewNilExporter`
if *gcs_fuse {
exporter = gcs.NewFuseExporter(logger, *gcs_dir, *retries_max, *retries_delay)
} else {
exporter = gcs.NewClientLibraryExporter(ctx, logger, projectID, service, instanceID, *gcs_bucket, *gcs_dir, *retries_max, *retries_delay)
}
}
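// tracks in-flight PCAP export operations so they can be awaited before the final flush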
var wg sync.WaitGroup
// Watch the PCAP files source directory for FS events.
if isActive.CompareAndSwap(false, true) {
if err = watcher.Add(*src_dir); err != nil {
logger.LogEvent(zapcore.ErrorLevel, fmt.Sprintf("failed to watch directory '%s': %v", *src_dir, err), PCAP_FSNERR, nil, err)
isActive.Store(false)
}
}
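// the ticker paces the OS write-buffer flushing watchdog below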
ticker := time.NewTicker(watchdogInterval)
// Start listening for FS events at PCAP files source directory.
go func(wg *sync.WaitGroup, watcher *fsnotify.Watcher, ticker *time.Ticker) {
for isActive.Load() {
select {
case event, ok := <-watcher.Events:
if !ok { // Channel was closed (i.e. Watcher.Close() was called)
ticker.Stop()
return
}
// Skip events that are not CREATE, and any that do not involve PCAP files
if event.Has(fsnotify.Create) && pcapDotExt.MatchString(event.Name) {
wg.Add(1)
exportPcapFile(ctx, wg, pcapDotExt, &event.Name, *gzip_pcaps /* compress */, true /* delete */, false /* flush */)
} else if event.Has(fsnotify.Create) && tcpdumpwExitSignal.MatchString(event.Name) && isActive.CompareAndSwap(true, false) {
// `tcpdumpw` signals its termination by creating the file `TCPDUMPW_EXITED` in the source directory
tcpdumpwExitTS := time.Now()
logger.LogEvent(zapcore.InfoLevel,
"detected 'tcpdumpw' termination signal",
PCAP_SIGNAL,
map[string]interface{}{
"event": PCAP_SIGNAL,
"signal": event.Name,
"timestamp": tcpdumpwExitTS.Format(time.RFC3339Nano),
}, nil)
// delete `tcpdumpw` termination signal
os.Remove(event.Name)
// when the `tcpdumpw` termination signal is detected:
// - cancel the context, which triggers the final PCAP file flush
cancel()
return
}
case fsnErr, ok := <-watcher.Errors:
if !ok { // Channel was closed (i.e. Watcher.Close() was called).
ticker.Stop()
return
}
logger.LogEvent(zapcore.ErrorLevel, "FS watcher failed", PCAP_FSNERR, map[string]interface{}{"closed": ok}, fsnErr)
}
}
}(&wg, watcher, ticker)
go func(ticker *time.Ticker) {
for isActive.Load() {
select {
case <-ctx.Done():
return
case <-ticker.C:
// packet capturing is write-intensive:
// OS file write buffers must be flushed often to prevent memory saturation.
// flushing OS file write buffers is safe: 'non-destructive operation and will not free any dirty objects'
// additionally, PCAP files are [write|append]-only
memoryBefore, _ := getCurrentMemoryUtilization(isGAE)
_, memFlushErr := flushBuffers()
memoryAfter, _ := getCurrentMemoryUtilization(isGAE)
if memFlushErr != nil {
logger.LogEvent(zapcore.ErrorLevel, "failed to flush OS file write buffers", PCAP_OSWMEM, map[string]interface{}{"before": memoryBefore}, memFlushErr)
continue
}
releasedMemory := int64(memoryBefore) - int64(memoryAfter)
logger.LogEvent(zapcore.InfoLevel,
fmt.Sprintf("flushed OS file write buffers: memory[before=%d|after=%d] / released=%d", memoryBefore, memoryAfter, releasedMemory),
PCAP_OSWMEM, map[string]interface{}{"before": memoryBefore, "after": memoryAfter, "released": releasedMemory}, nil)
}
}
}(ticker)
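// Shutdown handler: upon receiving an OS signal, wait up to `deadline` for the PCAP lock file to be released before cancelling the context.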
go func() {
sig := <-sigChan // `sig` instead of `signal` to avoid shadowing the `signal` package
signalTS := time.Now()
deadline := 3 * time.Second
logger.LogEvent(zapcore.InfoLevel,
fmt.Sprintf("signaled: %v", sig),
PCAP_SIGNAL,
map[string]interface{}{
"signal": sig,
"timestamp": signalTS.Format(time.RFC3339Nano),
}, nil)
timer := time.AfterFunc(deadline-time.Since(signalTS), func() {
if isActive.CompareAndSwap(true, false) {
// cancel the context once the 3s deadline elapses, regardless of the `tcpdumpw` termination signal:
// - this is effectively the `max_wait_time` for that signal.
cancel()
}
})
pcapMutex := flock.New(pcapLockFile)
lockData := map[string]interface{}{"lock": pcapLockFile}
logger.LogEvent(zapcore.InfoLevel, "waiting for PCAP lock file", PCAP_FSLOCK, lockData, nil)
lockCtx, lockCancel := context.WithTimeout(ctx, deadline-time.Since(signalTS))
defer lockCancel()
// `tcpdumpq` will unlock the PCAP lock file when all PCAP engines have stopped
if locked, lockErr := pcapMutex.TryLockContext(lockCtx, 10*time.Millisecond); !locked || lockErr != nil {
lockData["latency"] = time.Since(signalTS).String()
logger.LogEvent(zapcore.ErrorLevel, "failed to acquire PCAP lock file", PCAP_FSLOCK, lockData, lockErr)
} else if isActive.CompareAndSwap(true, false) {
timer.Stop()
lockData["latency"] = time.Since(signalTS).String()
cancel()
logger.LogEvent(zapcore.InfoLevel, "acquired PCAP lock file", PCAP_FSLOCK, lockData, nil)
}
}()
if err == nil {
logger.LogEvent(zapcore.InfoLevel, fmt.Sprintf("watching directory: %s", *src_dir), PCAP_FSNINI, nil, nil)
} else {
// `isActive` was already set to `false` when `watcher.Add` failed, so shut down directly
logger.LogEvent(zapcore.ErrorLevel, fmt.Sprintf("error at initialization: %v", err), PCAP_FSNINI, nil, err)
watcher.Close()
ticker.Stop()
cancel()
}
<-ctx.Done() // wait for context to be cancelled
ticker.Stop()
watcher.Remove(*src_dir)
watcher.Close()
// wait for all regular export operations to terminate
wg.Wait()
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // release the timeout context's resources once the final flush completes
flushStart := time.Now()
// flush remaining PCAP files after context is done
// compression & deletion are disabled when exiting in order to speed up the process
pendingPcapFiles := flushSrcDir(ctx, &wg, pcapDotExt,
true /* sync */, false /* compress */, false /* delete */,
func(_ fs.FileInfo) bool { return true },
)
logger.LogEvent(zapcore.InfoLevel,
fmt.Sprintf("waiting for %d PCAP files to be flushed", pendingPcapFiles),
PCAP_FSNEND,
map[string]interface{}{
"files": pendingPcapFiles,
"timestamp": flushStart.Format(time.RFC3339Nano),
}, nil)
wg.Wait() // wait for the remaining PCAP files to be flushed
flushLatency := time.Since(flushStart)
logger.LogEvent(zapcore.InfoLevel,
fmt.Sprintf("flushed %d PCAP files", pendingPcapFiles),
PCAP_FSNEND,
map[string]interface{}{
"files": pendingPcapFiles,
"latency": flushLatency.String(),
}, nil)
}