in dth/job.go [310:395]
// directSend lists the source objects under prefix page by page and hands every
// object to SQS in batches, without comparing against any destination listing.
//
// Parameters:
//   - ctx is passed to the list and send calls; it is also observed while
//     backing off after a failed list call.
//   - prefix is the key prefix to scan (must be non-nil).
//   - batchCh is used as a semaphore: one value is sent before each batch
//     goroutine is started and one is received after that batch has been sent,
//     so its capacity bounds the number of in-flight batches.
//   - msgCh carries the serialized objects from the listing loop to the batch
//     goroutines. NOTE(review): messages are pushed before the goroutine that
//     drains them is started, so msgCh presumably needs a buffer of at least
//     MessageBatchSize - confirm at the caller.
//   - compareCh has one value received from it right before returning;
//     presumably this releases a slot acquired by the caller - confirm.
//   - wg is marked Done when this call returns and is incremented once for
//     every batch goroutine started here.
//
// A failed list call is retried after one minute, up to MaxRetries consecutive
// failures; after that the last token is logged and the process exits through
// log.Fatalf. If ctx is done while waiting to retry, listing stops and the
// messages already queued are still drained into a final batch.
func (f *Finder) directSend(ctx context.Context, prefix *string, batchCh chan struct{}, msgCh chan *string, compareCh chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	log.Printf("Scanning prefix /%s\n", *prefix)
	log.Printf("Start sending without comparison ...\n")

	// dispatch takes a slot on batchCh and starts a goroutine that drains size
	// messages from msgCh, sends them as one batch, and frees the slot.
	dispatch := func(size int) {
		wg.Add(1)
		batchCh <- struct{}{}
		go func() {
			defer wg.Done()
			batch := make([]*string, size)
			for k := range batch {
				batch[k] = <-msgCh
			}
			f.sqs.SendMessageInBatch(ctx, batch)
			<-batchCh
		}()
	}

	token := ""
	pending, batches, retry := 0, 0, 0
	interrupted := false

	for token != "End" && !interrupted {
		source, err := f.srcClient.ListObjects(ctx, &token, prefix, f.cfg.MaxKeys)
		if err != nil {
			log.Printf("Fail to get source list - %s\n", err.Error())
			retry++
			if retry > MaxRetries {
				log.Printf("Still unable to list source list after %d retries\n", MaxRetries)
				// Log the last token and exit.
				log.Fatalf("The last token is %s\n", token)
			}

			// Only announce the back-off when a retry is really going to happen.
			log.Printf("Sleep for 1 minute and try again...")
			timer := time.NewTimer(time.Minute)
			select {
			case <-timer.C:
			case <-ctx.Done():
				// No point in retrying with a finished context; leave the
				// listing loop but still flush what is already queued below.
				timer.Stop()
				log.Printf("Stop scanning prefix /%s - %s\n", *prefix, ctx.Err().Error())
				interrupted = true
			}
			continue
		}

		// A successful list resets the consecutive-failure counter.
		retry = 0

		for _, obj := range source {
			msgCh <- obj.toString()
			pending++
			if pending%f.cfg.MessageBatchSize == 0 {
				batches++
				if batches%100 == 0 {
					log.Printf("Found %d batches in prefix /%s\n", batches, *prefix)
				}
				dispatch(f.cfg.MessageBatchSize)
				pending = 0
			}
		}
	}

	// Send the remaining objects that did not fill a whole batch.
	if pending != 0 {
		batches++
		dispatch(pending)
	}

	if interrupted {
		log.Printf("Interrupted in prefix /%s, found %d batches so far", *prefix, batches)
	} else {
		log.Printf("Completed in prefix /%s, found %d batches in total", *prefix, batches)
	}
	<-compareCh
}