func()

in dth/job.go [217:307]


// compareAndSend walks the source listing under prefix page by page, compares
// every source object with the target listing of the same prefix, and forwards
// each object that is missing from the target (or whose size differs) to SQS
// in batches of f.cfg.MessageBatchSize.
//
// Parameters:
//   - ctx is passed through to the list and send calls.
//   - prefix is the key prefix being compared; it is dereferenced for logging,
//     so it must not be nil.
//   - batchCh is used as a semaphore: one element is pushed before a sender
//     goroutine starts and popped when that goroutine finishes.
//   - msgCh carries the serialized objects from the compare loop to the sender
//     goroutines. The loop pushes a full batch before any sender is started,
//     so the channel needs spare capacity of at least one batch.
//     NOTE(review): the channel is created by the caller; confirm its buffer
//     size (and whether it is shared between prefixes).
//   - compareCh is drained once on normal completion; presumably the caller
//     pushed one element before starting this goroutine - confirm at call site.
//   - wg is marked done when this function returns and is also used to track
//     every sender goroutine started here.
//
// A failed source listing is retried after a one minute sleep, at most
// MaxRetries times in a row. When the retries are exhausted the last token is
// logged with log.Fatalf, which terminates the whole process.
func (f *Finder) compareAndSend(ctx context.Context, prefix *string, batchCh chan struct{}, msgCh chan *string, compareCh chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	log.Printf("Comparing within prefix /%s\n", *prefix)
	target := f.getTargetObjects(ctx, prefix)

	batchSize := f.cfg.MessageBatchSize
	token := ""
	pending, batches := 0, 0 // messages queued for the next batch / batches started so far
	retry := 0

	// sendBatch takes a slot on batchCh (blocking while none is free) and
	// starts a goroutine that pulls n messages from msgCh and sends them to
	// SQS as one batch.
	sendBatch := func(n int) {
		wg.Add(1)
		batchCh <- struct{}{}
		go func() {
			defer wg.Done()
			// Deferred so the slot is released before wg.Done runs, on every
			// exit path of this goroutine.
			defer func() { <-batchCh }()

			batch := make([]*string, n)
			for k := range batch {
				batch[k] = <-msgCh
			}
			f.sqs.SendMessageInBatch(ctx, batch)
		}()
	}

	log.Printf("Start comparing and sending...\n")

	// The loop ends once token holds the sentinel "End". ListObjects receives
	// a pointer to token; presumably it advances the token on every successful
	// call and leaves it untouched on error, so a retry repeats the same page.
	for token != "End" {
		source, err := f.srcClient.ListObjects(ctx, &token, prefix, f.cfg.MaxKeys)
		if err != nil {
			log.Printf("Fail to get source list - %s\n", err.Error())
			retry++

			if retry > MaxRetries {
				log.Printf("Still unable to list source list after %d retries\n", MaxRetries)
				// Log the last token and exit
				log.Fatalf("The last token is %s\n", token)
			}

			// Announce the sleep only when a retry is really going to happen.
			log.Printf("Sleep for 1 minute and try again...")
			time.Sleep(time.Minute * 1)
			continue
		}

		// if a successful list, reset to 0
		retry = 0

		for _, obj := range source {
			// TODO: Check if there is another way to compare
			// Currently, map is used to search if such object exists in target
			tsize, found := target[obj.Key]
			if found && *tsize == obj.Size {
				// Same key and same size on both sides: nothing to send.
				continue
			}

			msgCh <- obj.toString()
			pending++
			if pending%batchSize != 0 {
				continue
			}

			// A full batch is queued on msgCh; hand it to a sender goroutine.
			batches++
			if batches%100 == 0 {
				log.Printf("Found %d batches in prefix /%s\n", batches, *prefix)
			}
			sendBatch(batchSize)
			pending = 0
		}
	}

	// For remaining objects that did not fill a whole batch.
	if pending != 0 {
		batches++
		sendBatch(pending)
	}

	log.Printf("Completed in prefix /%s, found %d batches in total", *prefix, batches)
	<-compareCh
}