// tcpdumpw/main.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"os/signal"
"regexp"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
// _ "net/http/pprof"
_ "time/tzdata"
"github.com/GoogleCloudPlatform/pcap-sidecar/pcap-cli/pkg/pcap"
"github.com/alphadose/haxmap"
"github.com/go-co-op/gocron/v2"
"github.com/gofrs/flock"
"github.com/google/uuid"
"github.com/wissance/stringFormatter"
pcapFilter "github.com/GoogleCloudPlatform/pcap-sidecar/tcpdumpw/pkg/filter"
)
// UNUSED is a no-op sink that silences "declared and not used" compile
// errors during development; it accepts any values and does nothing.
func UNUSED(x ...any) {}
// Command-line flags controlling the capture job: scheduling, output
// formats/locations, packet filtering, and runtime-environment selection.
var (
	use_cron          = flag.Bool("use_cron", false, "perform packet capture at specific intervals")
	cron_exp          = flag.String("cron_exp", "", "standard cron expression; i/e: '1 * * * *'")
	timezone          = flag.String("timezone", "UTC", "TimeZone to be used to schedule packet captures")
	duration          = flag.Int("timeout", 0, "perform packet capture during this amount of seconds")
	interval          = flag.Int("interval", 60, "seconds after which tcpdump rotates PCAP files")
	snaplen           = flag.Int("snaplen", 0, "bytes to be captured from each packet")
	extension         = flag.String("extension", "pcap", "extension to be used for tcpdump PCAP files")
	directory         = flag.String("directory", "", "directory where PCAP files will be stored")
	tcp_dump          = flag.Bool("tcpdump", true, "enable JSON PCAP using tcpdump")
	json_dump         = flag.Bool("jsondump", false, "enable JSON PCAP using gopacket")
	json_log          = flag.Bool("jsonlog", false, "enable JSON PCAP to standard output")
	ordered           = flag.Bool("ordered", false, "write JSON PCAP output as obtained from gopacket")
	conntrack         = flag.Bool("conntrack", false, "enable connection tracking ('ordered' is also enabled)")
	gcp_env           = flag.String("env", "run", "literal ID of the execution environment; any of: run, gae, gke")
	gcp_run           = flag.Bool("run", true, "Cloud Run execution environment")
	gcp_gae           = flag.Bool("gae", false, "App Engine execution environment")
	gcp_gke           = flag.Bool("gke", false, "Kubernetes Engine execution environment")
	pcap_iface        = flag.String("iface", "", "prefix to scan for network interfaces to capture from")
	hc_port           = flag.Uint("hc_port", 12345, "TCP port for health checking")
	filter            = flag.String("filter", pcap.PcapDefaultFilter, "BPF filter to be used for capturing packets")
	l3_protos         = flag.String("l3_protos", "ipv4,ipv6", "layer-3 protocols to be applied as packet filter")
	l4_protos         = flag.String("l4_protos", "tcp,udp", "layer-4 protocols to be applied as packet filter")
	hosts             = flag.String("hosts", "", "FQDNs to be translated into IPs to apply as packet filter")
	ports             = flag.String("ports", "", "TCP/UDP ports to be used in any side of the 5-tuple for a packet to be captured")
	ipv4              = flag.String("ipv4", "", "IPv4s or CIDR to be applied to the packet filter")
	ipv6              = flag.String("ipv6", "", "IPv6s or CIDR to be applied to the packet filter")
	tcp_flags         = flag.String("tcp_flags", "", "TCP flags to be set for a segment to be captured")
	ephemerals        = flag.String("ephemerals", "32768,65535", "range of ephemeral ports")
	compat            = flag.Bool("compat", false, "apply filters in Cloud Run gen1 mode")
	rt_env            = flag.String("rt_env", "cloud_run_gen2", "runtime where PCAP sidecar is used")
	pcap_debug        = flag.Bool("debug", false, "enable debug logs")
	pcap_verbosity    = flag.String("verbosity", "DEBUG", "PCAP translations verbosity")
	supervisor        = flag.String("supervisor", "http://127.0.0.1:23456", "supervisord 'serverurl'")
	no_procs          = flag.String("no_procs", "gcsfuse", "process for which TCP sockets should be excluded")
	no_procs_interval = flag.Uint("no_procs_interval", 15, "how often to refresh sockets owned by pcap-sidecar's processes")
	no_procs_debug    = flag.Bool("no_procs_debug", false, "enable/disable logging of socket discovery for pcap-sidecar's processes")
)
type (
	// pcapTask binds a single capture engine to the network interface it
	// captures from, plus the optional writers that receive its output
	// (writers is nil for the `tcpdump`-based engine).
	pcapTask struct {
		engine  pcap.PcapEngine   `json:"-"` // tcpdump- or gopacket-based capture engine
		writers []pcap.PcapWriter `json:"-"` // JSON sinks: file, stdout, GAE log file
		iface   string            `json:"-"` // interface name, e.g. `eth0`
	}
	// tcpdumpJob is the unit of scheduled work: its per-interface tasks plus
	// the gocron job metadata. Only the exported ID/name/tag fields are
	// serialized into structured log entries.
	tcpdumpJob struct {
		ctx   context.Context `json:"-"` // root context shared by all tasks
		j     *gocron.Job     `json:"-"` // backing scheduler job (nil when cron is disabled)
		Xid   string          `json:"xid,omitempty"`  // current execution id
		Jid   string          `json:"jid,omitempty"`  // scheduler job id
		Name  string          `json:"name,omitempty"` // scheduler job name
		Tags  []string        `json:"-"`              // scheduler job tags (logged separately)
		tasks []*pcapTask     `json:"-"`              // one task per engine per interface
		debug bool            `json:"-"`
	}
	// jLogLevel is the severity string embedded in structured log entries.
	jLogLevel string
	// jLogEntry is the JSON envelope written to stdout by jlog.
	jLogEntry struct {
		Severity  jLogLevel        `json:"severity"`
		Message   string           `json:"message"`
		Sidecar   string           `json:"sidecar"`
		Module    string           `json:"module"`
		Job       tcpdumpJob       `json:"job,omitempty"`
		Tags      []string         `json:"tags,omitempty"`
		Timestamp map[string]int64 `json:"timestamp,omitempty"` // {"seconds","nanos"}
	}
)
// Values sourced from the environment at process start.
var (
	projectID         string = os.Getenv("PROJECT_ID")      // GCP project id, used in PCAP log names
	ifacePrefixEnvVar string = os.Getenv("PCAP_IFACE_SAFE") // fallback interface prefix when `-iface` is empty
	sidecarEnvVar     string = os.Getenv("APP_SIDECAR")     // sidecar identifier attached to every log entry
	moduleEnvVar      string = os.Getenv("PROC_NAME")       // module/process name attached to every log entry
	gaeEnvVar         string = os.Getenv("GCP_GAE")         // "true" enables App Engine mode (see createTasks)
	hcPortEnvVar      string = os.Getenv("PCAP_HC_PORT")    // NOTE(review): not referenced in this file — confirm still needed
)

