pcap-fsnotify/internal/gcs/exporter.go (215 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcs import ( "compress/gzip" "context" "fmt" "io" "os" "path/filepath" "time" "github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/constants" "github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/log" "github.com/pkg/errors" sf "github.com/wissance/stringFormatter" "go.uber.org/zap/zapcore" ) type ( ClosableWriter interface { io.Writer io.Closer } Exporter interface { Export( ctx context.Context, srcPcap *string, compress bool, delete bool, ) (*string, *int64, error) } exporter struct { directory string maxRetries uint retriesDelay time.Duration logger *log.Logger } nilExporter struct { *exporter } exportCallback func( cw ClosableWriter, srcPcapFile *string, tgtPcapFile *string, pcapBytes *int64, ) error ) const ( PCAP_EXPORT = constants.PCAP_EXPORT ) var nilExporterError = fmt.Errorf("GCS export is disabled") func newExporter( logger *log.Logger, directory string, maxRetries uint, retriesDelay uint, ) *exporter { return &exporter{ directory: directory, maxRetries: maxRetries, retriesDelay: time.Duration(retriesDelay) * time.Second, logger: logger, } } func NewNilExporter( logger *log.Logger, ) Exporter { return &nilExporter{ exporter: newExporter(logger, "", 0, 0), } } func (x *nilExporter) Export( ctx context.Context, srcPcapFile *string, compress bool, delete bool, ) (*string, *int64, error) { tgtPcap := "" pcapBytes := int64(0) err := errors.Wrap( nilExporterError, sf.Format("not exported: {0}", *srcPcapFile), ) x.logger.LogEvent( zapcore.WarnLevel, sf.Format("lost PCAP file: {0}", *srcPcapFile), PCAP_EXPORT, map[string]any{ "source": *srcPcapFile, "target": x.toTargetPcapFile(srcPcapFile, compress), }, err) return &tgtPcap, &pcapBytes, err } func (x *exporter) toTargetPcapFile( srcPcapFile *string, compress bool, ) string { pcapFileName := filepath.Base(*srcPcapFile) tgtPcapFile := filepath.Join(x.directory, pcapFileName) // If compressing PCAP files is enabled, add `gz` siffux to the destination PCAP file path if compress { return sf.Format("{0}.gz", tgtPcapFile) } return tgtPcapFile } func (x *exporter) export( srcPcapFile *string, tgtPcapFile *string, outputPcapWriter ClosableWriter, compress bool, delete bool, callback exportCallback, ) (int64, error) { pcapBytes := int64(0) // Open source PCAP file: the one thas is being moved to the destination directory inputPcapWriter, err := os.OpenFile(*srcPcapFile, os.O_RDONLY|os.O_EXCL, 0) if err != nil { x.logger.LogFsEvent( zapcore.ErrorLevel, sf.Format("failed to OPEN file {0}", *srcPcapFile), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, 0, err) return pcapBytes, errors.Wrap(err, sf.Format("failed to open source pcap: {0}", *srcPcapFile)) } // Copy source PCAP into destination PCAP, compressing destination PCAP is optional if compress { // see: https://pkg.go.dev/compress/gzip#NewWriter gzipPcap := gzip.NewWriter(outputPcapWriter) pcapBytes, err = io.Copy(gzipPcap, inputPcapWriter) gzipPcap.Flush() gzipPcap.Close() // this is still required; `Close()` on parent `Writer` does not trigger `Close()` at `gzip` } else { pcapBytes, err = io.Copy(outputPcapWriter, inputPcapWriter) } if err != nil { inputPcapWriter.Close() x.logger.LogFsEvent( zapcore.ErrorLevel, sf.Format("failed to COPY file: {0}", *srcPcapFile), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, 0, err) return pcapBytes, errors.Wrapf(err, "failed to COPY file: %s", *srcPcapFile) } // closing `outputPcapWriter` is responsibility of the caller of this method inputPcapWriter.Close() if err != nil { x.logger.LogFsEvent( zapcore.ErrorLevel, sf.Format("failed to EXPORT file: {0}", *srcPcapFile), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, pcapBytes, err) return pcapBytes, errors.Wrap(err, sf.Format("failed to COPY file: {0}", *srcPcapFile)) } if err = callback( outputPcapWriter, srcPcapFile, tgtPcapFile, &pcapBytes, ); err != nil { x.logger.LogFsEvent( zapcore.ErrorLevel, sf.Format( "failed to EXPORT file: {0}", *srcPcapFile, ), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, pcapBytes, err) return pcapBytes, errors.Wrap(err, sf.Format("failed to EXPORT file: {0}", *srcPcapFile)) } x.logger.LogFsEvent( zapcore.InfoLevel, sf.Format("EXPORTED: {0}", *srcPcapFile), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, pcapBytes, nil) if delete { // remove the source PCAP file if copying is sucessful err = os.Remove(*srcPcapFile) if err != nil { x.logger.LogFsEvent( zapcore.ErrorLevel, sf.Format( "failed to DELETE file: {0}", *srcPcapFile, ), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, pcapBytes, err) } else { x.logger.LogFsEvent( zapcore.InfoLevel, sf.Format( "DELETED: {0}", *srcPcapFile, ), PCAP_EXPORT, *srcPcapFile, *tgtPcapFile, pcapBytes, nil) } } return pcapBytes, nil }