pcap-fsnotify/main.go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"io/fs"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/constants"
	"github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/gcs"
	"github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/log"
	"github.com/alphadose/haxmap"
	"github.com/fsnotify/fsnotify"
	"github.com/gofrs/flock"
	"go.uber.org/zap/zapcore"
)

type (
	pcapEvent = constants.PcapEvent
)

const (
	PCAP_FSNINI = constants.PCAP_FSNINI
	PCAP_FSNEND = constants.PCAP_FSNEND
	PCAP_FSNERR = constants.PCAP_FSNERR
	PCAP_CREATE = constants.PCAP_CREATE
	PCAP_EXPORT = constants.PCAP_EXPORT
	PCAP_QUEUED = constants.PCAP_QUEUED
	PCAP_OSWMEM = constants.PCAP_OSWMEM
	PCAP_SIGNAL = constants.PCAP_SIGNAL
	PCAP_FSLOCK = constants.PCAP_FSLOCK
)

const (
	cgroupMemoryUtilization       = "/sys/fs/cgroup/memory/memory.usage_in_bytes"
	dockerCgroupMemoryUtilization = "/sys/fs/cgroup/memory.current"
	procSysVmDropCaches           = "/proc/sys/vm/drop_caches"
	pcapLockFile                  = "/var/lock/pcap.lock"
)

var (
	src_dir       = flag.String("src_dir", "/pcap-tmp", "pcaps source directory")
	gcs_dir       = flag.String("gcs_dir", "/pcap", "pcaps destination directory")
	pcap_ext      = flag.String("pcap_ext", "pcap", "pcap files extension")
	gzip_pcaps    = flag.Bool("gzip", false, "compress pcap files")
	gcp_env       = flag.String("env", "run", "literal ID of the execution environment; any of: run, gae, gke")
	gcp_run       = flag.Bool("run", true, "Cloud Run execution environment")
	gcp_gae       = flag.Bool("gae", false, "App Engine execution environment")
	gcp_gke       = flag.Bool("gke", false, "Kubernetes Engine execution environment")
	interval      = flag.Uint("interval", 60, "seconds after which tcpdump rotates PCAP files")
	retries_max   = flag.Uint("retries_max", 5, "times a failed copy-to-GCS operation should be retried")
	retries_delay = flag.Uint("retries_delay", 2, "seconds between retries for copy-to-GCS operations")
	compat        = flag.Bool("compat", false, "apply filters in Cloud Run gen1 mode")
	rt_env        = flag.String("rt_env", "cloud_run_gen2", "runtime where PCAP sidecar is used")
	pcap_debug    = flag.Bool("debug", false, "enable debug logs")
	gcs_export    = flag.Bool("gcs_export", true, "export PCAP files to GCS")
	gcs_fuse      = flag.Bool("gcs_fuse", true, "export PCAP files using GCS Fuse")
	gcs_bucket    = flag.String("gcs_bucket", "", "export PCAP files to this GCS bucket")
	instance_id   = flag.String("instance_id", "", "compute resource hosting the PCAP sidecar")
)

var (
	projectID  string = os.Getenv("PROJECT_ID")
	gcpRegion  string = os.Getenv("GCP_REGION")
	service    string = os.Getenv("APP_SERVICE")
	version    string = os.Getenv("APP_VERSION")
	sidecar    string = os.Getenv("APP_SIDECAR")
	instanceID string = os.Getenv("INSTANCE_ID")
	module     string = os.Getenv("PROC_NAME")
	gcpGAE     string = os.Getenv("PCAP_GAE")
)

var (
	logger = log.NewLogger(projectID, service, gcpRegion, version, instanceID,
		sidecar, module)
	exporter = gcs.NewNilExporter(logger)
	counters *haxmap.Map[string, *atomic.Uint64]
	lastPcap *haxmap.Map[string, string]
)

var isActive atomic.Bool

// movePcapToGcs delegates exporting a single PCAP file to the configured exporter.
func movePcapToGcs(
	ctx context.Context,
	srcPcap *string,
	compress, delete bool,
) (*string, *int64, error) {
	return exporter.Export(ctx, srcPcap, compress, delete)
}

// getCurrentMemoryUtilization reads the container's current memory usage (in bytes)
// from the cgroup filesystem; App Engine uses the cgroup v2 path, other runtimes the cgroup v1 path.
func getCurrentMemoryUtilization(isGAE bool) (uint64, error) {
	var err error
	var memoryUtilizationFilePath string
	if isGAE {
		memoryUtilizationFilePath = dockerCgroupMemoryUtilization
	} else {
		memoryUtilizationFilePath = cgroupMemoryUtilization
	}
	memoryUtilizationFile, err := os.OpenFile(memoryUtilizationFilePath, os.O_RDONLY, 0o444 /* -r--r--r-- */)
	if err != nil {
		return 0, err
	}
	// close the cgroup file to avoid leaking a file descriptor on every poll
	defer memoryUtilizationFile.Close()
	var memoryUtilization int
	_, err = fmt.Fscanf(memoryUtilizationFile, "%d\n", &memoryUtilization)
	if err != nil {
		if err == io.EOF {
			return uint64(memoryUtilization), nil
		}
		return 0, err
	}
	return uint64(memoryUtilization), nil
}

// flushBuffers syncs dirty pages to disk and then drops the page cache,
// dentries and inodes by writing `3` to /proc/sys/vm/drop_caches.
func flushBuffers() (int, error) {
	cmd := exec.Command("sync")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.Run()
	// see: https://www.kernel.org/doc/Documentation/sysctl/vm.txt
	fd, err := os.OpenFile(procSysVmDropCaches, os.O_WRONLY|os.O_TRUNC|os.O_EXCL, 0o200 /* --w------- */)
	if err != nil {
		return 0, err
	}
	defer fd.Close()
	return fmt.Fprintln(fd, "3")
}

// exportPcapFile exports PCAP files created by tcpdump: in regular mode it queues the
// newly created file and exports the previously rotated one; in flush mode it exports
// `srcFile` directly.
func exportPcapFile(
	ctx context.Context,
	wg *sync.WaitGroup,
	pcapDotExt *regexp.Regexp,
	srcFile *string,
	compress, delete, flush bool,
) bool {
	defer wg.Done()

	if flush && isActive.Load() {
		return false
	}

	rMatch := pcapDotExt.FindStringSubmatch(*srcFile)
	// a full match yields 4 entries: the full match plus 3 capture groups
	if len(rMatch) < 4 {
		return false
	}
	iface := fmt.Sprintf("%s:%s", rMatch[1], rMatch[2])
	ext := rMatch[3]
	key := strings.Join(rMatch[1:], "/")

	lastPcapFileName, loaded := lastPcap.Get(key)

	// `flushing` is the only thread-safe PCAP export operation.
	if flush {
		logger.LogFsEvent(zapcore.InfoLevel,
			fmt.Sprintf("flushing PCAP file: [%s] (%s/%s) %s", key, ext, iface, *srcFile),
			PCAP_EXPORT, *srcFile, "" /* target PCAP file */, 0, nil)
		tgtPcapFileName, pcapBytes, moveErr := movePcapToGcs(ctx, srcFile, compress, delete)
		if moveErr != nil {
			logger.LogFsEvent(zapcore.ErrorLevel,
				fmt.Sprintf("failed to flush PCAP file: (%s/%s) %s", ext, iface, *srcFile),
				PCAP_FSNERR, *srcFile, *tgtPcapFileName /* target PCAP file */, 0, moveErr)
			return false
		}
		logger.LogFsEvent(zapcore.InfoLevel,
			fmt.Sprintf("flushed PCAP file: (%s/%s) %s", ext, iface, *tgtPcapFileName),
			PCAP_EXPORT, *srcFile, *tgtPcapFileName, *pcapBytes, nil)
		return true
	}

	counter, _ := counters.GetOrCompute(key, func() *atomic.Uint64 {
		return new(atomic.Uint64)
	})
	iteration := (*counter).Add(1)

	logger.LogFsEvent(zapcore.InfoLevel,
		fmt.Sprintf("new PCAP file detected: [%s] (%s/%s/%d) %s", key, ext, iface, iteration, *srcFile),
		PCAP_CREATE, *srcFile, "" /* target PCAP file */, 0, nil)

	// Skip 1st PCAP, start moving PCAPs as soon as TCPDUMP rolls over into the 2nd file.
	// The outcome of this implementation is that the directory in which TCPDUMP writes
	// PCAP files will contain at most 2 files, the current one, and the one being moved
	// into the destination directory ( `gcs_dir` ). Otherwise it will contain all PCAPs.
	if iteration == 1 {
		lastPcap.Set(key, *srcFile)
		return false
	}

	if !loaded || lastPcapFileName == "" {
		lastPcap.Set(key, *srcFile)
		logger.LogFsEvent(zapcore.ErrorLevel,
			fmt.Sprintf("PCAP file [%s] (%s/%s/%d) unavailable", key, ext, iface, iteration),
			PCAP_EXPORT, "" /* source PCAP File */, *srcFile /* target PCAP file */, 0, nil)
		return false
	}

	logger.LogFsEvent(zapcore.InfoLevel,
		fmt.Sprintf("exporting PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *srcFile),
		PCAP_EXPORT, lastPcapFileName, "" /* target PCAP file */, 0, nil)

	// move non-current PCAP file into `gcs_dir` which means that:
	//   1. the GCS Bucket should have already been mounted
	//   2. the directory hierarchy to store PCAP files already exists
	tgtPcapFileName, pcapBytes, moveErr := movePcapToGcs(ctx, &lastPcapFileName, compress, delete)
	if moveErr == nil {
		logger.LogFsEvent(zapcore.InfoLevel,
			fmt.Sprintf("exported PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *tgtPcapFileName),
			PCAP_EXPORT, lastPcapFileName, *tgtPcapFileName, *pcapBytes, nil)
	} else {
		logger.LogFsEvent(zapcore.ErrorLevel,
			fmt.Sprintf("failed to export PCAP file: (%s/%s/%d) %s", ext, iface, iteration, lastPcapFileName),
			PCAP_EXPORT, lastPcapFileName, *tgtPcapFileName /* target PCAP file */, 0, moveErr)
	}

	// current PCAP file is the next one to be moved
	if !lastPcap.CompareAndSwap(key, lastPcapFileName, *srcFile) {
		logger.LogFsEvent(zapcore.ErrorLevel,
			fmt.Sprintf("leaked PCAP file: [%s] (%s/%s/%d) %s", key, ext, iface, iteration, *srcFile),
			PCAP_FSNERR, *srcFile, "" /* target PCAP file */, 0, nil)
		lastPcap.Set(key, *srcFile)
	}

	logger.LogFsEvent(zapcore.InfoLevel,
		fmt.Sprintf("queued PCAP file: (%s/%s/%d) %s", ext, iface, iteration, *srcFile),
		PCAP_QUEUED, *srcFile, "" /* target PCAP file */, 0, nil)

	return moveErr == nil
}

// flushSrcDir walks the PCAP source directory and exports (flushes) every file
// accepted by `validator`, returning the number of files queued for export.
func flushSrcDir(
	ctx context.Context,
	wg *sync.WaitGroup,
	pcapDotExt *regexp.Regexp,
	sync, compress, delete bool,
	validator func(fs.FileInfo) bool,
) uint32 {
	pendingPcapFiles := uint32(0)

	if sync {
		flushBuffers()
	}

	filepath.Walk(*src_dir, func(path string, info fs.FileInfo, err error) error {
		// check the walk error before touching `info`: it may be nil when the walk fails
		if err != nil {
			logger.LogEvent(zapcore.ErrorLevel, "failed to flush PCAP files", PCAP_FSNERR, nil, err)
			return nil
		}
		if info.IsDir() {
			return nil
		}
		if validator(info) {
			pendingPcapFiles += 1
			wg.Add(1)
			go exportPcapFile(ctx, wg, pcapDotExt, &path, compress, delete, true /* flush */)
		}
		return nil
	})

	return pendingPcapFiles
}

func main() {
	isActive.Store(false)

	flag.Parse()

	defer logger.Sync()

	counters = haxmap.New[string, *atomic.Uint64]()
	lastPcap = haxmap.New[string, string]()

	isGAE, isGAEerr := strconv.ParseBool(gcpGAE)
	isGAE = (isGAEerr == nil && isGAE) || *gcp_gae

	ext := strings.Join(strings.Split(*pcap_ext, ","), "|")
	pcapDotExt := regexp.MustCompile(`^` + *src_dir + `/part__(\d+?)_(.+?)__\d{8}T\d{6}\.(` + ext + `)$`)
	tcpdumpwExitSignal := regexp.MustCompile(`^` + *src_dir + `/TCPDUMPW_EXITED$`)

	// must match the value of `PCAP_ROTATE_SECS`
	watchdogInterval := time.Duration(*interval) * time.Second

	args := map[string]any{
		"src_dir":    *src_dir,
		"gcs_dir":    *gcs_dir,
		"gcs_export": *gcs_export,
		"gcs_fuse":   *gcs_fuse,
		"gcs_bucket": *gcs_bucket,
		"pcap_ext":   pcapDotExt.String(),
		"interval":   watchdogInterval.String(),
		"gzip":       *gzip_pcaps,
		"rt_env":     *rt_env,
		"pcap_debug": *pcap_debug,
	}
	logger.LogEvent(zapcore.InfoLevel, "starting PCAP filesystem watcher", PCAP_FSNINI, args, nil)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)

	// Create new watcher.
	watcher, err := fsnotify.NewBufferedWatcher(100)
	if err != nil {
		logger.LogEvent(zapcore.FatalLevel, fmt.Sprintf("failed to create FS watcher: %v", err), PCAP_FSNINI, nil, nil)
		os.Exit(1)
	}
	defer watcher.Close()

	ctx, cancel := context.WithCancel(context.Background())

	// if GCS export is disabled, the PCAP files `exporter` is already initialized using `NewNilExporter`
	if *gcs_export {
		if *gcs_fuse {
			exporter = gcs.NewFuseExporter(logger, *gcs_dir, *retries_max, *retries_delay)
		} else {
			exporter = gcs.NewClientLibraryExporter(ctx, logger, projectID, service, instanceID,
				*gcs_bucket, *gcs_dir, *retries_max, *retries_delay)
		}
	}

	var wg sync.WaitGroup

	// Watch the PCAP files source directory for FS events.
	if isActive.CompareAndSwap(false, true) {
		if err = watcher.Add(*src_dir); err != nil {
			logger.LogEvent(zapcore.ErrorLevel, fmt.Sprintf("failed to watch directory '%s': %v", *src_dir, err), PCAP_FSNERR, nil, err)
			isActive.Store(false)
		}
	}

	ticker := time.NewTicker(watchdogInterval)

	// Start listening for FS events at PCAP files source directory.
	go func(wg *sync.WaitGroup, watcher *fsnotify.Watcher, ticker *time.Ticker) {
		for isActive.Load() {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// Channel was closed (i.e. Watcher.Close() was called)
					return
				}
				// Skip events which are not CREATE, and all which are not related to PCAP files
				if event.Has(fsnotify.Create) && pcapDotExt.MatchString(event.Name) {
					wg.Add(1)
					exportPcapFile(ctx, wg, pcapDotExt, &event.Name, *gzip_pcaps /* compress */, true /* delete */, false /* flush */)
				} else if event.Has(fsnotify.Create) && tcpdumpwExitSignal.MatchString(event.Name) && isActive.CompareAndSwap(true, false) {
					// `tcpdumpw` signals its termination by creating the file `TCPDUMPW_EXITED` in the source directory
					tcpdumpwExitTS := time.Now()
					logger.LogEvent(zapcore.InfoLevel, "detected 'tcpdumpw' termination signal", PCAP_SIGNAL, map[string]interface{}{
						"event":     PCAP_SIGNAL,
						"signal":    event.Name,
						"timestamp": tcpdumpwExitTS.Format(time.RFC3339Nano),
					}, nil)
					// delete `tcpdumpw` termination signal
					os.Remove(event.Name)
					// when `tcpdumpw` signal is detected:
					//   - cancel the context which triggers final PCAP files flushing
					cancel()
					return
				}
			case fsnErr, ok := <-watcher.Errors:
				if !ok {
					// Channel was closed (i.e. Watcher.Close() was called).
					ticker.Stop()
					return
				}
				logger.LogEvent(zapcore.ErrorLevel, "FS watcher failed", PCAP_FSNERR, map[string]interface{}{"closed": ok}, fsnErr)
			}
		}
	}(&wg, watcher, ticker)

	go func(watcher *fsnotify.Watcher, ticker *time.Ticker) {
		for isActive.Load() {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// packet capturing is write intensive
				// OS file write buffers must be flushed often to prevent memory saturation
				// flushing OS file write buffers is safe: 'non-destructive operation and will not free any dirty objects'
				// additionally, PCAP files are [write|append]-only
				memoryBefore, _ := getCurrentMemoryUtilization(isGAE)
				_, memFlushErr := flushBuffers()
				memoryAfter, _ := getCurrentMemoryUtilization(isGAE)
				if memFlushErr != nil {
					continue
				}
				releasedMemory := int64(memoryBefore) - int64(memoryAfter)
				logger.LogEvent(zapcore.InfoLevel,
					fmt.Sprintf("flushed OS file write buffers: memory[before=%d|after=%d] / released=%d", memoryBefore, memoryAfter, releasedMemory),
					PCAP_OSWMEM, map[string]interface{}{"before": memoryBefore, "after": memoryAfter, "released": releasedMemory}, nil)
			}
		}
	}(watcher, ticker)

	go func(watcher *fsnotify.Watcher, ticker *time.Ticker) {
		signal := <-sigChan
		signalTS := time.Now()
		deadline := 3 * time.Second

		logger.LogEvent(zapcore.InfoLevel, fmt.Sprintf("signaled: %v", signal), PCAP_SIGNAL, map[string]interface{}{
			"signal":    signal,
			"timestamp": signalTS.Format(time.RFC3339Nano),
		}, nil)

		timer := time.AfterFunc(deadline-time.Since(signalTS), func() {
			if isActive.CompareAndSwap(true, false) {
				// cancel the context after 3s regardless of `tcpdumpw` termination signal:
				//   - this is effectively the `max_wait_time` for `tcpdumpw` termination signal.
				cancel()
			}
		})

		pcapMutex := flock.New(pcapLockFile)
		lockData := map[string]interface{}{"lock": pcapLockFile}
		logger.LogEvent(zapcore.InfoLevel, "waiting for PCAP lock file", PCAP_FSLOCK, lockData, nil)

		lockCtx, lockCancel := context.WithTimeout(ctx, deadline-time.Since(signalTS))
		defer lockCancel()

		// `tcpdumpq` will unlock the PCAP lock file when all PCAP engines have stopped
		if locked, lockErr := pcapMutex.TryLockContext(lockCtx, 10*time.Millisecond); !locked || lockErr != nil {
			lockData["latency"] = time.Since(signalTS).String()
			logger.LogEvent(zapcore.ErrorLevel, "failed to acquire PCAP lock file", PCAP_FSLOCK, lockData, lockErr)
		} else if isActive.CompareAndSwap(true, false) {
			timer.Stop()
			lockData["latency"] = time.Since(signalTS).String()
			cancel()
			logger.LogEvent(zapcore.InfoLevel, "acquired PCAP lock file", PCAP_FSLOCK, lockData, nil)
		}
	}(watcher, ticker)

	if err == nil {
		logger.LogEvent(zapcore.InfoLevel, fmt.Sprintf("watching directory: %s", *src_dir), PCAP_FSNINI, nil, nil)
	} else if isActive.CompareAndSwap(true, false) {
		logger.LogEvent(zapcore.InfoLevel, fmt.Sprintf("error at initialization: %v", err), PCAP_FSNINI, nil, err)
		watcher.Close()
		ticker.Stop()
		cancel()
	}

	<-ctx.Done() // wait for context to be cancelled

	ticker.Stop()
	watcher.Remove(*src_dir)
	watcher.Close()

	// wait for all regular export operations to terminate
	wg.Wait()

	ctx = context.Background()
	ctx, cancel = context.WithTimeout(ctx, 5*time.Second)

	flushStart := time.Now()
	// flush remaining PCAP files after context is done
	// compression & deletion are disabled when exiting in order to speed up the process
	pendingPcapFiles := flushSrcDir(ctx, &wg, pcapDotExt,
		true /* sync */, false /* compress */, false, /* delete */
		func(_ fs.FileInfo) bool { return true },
	)
	logger.LogEvent(zapcore.InfoLevel,
		fmt.Sprintf("waiting for %d PCAP files to be flushed", pendingPcapFiles),
		PCAP_FSNEND, map[string]interface{}{
			"files":     pendingPcapFiles,
			"timestamp": flushStart.Format(time.RFC3339Nano),
		}, nil)

	wg.Wait() // wait for remaining PCAP files to be flushed

	flushLatency := time.Since(flushStart)
	logger.LogEvent(zapcore.InfoLevel,
		fmt.Sprintf("flushed %d PCAP files", pendingPcapFiles),
		PCAP_FSNEND, map[string]interface{}{
			"files":   pendingPcapFiles,
			"latency": flushLatency.String(),
		}, nil)
}
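
// Usage sketch (not part of the build; file names and flag values below are
// illustrative examples, derived from the defaults and regexes defined above):
//
//	pcap-fsnotify -src_dir=/pcap-tmp -gcs_dir=/pcap -pcap_ext=pcap -gzip=false -interval=60
//
// With these defaults, a rotated capture written by tcpdump such as
// /pcap-tmp/part__0_eth0__20240101T000000.pcap (hypothetical interface and timestamp)
// matches `pcapDotExt`; each CREATE event queues the newly created file and exports the
// previously rotated one for the same key, and creating /pcap-tmp/TCPDUMPW_EXITED
// (or receiving SIGTERM/SIGINT/SIGHUP/SIGQUIT) triggers the final flush of pending PCAPs.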