in dth/job.go [425:469]
func (w *Worker) Run(ctx context.Context) {
// log.Println("Start Worker Job...")
buffer := w.cfg.WorkerNumber
if buffer <= 0 {
buffer = 1 // Minimum 1
}
if buffer > 100 {
buffer = 100 // Maximum 100
}
// A channel to block number of messages to be processed
// Buffer size is cfg.WorkerNumber
processCh := make(chan struct{}, buffer)
// Channel to block number of objects/parts to be processed.
// Buffer size is cfg.WorkerNumber * 2 - 1 (More buffer for multipart upload)
transferCh := make(chan struct{}, buffer*2-1)
for {
msg, rh := w.sqs.ReceiveMessages(ctx)
if msg == nil {
log.Println("No messages, sleep...")
time.Sleep(time.Second * 60)
continue
}
obj, action := w.processMessage(ctx, msg, rh)
if obj == nil { // Empty message
continue
}
destKey := appendPrefix(&obj.Key, &w.cfg.DestPrefix)
if action == Transfer {
processCh <- struct{}{}
go w.startMigration(ctx, obj, rh, destKey, transferCh, processCh)
}
if action == Delete {
processCh <- struct{}{}
go w.startDelete(ctx, obj, rh, destKey, processCh)
}
}
}