dlp/snippets/inspect/inspect_gcs_with_sampling.go (118 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inspect
// [START dlp_inspect_gcs_with_sampling]
import (
"context"
"fmt"
"io"
"time"
dlp "cloud.google.com/go/dlp/apiv2"
"cloud.google.com/go/dlp/apiv2/dlppb"
"cloud.google.com/go/pubsub"
)
// inspectGcsFileWithSampling inspects a storage with sampling
func inspectGcsFileWithSampling(w io.Writer, projectID, gcsUri, topicID, subscriptionId string) error {
// projectId := "your-project-id"
// gcsUri := "gs://" + "your-bucket-name" + "/path/to/your/file.txt"
// topicID := "your-pubsub-topic-id"
// subscriptionId := "your-pubsub-subscription-id"
ctx := context.Background()
// Initialize a client once and reuse it to send multiple requests. Clients
// are safe to use across goroutines. When the client is no longer needed,
// call the Close method to cleanup its resources.
client, err := dlp.NewClient(ctx)
if err != nil {
return err
}
// Closing the client safely cleans up background resources.
defer client.Close()
// Specify the GCS file to be inspected and sampling configuration
var cloudStorageOptions = &dlppb.CloudStorageOptions{
FileSet: &dlppb.CloudStorageOptions_FileSet{
Url: gcsUri,
},
BytesLimitPerFile: int64(200),
FileTypes: []dlppb.FileType{
dlppb.FileType_TEXT_FILE,
},
FilesLimitPercent: int32(90),
SampleMethod: dlppb.CloudStorageOptions_RANDOM_START,
}
var storageConfig = &dlppb.StorageConfig{
Type: &dlppb.StorageConfig_CloudStorageOptions{
CloudStorageOptions: cloudStorageOptions,
},
}
// Specify the type of info the inspection will look for.
// See https://cloud.google.com/dlp/docs/infotypes-reference for complete list of info types
// Specify how the content should be inspected.
var inspectConfig = &dlppb.InspectConfig{
InfoTypes: []*dlppb.InfoType{
{Name: "PERSON_NAME"},
},
ExcludeInfoTypes: true,
IncludeQuote: true,
MinLikelihood: dlppb.Likelihood_POSSIBLE,
}
// Create a PubSub Client used to listen for when the inspect job finishes.
pubsubClient, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer pubsubClient.Close()
// Create a PubSub subscription we can use to listen for messages.
// Create the Topic if it doesn't exist.
t := pubsubClient.Topic(topicID)
if exists, err := t.Exists(ctx); err != nil {
return err
} else if !exists {
if t, err = pubsubClient.CreateTopic(ctx, topicID); err != nil {
return err
}
}
// Create the Subscription if it doesn't exist.
s := pubsubClient.Subscription(subscriptionId)
if exists, err := s.Exists(ctx); err != nil {
return err
} else if !exists {
if s, err = pubsubClient.CreateSubscription(ctx, subscriptionId, pubsub.SubscriptionConfig{Topic: t}); err != nil {
return err
}
}
// topic is the PubSub topic string where messages should be sent.
topic := "projects/" + projectID + "/topics/" + topicID
var action = &dlppb.Action{
Action: &dlppb.Action_PubSub{
PubSub: &dlppb.Action_PublishToPubSub{
Topic: topic,
},
},
}
// Configure the long running job we want the service to perform.
var inspectJobConfig = &dlppb.InspectJobConfig{
StorageConfig: storageConfig,
InspectConfig: inspectConfig,
Actions: []*dlppb.Action{
action,
},
}
// Create the request for the job configured above.
req := &dlppb.CreateDlpJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/global", projectID),
Job: &dlppb.CreateDlpJobRequest_InspectJob{
InspectJob: inspectJobConfig,
},
}
// Use the client to send the request.
j, err := client.CreateDlpJob(ctx, req)
if err != nil {
return err
}
fmt.Fprintf(w, "Job Created: %v", j.GetName())
// Wait for the inspect job to finish by waiting for a PubSub message.
// This only waits for 10 minutes. For long jobs, consider using a truly
// asynchronous execution model such as Cloud Functions.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// If this is the wrong job, do not process the result.
if msg.Attributes["DlpJobName"] != j.GetName() {
msg.Nack()
return
}
msg.Ack()
// Stop listening for more messages.
defer cancel()
resp, err := client.GetDlpJob(ctx, &dlppb.GetDlpJobRequest{
Name: j.GetName(),
})
if err != nil {
fmt.Fprintf(w, "Error getting completed job: %v\n", err)
return
}
r := resp.GetInspectDetails().GetResult().GetInfoTypeStats()
if len(r) == 0 {
fmt.Fprintf(w, "No results")
return
}
for _, s := range r {
fmt.Fprintf(w, "\nFound %v instances of infoType %v\n", s.GetCount(), s.GetInfoType().GetName())
}
})
if err != nil {
return err
}
return nil
}
// [END dlp_inspect_gcs_with_sampling]