func main()

in pcap-fsnotify/main.go [276:501]


// main runs the PCAP filesystem watcher.
//
// It watches `src_dir` for newly created PCAP files whose names match
// `part__<n>_<name>__<YYYYMMDD>T<HHMMSS>.<ext>` and hands each one to
// `exportPcapFile`. Meanwhile a ticker periodically flushes OS file write buffers.
//
// The watch phase ends when the shared context is cancelled. That happens when:
//   - `tcpdumpw` creates `TCPDUMPW_EXITED` in `src_dir`;
//   - after a termination signal, the PCAP lock file is acquired;
//   - 3s after a termination signal, if not already cancelled;
//   - the directory watch could not be initialized.
//
// After cancellation, in-flight exports are awaited. Then every remaining PCAP
// file in `src_dir` is flushed synchronously, without compression or deletion,
// under a 5s timeout.
func main() {
	isActive.Store(false)

	flag.Parse()

	defer logger.Sync()

	counters = haxmap.New[string, *atomic.Uint64]()
	lastPcap = haxmap.New[string, string]()

	isGAE, isGAEerr := strconv.ParseBool(gcpGAE)
	isGAE = (isGAEerr == nil && isGAE) || *gcp_gae

	// `src_dir` is a literal filesystem path. Quote it so regexp metacharacters
	// in the path (e.g. `+`, `(`, `[`) cannot break or alter the patterns.
	srcDir := regexp.QuoteMeta(*src_dir)
	ext := strings.Join(strings.Split(*pcap_ext, ","), "|")
	pcapDotExt := regexp.MustCompile(`^` + srcDir + `/part__(\d+?)_(.+?)__\d{8}T\d{6}\.(` + ext + `)$`)
	tcpdumpwExitSignal := regexp.MustCompile(`^` + srcDir + `/TCPDUMPW_EXITED$`)

	// must match the value of `PCAP_ROTATE_SECS`
	watchdogInterval := time.Duration(*interval) * time.Second

	args := map[string]any{
		"src_dir":    *src_dir,
		"gcs_dir":    *gcs_dir,
		"gcs_export": *gcs_export,
		"gcs_fuse":   *gcs_fuse,
		"gcs_bucket": *gcs_bucket,
		"pcap_ext":   pcapDotExt.String(),
		"interval":   watchdogInterval.String(),
		"gzip":       *gzip_pcaps,
		"rt_env":     *rt_env,
		"pcap_debug": *pcap_debug,
	}

	logger.LogEvent(zapcore.InfoLevel, "starting PCAP filesystem watcher", PCAP_FSNINI, args, nil)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)

	// Create new watcher.
	watcher, err := fsnotify.NewBufferedWatcher(100)
	if err != nil {
		logger.LogEvent(zapcore.FatalLevel, fmt.Sprintf("failed to create FS watcher: %v", err), PCAP_FSNINI, nil, nil)
		os.Exit(1)
	}
	defer watcher.Close()

	ctx, cancel := context.WithCancel(context.Background())

	if *gcs_export {
		// if GCS export is disabled, the PCAP files `exporter` is already initialized using `NewNilExporter`
		if *gcs_fuse {
			exporter = gcs.NewFuseExporter(logger, *gcs_dir, *retries_max, *retries_delay)
		} else {
			exporter = gcs.NewClientLibraryExporter(ctx, logger, projectID, service, instanceID, *gcs_bucket, *gcs_dir, *retries_max, *retries_delay)
		}
	}

	var wg sync.WaitGroup

	// Watch the PCAP files source directory for FS events.
	if isActive.CompareAndSwap(false, true) {
		if err = watcher.Add(*src_dir); err != nil {
			logger.LogEvent(zapcore.ErrorLevel, fmt.Sprintf("failed to watch directory '%s': %v", *src_dir, err), PCAP_FSNERR, nil, err)
			isActive.Store(false)
		}
	}

	ticker := time.NewTicker(watchdogInterval)

	// Closed when the FS events goroutine returns. `main` waits on it before
	// `wg.Wait()`, so that no `wg.Add(1)` from that goroutine can race with the wait.
	eventLoopDone := make(chan struct{})

	// Start listening for FS events at PCAP files source directory.
	go func(wg *sync.WaitGroup, watcher *fsnotify.Watcher, ticker *time.Ticker) {
		defer close(eventLoopDone)
		for isActive.Load() {
			select {

			case event, ok := <-watcher.Events:
				if !ok { // Channel was closed (i.e. Watcher.Close() was called)
					return
				}
				// Skip events which are not CREATE, and all which are not related to PCAP files
				if event.Has(fsnotify.Create) && pcapDotExt.MatchString(event.Name) {
					wg.Add(1)
					exportPcapFile(ctx, wg, pcapDotExt, &event.Name, *gzip_pcaps /* compress */, true /* delete */, false /* flush */)
				} else if event.Has(fsnotify.Create) && tcpdumpwExitSignal.MatchString(event.Name) && isActive.CompareAndSwap(true, false) {
					// `tcpdumpw` signals its termination by creating the file `TCPDUMPW_EXITED` in the source directory
					tcpdumpwExitTS := time.Now()
					logger.LogEvent(zapcore.InfoLevel,
						"detected 'tcpdumpw' termination signal",
						PCAP_SIGNAL,
						map[string]interface{}{
							"event":     PCAP_SIGNAL,
							"signal":    event.Name,
							"timestamp": tcpdumpwExitTS.Format(time.RFC3339Nano),
						}, nil)
					// delete `tcpdumpw` termination signal
					os.Remove(event.Name)
					// when `tcpdumpw` signal is detected:
					//   - cancel the context which triggers final PCAP files flushing
					cancel()
					return
				}

			case fsnErr, ok := <-watcher.Errors:
				if !ok { // Channel was closed (i.e. Watcher.Close() was called).
					ticker.Stop()
					return
				}
				logger.LogEvent(zapcore.ErrorLevel, "FS watcher failed", PCAP_FSNERR, map[string]interface{}{"closed": ok}, fsnErr)

			}
		}
	}(&wg, watcher, ticker)

	go func(watcher *fsnotify.Watcher, ticker *time.Ticker) {
		for isActive.Load() {
			select {

			case <-ctx.Done():
				return

			case <-ticker.C:
				// packet capturing is write intensive
				// OS buffers memory must be flushed often to prevent memory saturation
				// flushing OS file write buffers is safe: 'non-destructive operation and will not free any dirty objects'
				// additionally, PCAP files are [write|append]-only
				memoryBefore, _ := getCurrentMemoryUtilization(isGAE)
				_, memFlushErr := flushBuffers()
				memoryAfter, _ := getCurrentMemoryUtilization(isGAE)
				if memFlushErr != nil {
					continue
				}
				releasedMemory := int64(memoryBefore) - int64(memoryAfter)
				logger.LogEvent(zapcore.InfoLevel,
					fmt.Sprintf("flushed OS file write buffers: memory[before=%d|after=%d] / released=%d", memoryBefore, memoryAfter, releasedMemory),
					PCAP_OSWMEM, map[string]interface{}{"before": memoryBefore, "after": memoryAfter, "released": releasedMemory}, nil)

			}
		}
	}(watcher, ticker)

	go func(watcher *fsnotify.Watcher, ticker *time.Ticker) {
		// named `sig` to avoid shadowing the `os/signal` package
		sig := <-sigChan

		signalTS := time.Now()
		deadline := 3 * time.Second

		logger.LogEvent(zapcore.InfoLevel,
			fmt.Sprintf("signaled: %v", sig),
			PCAP_SIGNAL,
			map[string]interface{}{
				"signal":    sig,
				"timestamp": signalTS.Format(time.RFC3339Nano),
			}, nil)

		timer := time.AfterFunc(deadline-time.Since(signalTS), func() {
			if isActive.CompareAndSwap(true, false) {
				// cancel the context after 3s regardless of `tcpdumpw` termination signal:
				//   - this is effectively the `max_wait_time` for `tcpdumpw` termination signal.
				cancel()
			}
		})

		pcapMutex := flock.New(pcapLockFile)
		lockData := map[string]interface{}{"lock": pcapLockFile}
		logger.LogEvent(zapcore.InfoLevel, "waiting for PCAP lock file", PCAP_FSLOCK, lockData, nil)
		lockCtx, lockCancel := context.WithTimeout(ctx, deadline-time.Since(signalTS))
		defer lockCancel()
		// `tcpdumpq` will unlock the PCAP lock file when all PCAP engines have stopped
		if locked, lockErr := pcapMutex.TryLockContext(lockCtx, 10*time.Millisecond); !locked || lockErr != nil {
			lockData["latency"] = time.Since(signalTS).String()
			logger.LogEvent(zapcore.ErrorLevel, "failed to acquire PCAP lock file", PCAP_FSLOCK, lockData, lockErr)
		} else if isActive.CompareAndSwap(true, false) {
			timer.Stop()
			lockData["latency"] = time.Since(signalTS).String()
			cancel()
			logger.LogEvent(zapcore.InfoLevel, "acquired PCAP lock file", PCAP_FSLOCK, lockData, nil)
		}
	}(watcher, ticker)

	if err == nil {
		logger.LogEvent(zapcore.InfoLevel, fmt.Sprintf("watching directory: %s", *src_dir), PCAP_FSNINI, nil, nil)
	} else {
		// `isActive` was already reset when `watcher.Add` failed, and every other
		// `cancel()` is guarded by `isActive.CompareAndSwap(true, false)`. Cancel
		// unconditionally here; otherwise nothing would ever cancel `ctx` and
		// `main` would block forever, even after a termination signal.
		isActive.Store(false)
		logger.LogEvent(zapcore.ErrorLevel, fmt.Sprintf("error at initialization: %v", err), PCAP_FSNINI, nil, err)
		watcher.Close()
		ticker.Stop()
		cancel()
	}

	<-ctx.Done() // wait for context to be cancelled

	ticker.Stop()
	watcher.Remove(*src_dir)
	watcher.Close()
	// closing the watcher closes its channels, which makes the FS events goroutine return
	<-eventLoopDone

	// wait for all regular export operations to terminate
	wg.Wait()

	// Use a dedicated context: `ctx` and `cancel` are captured by the goroutines
	// above, so reassigning them here would race with those goroutines.
	flushCtx, flushCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer flushCancel()

	flushStart := time.Now()
	// flush remaining PCAP files after context is done
	// compression & deletion are disabled when exiting in order to speed up the process
	pendingPcapFiles := flushSrcDir(flushCtx, &wg, pcapDotExt,
		true /* sync */, false /* compress */, false, /* delete */
		func(_ fs.FileInfo) bool { return true },
	)

	logger.LogEvent(zapcore.InfoLevel,
		fmt.Sprintf("waiting for %d PCAP files to be flushed", pendingPcapFiles),
		PCAP_FSNEND,
		map[string]interface{}{
			"files":     pendingPcapFiles,
			"timestamp": flushStart.Format(time.RFC3339Nano),
		}, nil)

	wg.Wait() // wait for remaining PCAP files to be flushed
	flushLatency := time.Since(flushStart)

	logger.LogEvent(zapcore.InfoLevel,
		fmt.Sprintf("flushed %d PCAP files", pendingPcapFiles),
		PCAP_FSNEND,
		map[string]interface{}{
			"files":   pendingPcapFiles,
			"latency": flushLatency.String(),
		}, nil)
}