func (s *MongoScanner) Process(w http.ResponseWriter) error

in tools/mongodb-hybrid-dlp/dlpfunction.go [522:722]


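// Process runs a single scan cycle: connect to MongoDB, open a change
// stream for every configured deployment, database, and collection, fan
// the incoming changes into one channel for DLP inspection, and finally
// persist resume tokens so the next invocation continues where this one
// stopped.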
func (s *MongoScanner) Process(w http.ResponseWriter) error {
	connectCtx, connectCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer connectCancel()

	s.connect(connectCtx)
	// connectCtx expires long before Process returns, so the deferred disconnect gets its own context.
	defer func() {
		disconnectCtx, disconnectCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer disconnectCancel()
		s.disconnect(disconnectCtx)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

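	// Fail fast: verify connectivity before wiring up any change streams.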
	err := s.Client.Ping(ctx, readpref.Primary())
	if err != nil {
		log.Error().Err(err).Msg("Failed to reach the MongoDB server; check connectivity and the connection string")
		return err
	}

	// 16MB cache with LRU eviction policy
	if s.Cache == nil {
		cacheSize := 16 * 1024 * 1024
		s.Cache = gocache.NewCache().WithMaxMemoryUsage(cacheSize).WithEvictionPolicy(gocache.LeastRecentlyUsed)
		log.Info().Int("cacheSize", cacheSize).Msgf("Initialized cache: %d MB", cacheSize/(1024*1024))
	}

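	// Fan-in plumbing: every watched source gets its own error channel,
	// resume-token channel, and cancellable context, while all change
	// documents converge on the shared documents channel.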
	errors := make([]chan error, 0)
	documents := make(chan MongoChange, 10)
	resumeTokens := make([]chan string, 0)
	cancels := make([]context.CancelFunc, 0)
	ctxs := make([]context.Context, 0)
	var wg sync.WaitGroup
	index := 0

	// Load resume tokens
	err = s.LoadState()
	if err != nil {
		log.Warn().Err(err).Msg("Failed to load state from bucket; possibly there is no saved state yet")
		err = nil
	}

	// There is really only one deployment, but the bookkeeping mirrors the database and collection loops below
	for _, deployment := range s.Deployments {
		errors = append(errors, make(chan error, 1))
		resumeTokens = append(resumeTokens, make(chan string, 1))
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		cancels = append(cancels, cancel)
		ctxs = append(ctxs, ctx)

		opts := options.ChangeStream()
		if s.LastChanges[deployment.Source].ResumeToken != "" {
			log.Info().Str("resumeToken", s.LastChanges[deployment.Source].ResumeToken).Msg("Using resume token for deployment")
			opts.SetResumeAfter(bson.M{"_data": s.LastChanges[deployment.Source].ResumeToken})
		}

		cs, err := s.Client.Watch(ctx, mongo.Pipeline{}, opts)
		if err != nil {
			return err
		}

		wg.Add(1)
		deployment := deployment // capture the loop variable; the goroutine outlives this iteration (pre-Go 1.22 semantics)
		go func(i int) {
			defer wg.Done()
			base := MongoChange{
				Source:           deployment.Source,
				ConnectionString: s.ConnectionString,
			}
			s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
		}(index)
		index++
	}

	// Watch entire databases
	for _, database := range s.Databases {
		db := s.Client.Database(database.Source.Database)

		errors = append(errors, make(chan error, 1))
		resumeTokens = append(resumeTokens, make(chan string, 1))
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		cancels = append(cancels, cancel)
		ctxs = append(ctxs, ctx)

		opts := options.ChangeStream()
		if s.LastChanges[database.Source].ResumeToken != "" {
			log.Info().Str("resumeToken", s.LastChanges[database.Source].ResumeToken).Msg("Using resume token for database")
			opts.SetResumeAfter(bson.M{"_data": s.LastChanges[database.Source].ResumeToken})
		}

		cs, err := db.Watch(ctx, mongo.Pipeline{}, opts)
		if err != nil {
			return err
		}

		wg.Add(1)
		database := database // capture the loop variable; the goroutine outlives this iteration (pre-Go 1.22 semantics)
		go func(i int) {
			defer wg.Done()
			base := MongoChange{
				Source:           database.Source,
				ConnectionString: s.ConnectionString,
			}
			s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
		}(index)
		index++
	}

	// Watch for specific collections
	for _, col := range s.Collections {
		db := s.Client.Database(col.Source.Database)
		coll := db.Collection(col.Source.Collection)

		errors = append(errors, make(chan error, 1))
		resumeTokens = append(resumeTokens, make(chan string, 1))
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		cancels = append(cancels, cancel)
		ctxs = append(ctxs, ctx)

		opts := options.ChangeStream()
		if s.LastChanges[col.Source].ResumeToken != "" {
			log.Info().Str("resumeToken", s.LastChanges[col.Source].ResumeToken).Msg("Using resume token for collection")
			opts.SetResumeAfter(bson.M{"_data": s.LastChanges[col.Source].ResumeToken})
		}

		cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
		if err != nil {
			return err
		}

		wg.Add(1)
		col := col // capture the loop variable; the goroutine outlives this iteration (pre-Go 1.22 semantics)
		go func(i int) {
			defer wg.Done()
			base := MongoChange{
				Source:           col.Source,
				ConnectionString: s.ConnectionString,
			}
			s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
		}(index)
		index++
	}

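	// Start the single consumer: InspectChanges drains the fanned-in
	// documents channel until its context is cancelled.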
	inspectErrors := make(chan error, 1)
	inspectCtx, inspectCancel := context.WithCancel(context.Background())
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.InspectChanges(inspectCtx, documents, inspectErrors)
	}()

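	// Let the change streams run for the configured period, writing
	// periodic progress lines and flushing so the HTTP client sees them
	// immediately.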
	sleepCycles := int(s.RunPeriod / (10 * time.Second))
	// Reserve the final 10-second cycle for teardown and saving state
	for i := 0; i < sleepCycles-1; i++ {
		time.Sleep(10 * time.Second)
		fmt.Fprintf(w, "Still processing, cycle=%d/%d ...\n", i+1, sleepCycles-1)
		if flush, ok := w.(http.Flusher); ok {
			flush.Flush()
		}
	}

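	// Tear down in order: stop the stream producers, then the inspector,
	// and wait for every goroutine to exit before saving state.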
	for i := 0; i < index; i++ {
		cancels[i]()
	}
	inspectCancel()
	wg.Wait()

	// Save resume tokens
	err = s.SaveState()
	if err != nil {
		log.Error().Err(err).Msg("Failed to save state to bucket!")
		return err
	}

	// This run's work is done; finish the hybrid DLP job and let the next run start a new one
	if s.DlpClient != nil && s.DlpJobActive {
		if s.GcpDlpTriggerJobId != "" {
			finishJobReq := &dlppb.FinishDlpJobRequest{
				Name: s.GcpDlpTriggerJobId,
			}

			log.Info().Str("jobID", s.GcpDlpTriggerJobId).Msg("Finishing the DLP job...")

			finishCtx, finishCancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer finishCancel()
			err = s.DlpClient.FinishDlpJob(finishCtx, finishJobReq)
			if err != nil {
				log.Error().Err(err).Msg("Finishing the DLP job errored out")
				return err
			}
		}
		s.DlpClient.Close()
	}

	return nil
}
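
For context, here is a minimal sketch of how Process could be mounted as an HTTP entry point, for example in a Cloud Function. The handler name, environment variable, and run period below are assumptions for illustration, not taken from the repo; a real configuration would also populate the watch targets (Deployments, Databases, Collections) and the DLP settings.

// Hypothetical wiring; assumes imports fmt, net/http, os, and time.
// Any configuration beyond the fields visible in Process above is assumed.
func ScanHandler(w http.ResponseWriter, r *http.Request) {
	scanner := &MongoScanner{
		ConnectionString: os.Getenv("MONGODB_URI"), // assumed variable name
		RunPeriod:        2 * time.Minute,          // assumed run period
	}
	if err := scanner.Process(w); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintln(w, "Scan cycle complete")
}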