pcap-fsnotify/internal/gcs/client_library_exporter.go (361 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcs import ( "context" "fmt" "maps" "net" "strings" "time" "cloud.google.com/go/storage" "github.com/GoogleCloudPlatform/pcap-sidecar/pcap-fsnotify/internal/log" "github.com/avast/retry-go/v4" "github.com/googleapis/gax-go/v2" "github.com/googleapis/gax-go/v2/callctx" "github.com/pkg/errors" sf "github.com/wissance/stringFormatter" "go.uber.org/zap/zapcore" "google.golang.org/api/googleapi" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) type ( libraryExporter struct { *exporter projectID string service string instanceID string bucket string client *storage.Client handle *storage.BucketHandle dialer *net.Dialer keepalive keepalive.ClientParameters } contextKey string ) const ( sourcePcapFile = contextKey("source_pcap_file") targetPcapFile = contextKey("target_pcap_file") // see: https://pkg.go.dev/google.golang.org/grpc#WithContextDialer gcsEndpoint = "passthrough:storage.googleapis.com" gcsPort = uint16(443) ) func (x *libraryExporter) onIntialized( client *storage.Client, handle *storage.BucketHandle, attrs *storage.BucketAttrs, ) *libraryExporter { x.client = client x.handle = handle.UserProject(x.projectID) bucketName := string(attrs.Name) x.bucket = bucketName data := map[string]any{ "bucket": bucketName, } for label, value := range attrs.Labels { data[label] = value } x.logger.LogEvent( zapcore.InfoLevel, sf.Format("initialized GCS client library exporter with bucket: {0}", bucketName), PCAP_EXPORT, data, nil) return x } func (x *libraryExporter) setBucketHandle( ctx context.Context, client *storage.Client, ) (*libraryExporter, error) { bucket := string(x.bucket) bucketHandle := client.Bucket(bucket) if bucketHandle == nil { return x, fmt.Errorf("GCS bucket is unavailable: %s", bucket) } if attrs, err := bucketHandle.Attrs(ctx); err == nil { return x.onIntialized(client, bucketHandle, attrs), nil } else { x.logger.LogEvent( zapcore.ErrorLevel, sf.Format("failed to initialize GCS client library exporter with bucket: {0}", bucket), PCAP_EXPORT, map[string]any{ "bucket": bucket, }, err) return x, err } } func (x *libraryExporter) gcsRemoteAddr( gcsEndpoint *string, ) string { return sf.Format("{0}:{1}", *gcsEndpoint, gcsPort) } func (x *libraryExporter) connect( ctx context.Context, address *string, ) (net.Conn, error) { addr := *address return retry.DoWithData( func() (net.Conn, error) { // [ToDo]: set network to `tcp4` only when VPC for ALL egress networking is used; otherwise, use `tcp`: // - when VPC is used for ALL egress networking, IPv4 is the only protocol supported for external hosts // - network is currently set to `tcp4` to support ALL egress networking configurations. return x.dialer.DialContext(ctx, "tcp4", addr) }, retry.Context(ctx), retry.Attempts(30), retry.Delay(1*time.Second), retry.DelayType(retry.FixedDelay), retry.OnRetry(func( attempt uint, err error, ) { _attempt := (attempt + 1) x.logger.LogEvent( zapcore.WarnLevel, sf.Format("failed to connect at attempt {0}: {1}", _attempt, addr), PCAP_EXPORT, map[string]any{ "address": *address, "attempt": addr, }, err) }), ) } func (x *libraryExporter) dialContext( ctx context.Context, addr string, ) (net.Conn, error) { address := x.gcsRemoteAddr(&addr) data := map[string]any{ "endpoint": addr, "address": address, "bucket": x.bucket, } if conn, err := x.connect(ctx, &address); err != nil { x.logger.LogEvent( zapcore.ErrorLevel, sf.Format("failed to connect to GCS: {0}", address), PCAP_EXPORT, data, err) return nil, err } else { remoteAddrStr := conn.RemoteAddr().String() info := map[string]any{ "local": conn.LocalAddr().String(), "remote": remoteAddrStr, } maps.Copy(info, data) x.logger.LogEvent( zapcore.InfoLevel, sf.Format("connected to GCS via: {0} => {1}", address, remoteAddrStr), PCAP_EXPORT, info, nil) return conn, nil } } func (x *libraryExporter) streamInterceptor( ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { target := cc.CanonicalTarget() x.logger.LogEvent( zapcore.InfoLevel, sf.Format("GCS operation: {0}{1}", target, method), PCAP_EXPORT, map[string]any{ "target": target, "stream": desc.StreamName, "state": cc.GetState().String(), "bucket": x.bucket, }, nil) return streamer(ctx, desc, cc, method, opts...) } func (x *libraryExporter) initialize( ctx context.Context, ) (*libraryExporter, error) { client, err := storage.NewGRPCClient(ctx, option.WithGRPCDialOption( grpc.WithNoProxy(), ), option.WithGRPCDialOption( grpc.WithLocalDNSResolution(), ), option.WithGRPCDialOption( grpc.WithIdleTimeout(5*time.Minute), ), option.WithGRPCDialOption( grpc.WithContextDialer(x.dialContext), ), option.WithGRPCDialOption( grpc.WithKeepaliveParams(x.keepalive), ), option.WithGRPCDialOption( grpc.WithStreamInterceptor(x.streamInterceptor), ), option.WithGRPCConnectionPool(2), option.WithQuotaProject(x.projectID), option.WithEndpoint(gcsEndpoint), option.WithRequestReason("pcap-sidecar"), option.WithUserAgent("pcap-sidecar"), option.WithTelemetryDisabled(), storage.WithDisabledClientMetrics(), ) if err != nil { return x, errors.Wrap(err, "failed to create gRPC GCS client") } return x.setBucketHandle(ctx, client) } func (x *libraryExporter) newObject( srcPcapFile *string, tgtPcapFile *string, ) *storage.ObjectHandle { attempts := uint8(0) return x.handle. Object(*tgtPcapFile). Retryer( storage.WithBackoff(gax.Backoff{ Initial: 2 * time.Second, Max: time.Duration(x.maxRetries) * x.retriesDelay * time.Second, }), storage.WithMaxAttempts(int(x.maxRetries)), storage.WithErrorFunc(func(err error) bool { // see: https://pkg.go.dev/cloud.google.com/go/storage#WithErrorFunc // GCS client always calls this function, even when `err` is `nil` if err == nil { // if `err` is `nil`, then prevent retyring by returning `false`: // - which we assume is the logical thing to do if there is no error... return false } attempts += 1 x.logger.LogEvent( zapcore.WarnLevel, sf.Format("failed to EXPORT file at attempt {0}: {1}", attempts, *srcPcapFile), PCAP_EXPORT, map[string]any{ "source": *srcPcapFile, "target": tgtPcapFile, "attempt": attempts, }, err, ) return true }), storage.WithPolicy(storage.RetryAlways), ) } func (x *libraryExporter) newObjectName( srcPcapFile *string, compress bool, ) string { tgtPcapFile := x.toTargetPcapFile(srcPcapFile, compress) parts := strings.Split(tgtPcapFile, "/") // skip local directory: `${0}/${1:PCAP_DIR}/...` return strings.Join(parts[2:], "/") } func (x *libraryExporter) setHeaders( ctx context.Context, ) context.Context { // [ToDo]: add details about: execution-environment. // see: https://cloud.google.com/storage/docs/audit-logging return callctx.SetHeaders(ctx, "x-goog-custom-audit-project", x.projectID, "x-goog-custom-audit-service", x.service, "x-goog-custom-audit-instance-id", x.instanceID, "x-goog-custom-audit-gcs-bucket", x.bucket, ) } func (x *libraryExporter) newWriter( ctx context.Context, srcPcapFile *string, tgtPcapFile *string, object *storage.ObjectHandle, ) *storage.Writer { // see: https://github.com/googleapis/google-cloud-go/blob/storage/v1.51.0/storage/storage.go#L1233 writer := object.NewWriter(x.setHeaders(ctx)) writer.Bucket = x.bucket writer.Name = *tgtPcapFile writer.Metadata = map[string]string{ "creator": "pcap-sidecar", "project": x.projectID, "instance": x.instanceID, } writer.ChunkSize = googleapi.DefaultUploadChunkSize return writer } func (x *libraryExporter) onExported( cw ClosableWriter, src *string, tgt *string, size *int64, ) error { x.logger.LogFsEvent( zapcore.InfoLevel, sf.Format("sent {0} bytes into gs://{1}/{2}", *size, x.bucket, *tgt), PCAP_EXPORT, *src, *tgt, *size, nil) return cw.Close() } func (x *libraryExporter) Export( ctx context.Context, srcPcapFile *string, compress bool, delete bool, ) (*string, *int64, error) { ctx = context.WithValue(ctx, sourcePcapFile, *srcPcapFile) tgtPcapFile := x.newObjectName(srcPcapFile, compress) ctx = context.WithValue(ctx, targetPcapFile, tgtPcapFile) object := x.newObject(srcPcapFile, &tgtPcapFile) writer := x.newWriter(ctx, srcPcapFile, &tgtPcapFile, object) pcapBytes, err := x.export(srcPcapFile, &tgtPcapFile, writer, compress, delete, x.onExported) return &tgtPcapFile, &pcapBytes, err } func NewClientLibraryExporter( ctx context.Context, logger *log.Logger, projectID string, service string, instanceID string, bucket string, directory string, maxRetries uint, retriesDelay uint, ) Exporter { x := newExporter(logger, directory, maxRetries, retriesDelay) exporter := &libraryExporter{ exporter: x, projectID: projectID, service: service, instanceID: instanceID, bucket: bucket, dialer: &net.Dialer{ Timeout: 5 * time.Minute, KeepAliveConfig: net.KeepAliveConfig{ Enable: true, Idle: 30 * time.Second, Interval: 15 * time.Second, Count: 2, }, }, keepalive: keepalive.ClientParameters{ Time: 60 * time.Second, Timeout: 10 * time.Second, PermitWithoutStream: true, }, } if exporter, err := exporter. initialize(ctx); err == nil { return exporter } else { logger.LogEvent( zapcore.ErrorLevel, "failed to create PCAP files client library exporter", PCAP_EXPORT, map[string]any{ "project": projectID, "bucket": bucket, }, err) } // return the NIL exporter by default return NewNilExporter(logger) }