in tools/mongodb-hybrid-dlp/dlpfunction.go [522:722]
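// Process connects to MongoDB, opens change streams for the configured deployments,
// databases, and collections, inspects the observed changes for the configured run
// period, then saves resume tokens and finishes the active hybrid DLP job, if any.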
func (s *MongoScanner) Process(w http.ResponseWriter) error {
connectCtx, connectCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer connectCancel()
s.connect(connectCtx)
defer s.disconnect(connectCtx)
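// Verify connectivity with a short ping before setting up the change streams.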
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.Client.Ping(ctx, readpref.Primary())
if err != nil {
log.Error().Err(err).Msg("Great difficulties connecting to MongoDB server, probably you got no connectivity or bad connection string")
return err
}
// 16MB cache with LRU eviction policy
if s.Cache == nil {
cacheSize := 16 * 1024 * 1024
log.Info().Int("cacheSize", cacheSize).Msgf("Initialized cache: %d MB", cacheSize/(1024*1024))
s.Cache = gocache.NewCache().WithMaxMemoryUsage(cacheSize).WithEvictionPolicy(gocache.LeastRecentlyUsed)
}
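// One error channel and one resume-token channel per change stream; every stream fans its changes into the shared documents channel.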
errors := make([]chan error, 0)
documents := make(chan MongoChange, 10)
resumeTokens := make([]chan string, 0)
cancels := make([]context.CancelFunc, 0)
ctxs := make([]context.Context, 0)
var wg sync.WaitGroup
index := 0
// Load resume tokens
err = s.LoadState()
if err != nil {
log.Warn().Err(err).Msg("Failed to load state from bucket, possibly no state yet.")
err = nil
}
// There is really only one deployment, but we keep the parameters consistent
for _, deployment := range s.Deployments {
errors = append(errors, make(chan error, 1))
resumeTokens = append(resumeTokens, make(chan string, 1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cancels = append(cancels, cancel)
ctxs = append(ctxs, ctx)
opts := options.ChangeStream()
if s.LastChanges[deployment.Source].ResumeToken != "" {
log.Info().Str("resumeToken", s.LastChanges[deployment.Source].ResumeToken).Msg("Using resume token for deployment")
opts.SetResumeAfter(bson.M{"_data": s.LastChanges[deployment.Source].ResumeToken})
}
cs, err := s.Client.Watch(ctx, mongo.Pipeline{}, opts)
if err != nil {
return err
}
// Build the base change record in the loop body so the goroutine does not capture the range variable.
base := MongoChange{
Source: deployment.Source,
ConnectionString: s.ConnectionString,
}
wg.Add(1)
go func(i int, base MongoChange) {
defer wg.Done()
s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
}(index, base)
index += 1
}
// Watch entire databases
for _, database := range s.Databases {
db := s.Client.Database(database.Source.Database)
errors = append(errors, make(chan error, 1))
resumeTokens = append(resumeTokens, make(chan string, 1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cancels = append(cancels, cancel)
ctxs = append(ctxs, ctx)
opts := options.ChangeStream()
if s.LastChanges[database.Source].ResumeToken != "" {
log.Info().Str("resumeToken", s.LastChanges[database.Source].ResumeToken).Msg("Using resume token for database")
opts.SetResumeAfter(bson.M{"_data": s.LastChanges[database.Source].ResumeToken})
}
cs, err := db.Watch(ctx, mongo.Pipeline{}, opts)
if err != nil {
return err
}
// Build the base change record in the loop body so the goroutine does not capture the range variable.
base := MongoChange{
Source: database.Source,
ConnectionString: s.ConnectionString,
}
wg.Add(1)
go func(i int, base MongoChange) {
defer wg.Done()
s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
}(index, base)
index += 1
}
// Watch specific collections
for _, col := range s.Collections {
db := s.Client.Database(col.Source.Database)
coll := db.Collection(col.Source.Collection)
errors = append(errors, make(chan error, 1))
resumeTokens = append(resumeTokens, make(chan string, 1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cancels = append(cancels, cancel)
ctxs = append(ctxs, ctx)
opts := options.ChangeStream()
if s.LastChanges[col.Source].ResumeToken != "" {
log.Info().Str("resumeToken", s.LastChanges[col.Source].ResumeToken).Msg("Using resume token for collection")
opts.SetResumeAfter(bson.M{"_data": s.LastChanges[col.Source].ResumeToken})
}
cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
if err != nil {
return err
}
// Build the base change record in the loop body so the goroutine does not capture the range variable.
base := MongoChange{
Source: col.Source,
ConnectionString: s.ConnectionString,
}
wg.Add(1)
go func(i int, base MongoChange) {
defer wg.Done()
s.ProcessChangeStream(ctxs[i], cs, base, documents, resumeTokens[i], errors[i])
}(index, base)
index += 1
}
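// Start a single inspector goroutine that consumes changes from all streams.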
inspectErrors := make(chan error, 1)
inspectCtx, inspectCancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
s.InspectChanges(inspectCtx, documents, inspectErrors)
}()
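// Keep the streams open for the configured run period, reporting progress to the HTTP response every 10 seconds.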
sleepCycles := int(s.RunPeriod / (10 * time.Second))
// Reserve the final 10-second cycle for saving state and finishing the DLP job
for i := 0; i < sleepCycles-1; i++ {
time.Sleep(10 * time.Second)
fmt.Fprintf(w, "Still processing, cycle=%d/%d ...\n", i+1, sleepCycles-1)
if flush, ok := w.(http.Flusher); ok {
flush.Flush()
}
}
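// Shut down: cancel every change-stream watcher and the inspector, then wait for all goroutines to finish.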
for i := 0; i < index; i++ {
cancels[i]()
}
inspectCancel()
wg.Wait()
// Save resume tokens
err = s.SaveState()
if err != nil {
log.Error().Err(err).Msg("Failed to save state to bucket!")
return err
}
// This run's work is done; finish the DLP job and let the next run start a new one
if s.DlpClient != nil && s.DlpJobActive {
if s.GcpDlpTriggerJobId != "" {
finishJobReq := &dlppb.FinishDlpJobRequest{
Name: s.GcpDlpTriggerJobId,
}
log.Info().Str("jobID", s.GcpDlpTriggerJobId).Msg("Finishing the DLP job...")
finishCtx, finishCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer finishCancel()
err = s.DlpClient.FinishDlpJob(finishCtx, finishJobReq)
if err != nil {
log.Error().Err(err).Msg("Finishing the DLP job errored out")
return err
}
}
if err := s.DlpClient.Close(); err != nil {
log.Warn().Err(err).Msg("Failed to close the DLP client")
}
}
return nil
}