// wg tracks every running PCAP task goroutine; waitDone blocks on it.
var wg sync.WaitGroup

// jid/xid hold the current scheduled-job id and execution id (both uuid.UUID).
var jid, xid atomic.Value

// jobs maps a job id (string UUID) to its tcpdumpJob.
var jobs *haxmap.Map[string, *tcpdumpJob]

// emptyTcpdumpJob is the placeholder job used for logs emitted outside a job context.
var emptyTcpdumpJob = tcpdumpJob{Jid: uuid.Nil.String()}

// Sentinel errors used to mark engines/writers that are disabled by flags.
var (
	errTcpdumpDisabled  = errors.New("GCS PCAP export disabled")
	errJsondumpDisabled = errors.New("GCS JSON export disabled")
	errJSONLogDisabled  = errors.New("STDOUT JSON log disabled")
	errGaeDisabled      = errors.New("GAE JSON log disabled")
)

// NOTE(review): not referenced in this file — confirm still needed.
var gaeJSONInterval = 0 // disable time based file rotation

// Log severities for jlog.
const (
	INFO  jLogLevel = "INFO"
	ERROR jLogLevel = "ERROR"
	FATAL jLogLevel = "FATAL"
)

// Output file-name templates, shared paths and patterns.
const (
	// `%d`/`%s` take the interface index/name; the `%%…` escapes are
	// strftime-style placeholders expanded later by the writers.
	fileNamePattern      = "%d_%s__%%Y%%m%%dT%%H%%M%%S"
	runFileOutput        = `%s/part__` + fileNamePattern
	gaeFileOutput        = `/var/log/app_engine/app/app_pcap__` + fileNamePattern
	pcapLockFile         = "/var/lock/pcap.lock"
	defaultPcapFilter    = "(tcp or udp or icmp or icmp6) and (ip or ip6)" // NOTE(review): not referenced in this file
	devicesRegexTemplate = "^(?:(?:lo$)|(?:(?:ipvlan-)?%s\\d+.*$))"       // matches `lo` plus `<prefix>N…` devices
)

// Names used for the catch-all `any` pseudo-device.
const (
	anyIfaceName  string = "any"
	anyIfaceIndex int    = int(0)
)

