func()

in dth/job.go [310:395]


// directSend lists every object under prefix on the source side and queues
// each one for delivery, without comparing against the target.
//
// Objects are listed page by page through f.srcClient.ListObjects until the
// paging token becomes "End" (the token is passed by pointer; presumably
// ListObjects advances it - confirm against the client implementation).
// Each object's string form is pushed onto msgCh. After every
// f.cfg.MessageBatchSize pushes, a goroutine is started that receives that
// many messages from msgCh and hands them to f.sqs.SendMessageInBatch. Any
// leftover messages are sent as a final, smaller batch.
//
// Channel and WaitGroup protocol:
//   - wg: Done is called when directSend returns; every batch goroutine is
//     registered with wg.Add(1) and calls Done when it finishes.
//   - batchCh: one value is sent before a batch goroutine starts and one is
//     received after its send completes (presumably a semaphore bounding
//     in-flight batches - confirm against the caller).
//   - msgCh: messages are pushed before the goroutine that drains them is
//     started, so msgCh needs enough buffer capacity (or other concurrent
//     receivers) for the pushes not to block.
//   - compareCh: one value is received just before returning (presumably
//     releasing a slot the caller acquired - confirm against the caller).
//
// A failed listing is retried up to MaxRetries times, sleeping one minute
// between attempts. Once retries are exhausted the last token is logged with
// log.Fatalf, which terminates the process.
func (f *Finder) directSend(ctx context.Context, prefix *string, batchCh chan struct{}, msgCh chan *string, compareCh chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	log.Printf("Scanning prefix /%s\n", *prefix)

	token := ""
	pending := 0 // messages pushed to msgCh that are not yet assigned to a batch
	batches := 0 // number of batches dispatched so far
	retry := 0   // consecutive listing failures

	// dispatch starts a goroutine that takes n messages off msgCh and sends
	// them as a single batch. A slot on batchCh is taken before the goroutine
	// starts and given back once the send has returned.
	dispatch := func(n int) {
		wg.Add(1)
		batchCh <- struct{}{}
		go func() {
			defer wg.Done()
			batch := make([]*string, n)
			for k := range batch {
				batch[k] = <-msgCh
			}

			f.sqs.SendMessageInBatch(ctx, batch)
			<-batchCh
		}()
	}

	log.Printf("Start sending without comparison ...\n")

	for token != "End" {
		source, err := f.srcClient.ListObjects(ctx, &token, prefix, f.cfg.MaxKeys)
		if err != nil {
			log.Printf("Fail to get source list - %s\n", err.Error())
			retry++

			if retry > MaxRetries {
				log.Printf("Still unable to list source list after %d retries\n", MaxRetries)
				// Log the last token and exit; log.Fatalf does not return.
				log.Fatalf("The last token is %s\n", token)
			}

			// Only announce the back-off when one actually happens.
			// NOTE(review): this sleep does not observe ctx cancellation.
			log.Printf("Sleep for 1 minute and try again...")
			time.Sleep(time.Minute * 1)
			continue
		}

		// A successful listing resets the consecutive-failure counter.
		retry = 0

		for _, obj := range source {
			msgCh <- obj.toString()
			pending++
			if pending%f.cfg.MessageBatchSize != 0 {
				continue
			}

			batches++
			if batches%100 == 0 {
				log.Printf("Found %d batches in prefix /%s\n", batches, *prefix)
			}
			dispatch(f.cfg.MessageBatchSize)
			pending = 0
		}
	}

	// Send whatever is left over as a final, smaller batch.
	if pending != 0 {
		batches++
		dispatch(pending)
	}

	log.Printf("Completed in prefix /%s, found %d batches in total", *prefix, batches)
	<-compareCh
}