func()

in dth/job.go [425:469]


// Run polls the queue in a loop and dispatches every received message to a
// goroutine: startMigration for Transfer actions, startDelete for Delete
// actions. Messages with any other action are ignored here.
//
// Concurrency is bounded by two semaphore channels sized from
// cfg.WorkerNumber (clamped to the range [1, 100]):
//   - processCh limits how many messages are in flight at once; a slot is
//     taken here before each goroutine is started (presumably released by
//     startMigration/startDelete when they finish — confirm in those methods).
//   - transferCh is handed to startMigration to limit object/part transfers.
//
// Run returns once ctx is cancelled. With a context that is never cancelled
// (e.g. context.Background()) it loops forever.
func (w *Worker) Run(ctx context.Context) {
	// Clamp the configured worker count to a sane range.
	buffer := w.cfg.WorkerNumber
	if buffer <= 0 {
		buffer = 1 // Minimum 1
	}
	if buffer > 100 {
		buffer = 100 // Maximum 100
	}

	// A channel to block number of messages to be processed.
	// Buffer size is the clamped cfg.WorkerNumber.
	processCh := make(chan struct{}, buffer)

	// Channel to block number of objects/parts to be processed.
	// Buffer size is buffer*2 - 1 (more buffer for multipart upload).
	transferCh := make(chan struct{}, buffer*2-1)

	for {
		// Stop polling as soon as the caller cancels the context.
		if ctx.Err() != nil {
			return
		}

		msg, rh := w.sqs.ReceiveMessages(ctx)

		if msg == nil {
			// A nil message may simply be the result of a cancelled receive.
			if ctx.Err() != nil {
				return
			}

			log.Println("No messages, sleep...")

			// Wait before polling again, but wake up early on cancellation
			// instead of blocking shutdown for up to a minute.
			idle := time.NewTimer(time.Second * 60)
			select {
			case <-ctx.Done():
				idle.Stop()
				return
			case <-idle.C:
			}
			continue
		}

		obj, action := w.processMessage(ctx, msg, rh)
		if obj == nil { // Empty message
			continue
		}

		destKey := appendPrefix(&obj.Key, &w.cfg.DestPrefix)

		switch action {
		case Transfer:
			// Acquire a processing slot; give up if cancelled while waiting.
			select {
			case processCh <- struct{}{}:
			case <-ctx.Done():
				return
			}
			go w.startMigration(ctx, obj, rh, destKey, transferCh, processCh)
		case Delete:
			select {
			case processCh <- struct{}{}:
			case <-ctx.Done():
				return
			}
			go w.startDelete(ctx, obj, rh, destKey, processCh)
		}
	}
}