in dth/job.go [129:180]
func (f *Finder) Run(ctx context.Context) {
if !f.sqs.IsQueueEmpty(ctx) {
log.Fatalf("Queue might not be empty or Unknown error... Please try again later")
}
// Maximum number of queued batches to be sent to SQS
var bufferSize int = 500
// Assume sending messages is slower than listing and comparing
// Create a channel to block the process not to generate too many messages to be sent.
batchCh := make(chan struct{}, bufferSize)
// Channel to buffer the messages
msgCh := make(chan *string, bufferSize*f.cfg.MessageBatchSize)
// Maximum number of finder threads in parallel
// Create a channel to block
// Note that bigger number needs more memory
compareCh := make(chan struct{}, f.cfg.FinderNumber)
var prefixes []*string
log.Printf("Prefix List File: %s", f.cfg.SrcPrefixList)
if len(f.cfg.SrcPrefixList) > 0 {
prefixes = f.srcClient.ListSelectedPrefixes(ctx, &f.cfg.SrcPrefixList)
} else {
prefixes = f.srcClient.ListCommonPrefixes(ctx, f.cfg.FinderDepth, f.cfg.MaxKeys)
}
var wg sync.WaitGroup
start := time.Now()
for _, p := range prefixes {
compareCh <- struct{}{}
log.Printf("prefix: %s", *p)
wg.Add(1)
if f.cfg.SkipCompare {
go f.directSend(ctx, p, batchCh, msgCh, compareCh, &wg)
} else {
go f.compareAndSend(ctx, p, batchCh, msgCh, compareCh, &wg)
}
}
wg.Wait()
close(batchCh)
close(msgCh)
close(compareCh)
end := time.Since(start)
log.Printf("Finder Job Completed in %v\n", end)
}