func()

in tools/mongodb-hybrid-dlp/dlpfunction.go [277:382]


// HybridInspect submits the redacted document to Cloud DLP as a hybrid
// inspection item for the job trigger named by s.GcpDlpTriggerName.
//
// The method performs the following steps:
//
//  1. Lazily creates s.DlpClient on first use, honouring s.GcpDlpEndpoint and
//     s.GcpBillingProject when they are non-empty.
//  2. Marshals redacted to canonical Extended JSON and computes its SHA-256
//     digest. If the digest is already in s.Cache the document is skipped and
//     nil is returned.
//  3. If s.DlpJobActive is false, activates the job trigger. When activation
//     fails with an "already running" error, the existing ACTIVE job for the
//     trigger is looked up (best effort) and stored in s.GcpDlpTriggerJobId.
//  4. Sends the hybrid inspect request and, on success, records the digest in
//     s.Cache.
//
// change supplies the container metadata (type, paths and version) attached to
// the finding details. The original parameter is not used by this method.
//
// It returns the underlying error, unwrapped, if the client cannot be created,
// the document cannot be marshaled, the trigger cannot be activated, or the
// inspect request fails.
func (s *MongoScanner) HybridInspect(ctx context.Context, change MongoChange, original map[string]interface{}, redacted bson.D) error {
	if s.DlpClient == nil {
		options := []option.ClientOption{option.WithUserAgent("google-pso-tool/mongodb-hybrid-dlp/0.1.0")}
		if s.GcpDlpEndpoint != "" {
			options = append(options, option.WithEndpoint(s.GcpDlpEndpoint))
		}
		if s.GcpBillingProject != "" {
			options = append(options, option.WithQuotaProject(s.GcpBillingProject))
		}
		// Only publish the client on the scanner once it is known to be valid.
		client, err := dlp.NewClient(ctx, options...)
		if err != nil {
			return err
		}
		s.DlpClient = client
	}

	marshaledDoc, err := bson.MarshalExtJSON(redacted, true, false)
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal redacted document")
		return err
	}

	// The hex digest of the marshaled document is the de-duplication key.
	digest := sha256.Sum256(marshaledDoc)
	hashSum := fmt.Sprintf("%x", digest[:])
	if _, exists := s.Cache.Get(hashSum); exists {
		log.Info().Str("hash", hashSum).Msg("Document already processed")
		return nil
	}

	hybridContentItem := &dlppb.HybridContentItem{
		Item: &dlppb.ContentItem{
			DataItem: &dlppb.ContentItem_Value{
				Value: string(marshaledDoc),
			},
		},
		FindingDetails: &dlppb.HybridFindingDetails{
			ContainerDetails: &dlppb.Container{
				Type:         change.DlpType(),
				FullPath:     change.DlpFullPath(),
				RelativePath: "/" + change.DlpRelativePath(),
				RootPath:     "/" + change.DlpRootPath(),
				Version:      change.DlpVersion(),
			},
			Labels: map[string]string{},
		},
	}

	if !s.DlpJobActive {
		activateJobReq := &dlppb.ActivateJobTriggerRequest{
			Name: s.GcpDlpTriggerName,
		}

		log.Info().Str("triggerID", s.GcpDlpTriggerName).Msg("Activating DLP job...")
		activateRes, err := s.DlpClient.ActivateJobTrigger(ctx, activateJobReq)
		switch {
		case err == nil:
			s.GcpDlpTriggerJobId = activateRes.Name
			log.Info().Str("jobID", s.GcpDlpTriggerJobId).Msg("DLP trigger job activated")

			s.DlpJobActive = true

		case strings.Contains(err.Error(), "already running"):
			// The trigger already has a job; treat that as active and try to
			// discover the job's name. The lookup is best effort: a failure
			// here does not prevent the inspect request below.
			s.DlpJobActive = true
			log.Warn().Str("triggerID", s.GcpDlpTriggerName).Msg("Job is already running")

			listReq := &dlppb.ListDlpJobsRequest{
				Parent: fmt.Sprintf("projects/%s", s.GcpBillingProject),
			}
			found := false
			var listErr error
			for job, iterErr := range s.DlpClient.ListDlpJobs(ctx, listReq).All() {
				if iterErr != nil {
					listErr = iterErr
					break
				}
				if job.GetJobTriggerName() == s.GcpDlpTriggerName && job.GetState() == dlppb.DlpJob_ACTIVE {
					s.GcpDlpTriggerJobId = job.GetName()
					log.Warn().Str("jobID", s.GcpDlpTriggerJobId).Msg("Found existing active job in ACTIVE state")
					found = true
					// No need to page through the remaining jobs.
					break
				}
			}
			switch {
			case listErr != nil:
				log.Warn().Err(listErr).Str("triggerID", s.GcpDlpTriggerName).Msg("Failed to list DLP jobs while looking for the active job")
			case !found:
				log.Warn().Str("triggerID", s.GcpDlpTriggerName).Msg("No active DLP job found for trigger")
			}

		default:
			log.Error().Err(err).Msg("DLP job activation failed")
			return err
		}
	}

	req := &dlppb.HybridInspectJobTriggerRequest{
		Name:       s.GcpDlpTriggerName,
		HybridItem: hybridContentItem,
	}

	// Send the hybrid inspect request.
	if _, err := s.DlpClient.HybridInspectJobTrigger(ctx, req); err != nil {
		return err
	}

	// Remember the digest only after DLP has accepted the document, so a
	// failed request is retried the next time the same document is seen.
	s.Cache.Set(hashSum, true)

	return nil
}