// Bounds for the process-socket refresh interval.
const (
	defaultNoProcsInterval = uint(15)  // 15 seconds; NOTE(review): not referenced in this file
	maxNoProcsInterval     = uint(240) // 4 minutes
)
// parsePcapVerbosity maps the `-verbosity` flag value to a pcap.PcapVerbosity.
// Matching is case-insensitive; every value other than "INFO" — including the
// default "DEBUG" — yields the most verbose level.
func parsePcapVerbosity(
	pcapVerbosity *string,
) pcap.PcapVerbosity {
	if strings.EqualFold(*pcapVerbosity, "INFO") {
		return pcap.VERBOSITY_INFO
	}
	// anything else falls back to DEBUG
	return pcap.VERBOSITY_DEBUG
}
// jlog writes one structured JSON log line to stdout, stamping it with the
// sidecar/module identity, the current execution id, and a seconds/nanos
// timestamp. If JSON marshaling fails, the entry is dumped to stderr instead.
func jlog(severity jLogLevel, job *tcpdumpJob, message string) {
	ts := time.Now()
	logged := *job
	// this is safe as only 1 concurrent job execution is ever allowed.
	logged.Xid = xid.Load().(uuid.UUID).String()
	record := &jLogEntry{
		Severity: severity,
		Message:  message,
		Sidecar:  sidecarEnvVar,
		Module:   moduleEnvVar,
		Job:      logged,
		Tags:     logged.Tags,
		Timestamp: map[string]int64{
			"seconds": ts.Unix(),
			"nanos":   int64(ts.Nanosecond()),
		},
	}
	if payload, err := json.Marshal(record); err == nil {
		io.WriteString(os.Stdout, string(payload)+"\n")
	} else {
		// best-effort fallback when the entry cannot be marshaled
		fmt.Fprintf(os.Stderr, "%+v\n", record)
	}
}
// afterTcpdump is the gocron after-run hook: it logs completion plus the
// next scheduled run, then resets the execution id back to uuid.Nil.
func afterTcpdump(id uuid.UUID, name string) {
	if job, ok := jobs.Get(id.String()); ok {
		jlog(INFO, job, "execution complete")
		next, _ := (*job.j).NextRun()
		jlog(INFO, job, fmt.Sprintf("next execution: %v", next))
	}
	// reset execution id
	xid.Store(uuid.Nil)
}
// beforeTcpdump is the gocron before-run hook: it logs the start of an
// execution (including the previous run's timestamp) and stores a fresh
// execution id so all logs of this run share it.
func beforeTcpdump(id uuid.UUID, name string) {
	if job, ok := jobs.Get(id.String()); ok {
		prev, _ := (*job.j).LastRun()
		jlog(INFO, job, fmt.Sprintf("execution started ( last execution: %v )", prev))
	}
	xid.Store(uuid.New())
}
// waitJobDone blocks until every task of `job` has gracefully stopped, or
// until `deadline` (measured from `ctxDoneTS`, the instant the job context
// was observed done) expires — whichever happens first. It fans a per-task
// remaining deadline out over `stopDeadline` so each engine knows how long
// it has left to shut down.
func waitJobDone(
	job *tcpdumpJob,
	wg *sync.WaitGroup,
	ctxDoneTS *time.Time,
	deadline *time.Duration,
	stopDeadline chan<- *time.Duration,
) {
	jobDoneSignal := make(chan struct{})
	// remaining wall-clock budget: deadline minus time already elapsed
	maxWaitTime := *deadline - time.Since(*ctxDoneTS)
	timer := time.NewTimer(maxWaitTime)
	go func(wg *sync.WaitGroup, ctxDoneTS *time.Time, deadline *time.Duration, signal chan struct{}) {
		jlog(INFO, job, fmt.Sprintf("waiting for PCAP job execution to stop | deadline: %v", *deadline))
		// hand each task its own remaining deadline, recomputed per task
		// (the channel is buffered by the caller, so these sends don't block)
		for range job.tasks {
			taskStopDeadline := *deadline - time.Since(*ctxDoneTS)
			stopDeadline <- &taskStopDeadline
		}
		// wait for tasks to gracefully stop
		wg.Wait()
		close(signal)
	}(wg, ctxDoneTS, &maxWaitTime, jobDoneSignal)
	select {
	case <-timer.C:
		// budget exhausted before all tasks reported done
		jlog(ERROR, job, "timed out waiting for PCAP job execution to stop")
	case <-jobDoneSignal:
		// stop the timer; drain its channel if it already fired
		if !timer.Stop() {
			<-timer.C
		}
		jlog(INFO, job, fmt.Sprintf("PCAP job execution stopped | latency: %v", time.Since(*ctxDoneTS)))
	}
}
// start launches one goroutine per PCAP task and blocks until the job's
// context is canceled (or the optional `timeout` elapses); it then waits —
// bounded by a 2-second grace period — for every task to stop.
// Returns the context's error (context.Canceled or context.DeadlineExceeded).
func start(ctx context.Context, timeout *time.Duration, job *tcpdumpJob) error {
	var cancel context.CancelFunc
	// a non-positive timeout means "run until the parent context is canceled"
	if *timeout > 0*time.Second {
		ctx, cancel = context.WithTimeout(ctx, *timeout)
		defer cancel()
	}
	// buffered so waitJobDone can fan out one deadline per task without blocking
	stopDeadline := make(chan *time.Duration, len(job.tasks))
	for _, task := range job.tasks {
		wg.Add(1)
		go func(ctx context.Context, wg *sync.WaitGroup, j *tcpdumpJob, t *pcapTask) {
			defer wg.Done()
			// all PCAP engines are context aware
			err := t.engine.Start(ctx, t.writers, stopDeadline)
			if err != nil {
				jlog(INFO, j, fmt.Sprintf("PCAP task execution stopped: %s | %s", t.iface, err.Error()))
			} else {
				jlog(INFO, j, fmt.Sprintf("PCAP task execution stopped: %s", t.iface))
			}
		}(ctx, &wg, job, task)
	}
	// wait for context cancel/timeout
	<-ctx.Done()
	ctxDoneTS := time.Now()
	// grace period for tasks to flush and stop
	deadline := 2 * time.Second
	waitJobDone(job, &wg, &ctxDoneTS, &deadline, stopDeadline)
	close(stopDeadline)
	return ctx.Err()
}
// tcpdump is the scheduled task body: it resolves the current job by the
// stored job id, decorates its context with execution id / log name / debug /
// verbosity values, and runs the job until `timeout` elapses or the context
// is canceled. Clean context termination is reported as success (nil).
func tcpdump(
	timeout time.Duration,
	debug bool,
	verbosity pcap.PcapVerbosity,
) error {
	jobID := jid.Load().(uuid.UUID)
	exeID := xid.Load().(uuid.UUID)
	var job *tcpdumpJob
	var jobFound bool
	if job, jobFound = jobs.Get(jobID.String()); !jobFound {
		message := fmt.Sprintf("job[id:%s] not found", jobID)
		jlog(ERROR, &emptyTcpdumpJob, message)
		// FIX: `message` is data, not a format string — the original
		// `fmt.Errorf(message)` would misinterpret any '%' in it (go vet).
		return errors.New(message)
	}
	// enable PCAP tasks with context awareness
	id := fmt.Sprintf("job/%s/exe/%s", jobID.String(), exeID.String())
	ctx := context.WithValue(job.ctx, pcap.PcapContextID, id)
	ctx = context.WithValue(ctx, pcap.PcapContextLogName,
		fmt.Sprintf("projects/%s/pcap/%s", projectID, id))
	ctx = context.WithValue(ctx, pcap.PcapContextDebug, debug)
	ctx = context.WithValue(ctx, pcap.PcapContextVerbosity, verbosity)
	err := start(ctx, &timeout, job)
	// FIX: use errors.Is so wrapped context errors are also recognized
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		// if context times out, it is a clean termination
		return nil
	}
	return err
}
// newPcapConfig assembles a pcap.PcapConfig from the given capture settings.
// Promiscuous mode is always on and no explicit timestamp type is requested;
// everything else is passed through verbatim.
func newPcapConfig(
	iface, format, output, extension, filter string,
	filters []pcap.PcapFilterProvider,
	compatFilters pcap.PcapFilters,
	snaplen, interval int,
	compat, debug, ordered, conntrack bool,
	ephemerals *pcap.PcapEphemeralPorts,
	verbosity pcap.PcapVerbosity,
) *pcap.PcapConfig {
	cfg := new(pcap.PcapConfig)
	cfg.Compat = compat
	cfg.Debug = debug
	cfg.Promisc = true // always capture in promiscuous mode
	cfg.Iface = iface
	cfg.Snaplen = snaplen
	cfg.TsType = "" // no explicit timestamp source
	cfg.Format = format
	cfg.Output = output
	cfg.Extension = extension
	cfg.Filter = filter
	cfg.Interval = interval
	cfg.Ordered = ordered
	cfg.ConnTrack = conntrack
	cfg.Filters = filters
	cfg.CompatFilters = compatFilters
	cfg.Ephemerals = ephemerals
	cfg.Verbosity = verbosity
	return cfg
}
// createTasks discovers the network interfaces to capture from (either the
// `any` pseudo-device or every device matching the interface prefix) and
// builds one pcapTask per enabled engine per interface:
//   - a `tcpdump`-based engine (no custom writers), when *tcpdump is set;
//   - a gopacket-based JSON engine with up to three writers (rotating JSON
//     file, stdout, GAE log file), when *jsondump and/or *jsonlog is set.
//
// Failures are logged per interface; an interface that fails setup is
// skipped rather than aborting the whole list.
func createTasks(
	ctx context.Context,
	ifacePrefix, timezone, directory, extension, filter *string,
	filters []pcap.PcapFilterProvider,
	compatFilters pcap.PcapFilters,
	snaplen, interval *int,
	compat, debug, tcpdump, jsondump, jsonlog, ordered, conntrack, gcpGAE *bool,
	ephemerals *pcap.PcapEphemeralPorts,
	verbosity pcap.PcapVerbosity,
) []*pcapTask {
	tasks := []*pcapTask{}
	iface := *ifacePrefix
	if iface == "" {
		// fall back to the environment-provided interface prefix
		iface = ifacePrefixEnvVar
	}
	// GAE mode is enabled via the `GCP_GAE` env var or the `-gae` flag
	isGAE, err := strconv.ParseBool(gaeEnvVar)
	isGAE = (err == nil && isGAE) || *gcpGAE
	var devices []*pcap.PcapDevice = nil
	if strings.EqualFold(iface, anyIfaceName) {
		// capture from all interfaces using the `any` pseudo-device
		devices = []*pcap.PcapDevice{
			{
				NetInterface: &net.Interface{
					Name:  anyIfaceName,
					Index: anyIfaceIndex,
				},
			},
		}
	} else {
		// scan for devices whose name matches the requested prefix
		ifaceRegexp := regexp.MustCompile(fmt.Sprintf(devicesRegexTemplate, iface))
		devices, _ = pcap.FindDevicesByRegex(ifaceRegexp)
	}
	for _, device := range devices {
		netIface := device.NetInterface
		iface := netIface.Name
		ifaceAndIndex := fmt.Sprintf("%d/%s", netIface.Index, iface)
		jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configuring PCAP for iface: %s", ifaceAndIndex))
		output := fmt.Sprintf(runFileOutput, *directory, netIface.Index, netIface.Name)
		tcpdumpCfg := newPcapConfig(iface, "pcap", output, *extension, *filter, filters, compatFilters, *snaplen, *interval, *compat, *debug, *ordered, *conntrack, ephemerals, verbosity)
		jsondumpCfg := newPcapConfig(iface, "json", output, "json", *filter, filters, compatFilters, *snaplen, *interval, *compat, *debug, *ordered, *conntrack, ephemerals, verbosity)
		// premature optimization is the root of all evil
		var engineErr, writerErr error = nil, nil
		var tcpdumpEngine, jsondumpEngine pcap.PcapEngine = nil, nil
		var jsondumpWriter, jsonlogWriter, gaejsonWriter pcap.PcapWriter = nil, nil, nil // `tcpdump` does not use custom writers
		if *tcpdump {
			tcpdumpEngine, engineErr = pcap.NewTcpdump(tcpdumpCfg)
		} else {
			engineErr = errTcpdumpDisabled
		}
		if engineErr == nil {
			tasks = append(tasks, &pcapTask{engine: tcpdumpEngine, writers: nil, iface: iface})
			jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configured 'tcpdump' for iface: %s", ifaceAndIndex))
		} else if *tcpdump {
			// FIX: this branch reports engine creation failures — the original
			// message wrongly said "GCS writer creation failed"
			jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("tcpdump engine creation failed: %s (%s)", ifaceAndIndex, engineErr))
		}
		// skip JSON setup if JSON pcap is disabled
		if !*jsondump && !*jsonlog {
			continue
		}
		// some form of JSON packet capturing is enabled
		// (note: `jsondumpCfg.Ordered` is already set by newPcapConfig)
		jsondumpEngine, engineErr = pcap.NewPcap(jsondumpCfg)
		if engineErr != nil {
			jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("jsondump task creation failed: %s (%s)", ifaceAndIndex, engineErr))
			continue // abort all JSON setup for this device
		}
		pcapWriters := []pcap.PcapWriter{}
		if *jsondump {
			// writing JSON PCAP file is only enabled if `jsondump` is enabled
			jsondumpWriter, writerErr = pcap.NewPcapWriter(ctx, &ifaceAndIndex, &output, &jsondumpCfg.Extension, timezone, *interval)
		} else {
			// FIX: use the `jsondump` sentinel here — the original reused
			// errJSONLogDisabled, leaving errJsondumpDisabled unused
			jsondumpWriter, writerErr = nil, errJsondumpDisabled
		}
		if writerErr == nil {
			pcapWriters = append(pcapWriters, jsondumpWriter)
			jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configured JSON '%s' writer for iface: %s", output, ifaceAndIndex))
		} else if *jsondump {
			jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("jsondump GCS writer creation failed: %s (%s)", ifaceAndIndex, writerErr))
		}
		// add `/dev/stdout` as an additional PCAP writer
		if *jsonlog {
			jsonlogWriter, writerErr = pcap.NewStdoutPcapWriter(ctx, &ifaceAndIndex)
		} else {
			jsonlogWriter, writerErr = nil, errJSONLogDisabled
		}
		if writerErr == nil {
			pcapWriters = append(pcapWriters, jsonlogWriter)
			jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configured JSON 'stdout' writer for iface: %s", ifaceAndIndex))
		} else if *jsonlog {
			jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("jsondump stdout writer creation failed: %s (%s)", ifaceAndIndex, writerErr))
		}
		// handle GAE JSON logger: writes to the App Engine log directory
		gaeOutput := ""
		if isGAE {
			gaeOutput = fmt.Sprintf(gaeFileOutput, netIface.Index, netIface.Name)
			gaejsonWriter, writerErr = pcap.NewPcapWriter(ctx, &ifaceAndIndex, &gaeOutput, &jsondumpCfg.Extension, timezone, *interval)
		} else {
			gaejsonWriter, writerErr = nil, errGaeDisabled
		}
		if writerErr == nil {
			pcapWriters = append(pcapWriters, gaejsonWriter)
			jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configured GAE JSON '%s' writer for iface: %s", gaeOutput, ifaceAndIndex))
		} else if isGAE {
			jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("jsondump GAE json writer creation failed: %s (%s)", ifaceAndIndex, errGaeDisabled))
		}
		jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("configured 'jsondump' for iface: %s", ifaceAndIndex))
		tasks = append(tasks, &pcapTask{engine: jsondumpEngine, writers: pcapWriters, iface: iface})
	}
	return tasks
}
// startTCPListener serves the health-check TCP port: it accepts and
// immediately closes connections until `ctx` is done, then closes the
// listener and reports success/failure of the shutdown on `stopChannel`.
// If the port cannot be bound, the process exits with code 5.
// NOTE: Accept blocks, so cancellation is only observed between connections;
// main unblocks it by dialing the port once after canceling the context.
func startTCPListener(ctx context.Context, port *uint, job *tcpdumpJob, stopChannel chan<- bool) {
	listener, listenErr := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if listenErr != nil {
		jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("failed to start the TCP listener: %v", listenErr))
		os.Exit(5)
	}
	for {
		select {
		case <-ctx.Done():
			closeErr := listener.Close()
			if closeErr == nil {
				jlog(INFO, job, fmt.Sprintf("stopped TCP listener: %d", *port))
			} else {
				jlog(ERROR, job, fmt.Sprintf("failed to stop TCP listener: %d | %v", *port, closeErr))
			}
			stopChannel <- closeErr == nil
			return
		// accept connections until context is done
		default:
			if conn, acceptErr := listener.Accept(); acceptErr == nil {
				conn.Close()
			}
		}
	}
}
// waitDone finalizes the capture process: it waits for every PCAP task
// goroutine to exit, flushes and closes all writers, creates the
// `TCPDUMPW_EXITED` signal file (which tells `pcapfsn` to begin its own
// termination), and releases the PCAP file lock.
func waitDone(job *tcpdumpJob, pcapMutex *flock.Flock, exitSignal *string) {
	// wait for all PCAP tasks to be gracefully stopped
	wg.Wait()
	// flush and close every custom writer
	for _, t := range job.tasks {
		for _, w := range t.writers {
			w.Rotate()
			w.Close()
		}
	}
	// `TCPDUMPW_EXITED` file creation signals `pcapfsn` to start its own termination process
	if f, err := os.OpenFile(*exitSignal, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o666); err != nil {
		jlog(ERROR, job, fmt.Sprintf("'tcpdumpw' termination signal creation failed: %s | %s", *exitSignal, err.Error()))
	} else {
		jlog(INFO, job, fmt.Sprintf("'tcpdumpw' termination signal created: %s", f.Name()))
		f.Close()
	}
	if err := pcapMutex.Unlock(); err == nil {
		jlog(INFO, job, fmt.Sprintf("released PCAP lock file: %s", pcapLockFile))
	} else {
		jlog(ERROR, job, fmt.Sprintf("failed to release PCAP lock file: %v", err))
	}
}
// appendFilter builds a filter provider from `rawFilter` via `factory` and
// appends it to `filters`. The raw value is skipped when it is empty or the
// catch-all literals "ALL"/"ANY" (case-insensitive), or when `ctx` is
// already done. Returns the (possibly extended) provider slice.
func appendFilter(
	ctx context.Context,
	filters []pcap.PcapFilterProvider,
	compatFilters pcap.PcapFilters,
	rawFilter *string,
	factory pcapFilter.PcapFilterProviderFactory,
) []pcap.PcapFilterProvider {
	if ctx.Err() != nil {
		// context already canceled: build no more filters
		return filters
	}
	raw := *rawFilter
	if raw == "" ||
		strings.EqualFold(raw, "ALL") ||
		strings.EqualFold(raw, "ANY") {
		return filters
	}
	provider := factory(rawFilter, compatFilters)
	jlog(INFO, &emptyTcpdumpJob, stringFormatter.Format("using filter: {0}", provider.String()))
	return append(filters, provider)
}
// parseEphemeralPorts parses the `-ephemerals` flag ("min,max") into a
// pcap.PcapEphemeralPorts range. Invalid input — empty string, missing
// comma, unparsable numbers, or ports below the RFC 6056 minimum — leaves
// the corresponding bound at its package default.
func parseEphemeralPorts(ephemerals *string) *pcap.PcapEphemeralPorts {
	// default ephemeral ports range
	ephemeralPortRange := &pcap.PcapEphemeralPorts{
		Min: pcap.PCAP_MIN_EPHEMERAL_PORT,
		Max: pcap.PCAP_MAX_EPHEMERAL_PORT,
	}
	if *ephemerals == "" {
		return ephemeralPortRange
	}
	ephemeralPorts := strings.SplitN(*ephemerals, ",", 2)
	if len(ephemeralPorts) != 2 {
		return ephemeralPortRange
	}
	for i, valueStr := range ephemeralPorts {
		// FIX: the original tested `err != nil`, which can never pass for a
		// successfully parsed value (on error `value` is 0, failing the range
		// check) — so user-supplied ranges were silently ignored.
		// see: https://datatracker.ietf.org/doc/html/rfc6056#page-5
		// a valid `ephemeral port` must be within RFC 6056 range: [1024/0x0400, 65535/0xFFFF]
		// (ParseUint with bitSize=16 already guarantees value <= 0xFFFF)
		if value, err := strconv.ParseUint(valueStr, 10, 16); err == nil && value >= 0x0400 && value <= 0xFFFF {
			port := uint16(value)
			// first element is the range minimum, second the maximum;
			// each is accepted only if it keeps the range consistent
			if i == 0 && port < ephemeralPortRange.Max {
				ephemeralPortRange.Min = uint16(value)
			} else if port > ephemeralPortRange.Min {
				ephemeralPortRange.Max = uint16(value)
			}
		}
	}
	return ephemeralPortRange
}
// main wires everything together: parses flags, builds the BPF/simple packet
// filters, creates per-interface capture tasks, acquires the process-wide
// PCAP lock, installs signal handling, and then either runs the capture job
// immediately or schedules it with cron — blocking until termination.
func main() {
	flag.Parse()
	ctx, cancel := context.WithCancel(context.Background())
	// last-resort handler: surface panics as structured FATAL logs
	defer func() {
		if r := recover(); r != nil {
			jlog(FATAL, &emptyTcpdumpJob, stringFormatter.Format("panic: {0}", r))
			fmt.Fprintln(os.Stderr, string(debug.Stack()))
		}
	}()
	jid.Store(uuid.Nil)
	xid.Store(uuid.Nil)
	// compat mode and the literal "DISABLED" both drop the complex BPF filter
	if *compat || strings.EqualFold(*filter, "DISABLED") {
		*filter = ""
	} else {
		*filter = strings.TrimSpace(*filter)
	}
	compatFilters := pcap.NewPcapFilters()
	filters := []pcap.PcapFilterProvider{}
	if *compat || *filter == "" {
		// if complex filter is empty, build it using 'Simple PCAP filters'
		filters = appendFilter(ctx, filters, compatFilters, l3_protos, pcapFilter.NewL3ProtoFilterProvider)
		filters = appendFilter(ctx, filters, compatFilters, l4_protos, pcapFilter.NewL4ProtoFilterProvider)
		filters = appendFilter(ctx, filters, compatFilters, ports, pcapFilter.NewPortsFilterProvider)
		filters = appendFilter(ctx, filters, compatFilters, tcp_flags, pcapFilter.NewTCPFlagsFilterProvider)
		ipFilterProvider := pcapFilter.NewIPFilterProvider(ipv4, ipv6, hosts, compatFilters)
		if _, ok := ipFilterProvider.Get(ctx); ok {
			jlog(INFO, &emptyTcpdumpJob, stringFormatter.Format("using filter: {0}", ipFilterProvider.String()))
			filters = append(filters, ipFilterProvider)
		}
		if len(filters) == 0 && !*compat {
			// if no simple filters are available:
			//   - use a default 'catch-all' filter
			//   - but only if compat mode is disabled
			*filter = string(pcap.PcapDefaultFilter)
		}
	}
	// clamp the sockets-refresh interval to its supported maximum
	noProcsInterval := *no_procs_interval
	if noProcsInterval > maxNoProcsInterval {
		noProcsInterval = maxNoProcsInterval
	}
	processFilter := pcapFilter.NewProcessFilterProvider(supervisor, no_procs, uint8(noProcsInterval), *no_procs_debug, compatFilters)
	// initialize `ProcessFilterProvider` for its side effects
	filters = append(filters, processFilter.Initialize(ctx))
	ephemeralPortRange := parseEphemeralPorts(ephemerals)
	pcapVerbosity := parsePcapVerbosity(pcap_verbosity)
	tasks := createTasks(ctx, pcap_iface, timezone, directory, extension,
		filter, filters, compatFilters, snaplen, interval, compat, pcap_debug, tcp_dump,
		json_dump, json_log, ordered, conntrack, gcp_gae, ephemeralPortRange, pcapVerbosity)
	if len(tasks) == 0 {
		jlog(FATAL, &emptyTcpdumpJob, "no PCAP tasks available")
		os.Exit(1)
	}
	// a file lock guarantees at most one capture process at a time
	pcapMutex := flock.New(pcapLockFile)
	if locked, lockErr := pcapMutex.TryLock(); !locked || lockErr != nil {
		jlog(FATAL, &emptyTcpdumpJob, fmt.Sprintf("failed to acquire PCAP lock | locked: %t | %v", locked, lockErr))
		os.Exit(2)
	}
	jobs = haxmap.New[string, *tcpdumpJob]()
	timeout := time.Duration(*duration) * time.Second
	jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("parsed timeout: %v", timeout))
	// the file to be created when `tcpdumpw` exits
	exitSignal := fmt.Sprintf("%s/TCPDUMPW_EXITED", *directory)
	// receives status of TCP listener termination: `true` means successful
	tcpStopChannel := make(chan bool, 1)
	// create empty job: used if CRON is not enabled
	job := &tcpdumpJob{
		Jid:   uuid.Nil.String(),
		tasks: tasks,
		debug: *pcap_debug,
	}
	jlog(INFO, job, fmt.Sprintf("acquired PCAP lock: %s", pcapLockFile))
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		// FIX: renamed local from `signal` to `sig` to stop shadowing the
		// `os/signal` package inside this goroutine
		sig := <-signals
		jlog(INFO, job, fmt.Sprintf("signaled: %v", sig))
		cancel()
		// unblock TCP listener; next iteration will find `ctx` done
		conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", *hc_port))
		if err == nil {
			conn.Close()
		}
	}()
	// Skip scheduling, execute `tcpdump` immediately
	if !*use_cron {
		id := uuid.New().String()
		ctx = context.WithValue(ctx, pcap.PcapContextID, id)
		// FIX(consistency): use the `projectID` package variable instead of
		// re-reading the PROJECT_ID environment variable
		logName := fmt.Sprintf("projects/%s/pcaps/%s", projectID, id)
		ctx = context.WithValue(ctx, pcap.PcapContextLogName, logName)
		ctx = context.WithValue(ctx, pcap.PcapContextDebug, *pcap_debug)
		ctx = context.WithValue(ctx, pcap.PcapContextVerbosity, pcapVerbosity)
		// start the TCP listener for health checks
		go startTCPListener(ctx, hc_port, job, tcpStopChannel)
		start(ctx, &timeout, job)
		waitDone(job, pcapMutex, &exitSignal)
		<-tcpStopChannel
		close(tcpStopChannel)
		return
	}
	// The `timezone` to be used when scheduling `tcpdump` cron jobs
	location, err := time.LoadLocation(*timezone)
	if err != nil {
		// FIX: log the *offending* timezone BEFORE overwriting it with the
		// fallback — the original reset `*timezone` to "UTC" first and so
		// always logged "could not load timezone 'UTC'"
		jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("could not load timezone '%s': %v", *timezone, err))
		*timezone = "UTC"
		location = time.UTC
	}
	jlog(INFO, &emptyTcpdumpJob, fmt.Sprintf("parsed timezone: %v", location))
	// Create a scheduler using the requested timezone.
	// no more than 1 packet capturing job (all its tasks) should ever be executed.
	s, err := gocron.NewScheduler(
		gocron.WithLimitConcurrentJobs(1, gocron.LimitModeReschedule),
		gocron.WithLocation(location),
		gocron.WithGlobalJobOptions(
			gocron.WithTags(
				projectID,
				os.Getenv("APP_SERVICE"),
				os.Getenv("GCP_REGION"),
				os.Getenv("APP_REVISION"),
				os.Getenv("INSTANCE_ID"),
			),
		),
	)
	if err != nil {
		jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("failed to create scheduler: %v", err))
		os.Exit(3)
	}
	// Use the provided `cron` expression to schedule the packet capturing job
	j, err := s.NewJob(
		gocron.CronJob(fmt.Sprintf("TZ=%s %s", *timezone, *cron_exp), true),
		gocron.NewTask(tcpdump, timeout, *pcap_debug, pcapVerbosity),
		gocron.WithName("tcpdump"),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
		gocron.WithEventListeners(
			gocron.AfterJobRuns(afterTcpdump),
			gocron.BeforeJobRuns(beforeTcpdump),
		),
	)
	if err != nil {
		jlog(ERROR, &emptyTcpdumpJob, fmt.Sprintf("failed to create scheduled job: %v", err))
		s.Shutdown()
		os.Exit(4)
	}
	jid.Store(j.ID())
	// redefine default `job` with the scheduled one
	job = &tcpdumpJob{
		ctx:   ctx,
		tasks: tasks,
		Jid:   j.ID().String(),
		Name:  j.Name(),
		Tags:  j.Tags(),
		j:     &j,
	}
	jobs.Set(job.Jid, job)
	jlog(INFO, job, "scheduled job")
	// Start the packet capturing scheduler
	s.Start()
	nextRun, _ := j.NextRun()
	jlog(INFO, job, fmt.Sprintf("next execution: %v", nextRun))
	// start the TCP listener for health checks
	go startTCPListener(ctx, hc_port, job, tcpStopChannel)
	// Block main goroutine until a signal is received
	<-ctx.Done()
	s.StopJobs()
	s.RemoveJob(j.ID())
	s.Shutdown()
	jlog(INFO, job, "scheduler terminated")
	waitDone(job, pcapMutex, &exitSignal)
	<-tcpStopChannel
	close(tcpStopChannel)
}