pcap-fsnotify/internal/gcs/fuse_exporter.go (100 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcs
import (
"context"
"os"
"github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/log"
"github.com/avast/retry-go/v4"
"github.com/pkg/errors"
sf "github.com/wissance/stringFormatter"
"go.uber.org/zap/zapcore"
)
type (
fuseExporter struct {
*exporter
}
)
func (x *fuseExporter) newFile(
srcPcapFile *string,
tgtPcapFile *string,
) (*os.File, error) {
return os.OpenFile(
*tgtPcapFile,
os.O_RDWR|os.O_CREATE|os.O_EXCL,
0o666,
)
}
func (x *fuseExporter) onExported(
cw ClosableWriter,
src *string,
tgt *string,
size *int64,
) error {
x.logger.LogFsEvent(
zapcore.InfoLevel,
sf.Format(
"copied {0} bytes into file: {1}",
*size, *tgt,
),
PCAP_EXPORT,
*src,
*tgt,
*size,
nil)
return cw.Close()
}
func (x *fuseExporter) Export(
ctx context.Context,
srcPcapFile *string,
compress bool,
delete bool,
) (*string, *int64, error) {
tgtPcapFile := x.toTargetPcapFile(srcPcapFile, compress)
var pcapBytes int64 = 0
// Create destination PCAP file ( when using Fuse this is the same as exporting to the GCS Bucket )
pcapFileWriter, err := x.newFile(srcPcapFile, &tgtPcapFile)
if err != nil {
x.logger.LogFsEvent(
zapcore.ErrorLevel,
sf.Format("failed to CREATE file: {0}", tgtPcapFile),
PCAP_EXPORT,
*srcPcapFile,
tgtPcapFile,
0,
err)
return &tgtPcapFile, &pcapBytes, errors.Wrap(err,
sf.Format("failed to create destination pcap: {0}", tgtPcapFile))
}
// x.logger.logFsEvent(zapcore.InfoLevel, fmt.Sprintf("CREATED: %s", tgtPcap), PCAP_EXPORT, *srcPcap, tgtPcap, 0)
pcapBytes, err = retry.DoWithData(func() (int64, error) {
// Copy source PCAP into destination PCAP directory, compressing destination PCAP is optional
return x.export(srcPcapFile, &tgtPcapFile, pcapFileWriter, compress, delete, x.onExported)
},
retry.Context(ctx),
retry.Attempts(x.maxRetries),
retry.Delay(x.retriesDelay),
retry.DelayType(retry.FixedDelay),
retry.OnRetry(func(attempt uint, err error) {
x.logger.LogEvent(
zapcore.WarnLevel,
sf.Format(
"failed to COPY file at attempt {0}: {1}",
attempt+1, *srcPcapFile,
),
PCAP_EXPORT,
map[string]any{
"source": *srcPcapFile,
"target": tgtPcapFile,
"attempt": attempt + 1,
},
err)
}))
return &tgtPcapFile, &pcapBytes, nil
}
func NewFuseExporter(
logger *log.Logger,
directory string,
maxRetries uint,
retriesDelay uint,
) Exporter {
x := newExporter(logger, directory, maxRetries, retriesDelay)
return &fuseExporter{
exporter: x,
}
}