func()

in dth/job.go [129:180]


func (f *Finder) Run(ctx context.Context) {

	if !f.sqs.IsQueueEmpty(ctx) {
		log.Fatalf("Queue might not be empty or Unknown error... Please try again later")
	}

	// Maximum number of queued batches to be sent to SQS
	var bufferSize int = 500

	// Assume sending messages is slower than listing and comparing
	// Create a channel to block the process not to generate too many messages to be sent.
	batchCh := make(chan struct{}, bufferSize)

	// Channel to buffer the messages
	msgCh := make(chan *string, bufferSize*f.cfg.MessageBatchSize)

	// Maximum number of finder threads in parallel
	// Create a channel to block
	// Note that bigger number needs more memory
	compareCh := make(chan struct{}, f.cfg.FinderNumber)

	var prefixes []*string
	log.Printf("Prefix List File: %s", f.cfg.SrcPrefixList)

	if len(f.cfg.SrcPrefixList) > 0 {
		prefixes = f.srcClient.ListSelectedPrefixes(ctx, &f.cfg.SrcPrefixList)
	} else {
		prefixes = f.srcClient.ListCommonPrefixes(ctx, f.cfg.FinderDepth, f.cfg.MaxKeys)
	}
	var wg sync.WaitGroup

	start := time.Now()

	for _, p := range prefixes {
		compareCh <- struct{}{}
		log.Printf("prefix: %s", *p)
		wg.Add(1)
		if f.cfg.SkipCompare {
			go f.directSend(ctx, p, batchCh, msgCh, compareCh, &wg)
		} else {
			go f.compareAndSend(ctx, p, batchCh, msgCh, compareCh, &wg)
		}
	}
	wg.Wait()

	close(batchCh)
	close(msgCh)
	close(compareCh)

	end := time.Since(start)
	log.Printf("Finder Job Completed in %v\n", end)
}