in tools/mongodb-hybrid-dlp/dlpfunction.go [277:382]
// HybridInspect submits the redacted form of a changed MongoDB document to the
// configured DLP hybrid job trigger.
//
// The steps are:
//  1. Lazily create the DLP client on first use, honouring the optional
//     endpoint and quota-project settings on the scanner.
//  2. Serialize redacted as canonical Extended JSON and compute its SHA-256;
//     if that hash is already in s.Cache the document is skipped.
//  3. Activate the job trigger if this scanner has not done so yet. An
//     "already running" activation error is tolerated, in which case the
//     existing ACTIVE job for the trigger is looked up on a best-effort basis.
//  4. Send the hybrid inspect request and, on success, record the hash in the
//     cache.
//
// change supplies the container metadata (type, paths, version) attached to
// any findings. original is not used by this function; it is kept in the
// signature for callers.
//
// A nil error is returned both when the document was sent and when it was
// skipped as a duplicate. Failures to create the client, marshal the document,
// activate the trigger (other than "already running") or send the request are
// returned wrapped with context.
func (s *MongoScanner) HybridInspect(ctx context.Context, change MongoChange, original map[string]interface{}, redacted bson.D) error {
	if s.DlpClient == nil {
		options := []option.ClientOption{option.WithUserAgent("google-pso-tool/mongodb-hybrid-dlp/0.1.0")}
		if s.GcpDlpEndpoint != "" {
			options = append(options, option.WithEndpoint(s.GcpDlpEndpoint))
		}
		if s.GcpBillingProject != "" {
			options = append(options, option.WithQuotaProject(s.GcpBillingProject))
		}
		client, err := dlp.NewClient(ctx, options...)
		if err != nil {
			return fmt.Errorf("creating DLP client: %w", err)
		}
		// Only store the client once creation has succeeded.
		s.DlpClient = client
	}

	marshaledDoc, err := bson.MarshalExtJSON(redacted, true, false)
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal redacted document")
		return fmt.Errorf("marshaling redacted document: %w", err)
	}

	// The hex SHA-256 of the serialized document is the de-duplication key.
	digest := sha256.Sum256(marshaledDoc)
	hashSum := fmt.Sprintf("%x", digest[:])
	if _, exists := s.Cache.Get(hashSum); exists {
		log.Info().Str("hash", hashSum).Msg("Document already processed")
		return nil
	}

	hybridContentItem := &dlppb.HybridContentItem{
		Item: &dlppb.ContentItem{
			DataItem: &dlppb.ContentItem_Value{
				Value: string(marshaledDoc),
			},
		},
		FindingDetails: &dlppb.HybridFindingDetails{
			ContainerDetails: &dlppb.Container{
				Type:         change.DlpType(),
				FullPath:     change.DlpFullPath(),
				RelativePath: "/" + change.DlpRelativePath(),
				RootPath:     "/" + change.DlpRootPath(),
				Version:      change.DlpVersion(),
			},
			Labels: map[string]string{},
		},
	}

	if !s.DlpJobActive {
		log.Info().Str("triggerID", s.GcpDlpTriggerName).Msg("Activating DLP job...")
		activateRes, err := s.DlpClient.ActivateJobTrigger(ctx, &dlppb.ActivateJobTriggerRequest{
			Name: s.GcpDlpTriggerName,
		})
		switch {
		case err == nil:
			s.GcpDlpTriggerJobId = activateRes.GetName()
			log.Info().Str("jobID", s.GcpDlpTriggerJobId).Msg("DLP trigger job activated")
			s.DlpJobActive = true

		case strings.Contains(err.Error(), "already running"):
			// The trigger already has a running job, which is fine for our
			// purposes: mark it active and try to discover the job's name.
			s.DlpJobActive = true
			log.Warn().Str("triggerID", s.GcpDlpTriggerName).Msg("Job is already running")
			listReq := &dlppb.ListDlpJobsRequest{
				Parent: fmt.Sprintf("projects/%s", s.GcpBillingProject),
			}
			for job, listErr := range s.DlpClient.ListDlpJobs(ctx, listReq).All() {
				if listErr != nil {
					// Finding the job ID is best-effort; the inspect request
					// below only needs the trigger name. Surface the failure
					// instead of dropping it silently.
					log.Warn().Err(listErr).Str("triggerID", s.GcpDlpTriggerName).Msg("Failed to list DLP jobs while looking for the running job")
					break
				}
				if job.GetJobTriggerName() == s.GcpDlpTriggerName && job.GetState() == dlppb.DlpJob_ACTIVE {
					s.GcpDlpTriggerJobId = job.GetName()
					log.Warn().Str("jobID", s.GcpDlpTriggerJobId).Msg("Found existing active job in ACTIVE state")
					// Stop paging through the remaining jobs once found.
					break
				}
			}

		default:
			log.Error().Err(err).Msg("DLP job activation failed")
			return fmt.Errorf("activating DLP job trigger %q: %w", s.GcpDlpTriggerName, err)
		}
	}

	// Send the hybrid inspect request.
	req := &dlppb.HybridInspectJobTriggerRequest{
		Name:       s.GcpDlpTriggerName,
		HybridItem: hybridContentItem,
	}
	if _, err := s.DlpClient.HybridInspectJobTrigger(ctx, req); err != nil {
		return fmt.Errorf("sending hybrid inspect request to trigger %q: %w", s.GcpDlpTriggerName, err)
	}

	// Remember the document only after it was accepted, so a failed send is
	// retried the next time the same content is seen.
	s.Cache.Set(hashSum, true)
	return nil
}