func (w *Worker) startMultipartUpload()

in dth/job.go [723:781]


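// startMultipartUpload transfers, in parallel, every part of obj that is not
// already recorded in parts, reuses previously uploaded parts as-is, and
// returns the full part list in ascending part-number order.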
func (w *Worker) startMultipartUpload(ctx context.Context, obj *Object, destKey, uploadID *string, parts map[int]*Part, transferCh chan struct{}) ([]*Part, error) {

	totalParts, chunkSize := w.getTotalParts(obj.Size)
	// log.Printf("Total parts are %d for %s\n", totalParts, obj.Key)

	var wg sync.WaitGroup

	partCh := make(chan *Part, totalParts)
	partErrorCh := make(chan error, totalParts) // Capture errors; buffered so goroutines never block on send

	for i := 0; i < totalParts; i++ {
		partNumber := i + 1

		if part, found := parts[partNumber]; found {
			// log.Printf("Part %d found with etag %s, no need to transfer again\n", partNumber, *part.etag)
			// Simply put the part info to the channel
			partCh <- part
		} else {
			// If not, upload the part
			wg.Add(1)
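			// Acquire a slot on transferCh, which acts as a counting semaphore bounding concurrent part transfers.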
			transferCh <- struct{}{}

			go func(i, partNumber int) {
				defer wg.Done()
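				// Transfer the byte range for this part, starting at offset i*chunkSize.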
				result := w.transfer(ctx, obj, destKey, int64(i*chunkSize), int64(chunkSize), uploadID, partNumber, nil)

				if result.err != nil {
					partErrorCh <- result.err
				} else {
					part := &Part{
						partNumber: partNumber,
						etag:       result.etag,
					}
					partCh <- part
				}

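				// Release the slot so another part transfer can proceed.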
				<-transferCh
			}(i, partNumber)
		}
	}

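	// Wait for all in-flight transfers, then close both channels so they can be drained below.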
	wg.Wait()
	close(partErrorCh)
	close(partCh)

	for err := range partErrorCh {
		// Return the first error if any part failed to transfer.
		return nil, err
	}

	allParts := make([]*Part, totalParts)
	for i := 0; i < totalParts; i++ {
		// The list of parts must be in ascending order
		p := <-partCh
		allParts[p.partNumber-1] = p
	}

	return allParts, nil
}
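
The returned slice is already sorted by part number because S3's CompleteMultipartUpload call rejects part lists that are not in ascending order. The sketch below shows how that slice might be turned into a completion request. It is illustrative only: it assumes the AWS SDK for Go v1 and a hypothetical helper living in the same package (so the unexported etag and partNumber fields of Part are reachable); svc, destBucket and the helper name are not from the project, and the actual project may use a different SDK version or its own client wrapper.

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// completeUploadSketch is a hypothetical illustration, not project code.
func completeUploadSketch(ctx context.Context, svc *s3.S3, destBucket string,
	destKey, uploadID *string, allParts []*Part) error {

	// CompleteMultipartUpload needs the parts in ascending PartNumber order,
	// which is why startMultipartUpload indexes allParts by partNumber-1
	// instead of appending parts in completion order.
	completed := make([]*s3.CompletedPart, len(allParts))
	for i, p := range allParts {
		completed[i] = &s3.CompletedPart{
			ETag:       p.etag, // assumes Part.etag is a *string, as the commented-out log line suggests
			PartNumber: aws.Int64(int64(p.partNumber)),
		}
	}

	_, err := svc.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:          aws.String(destBucket),
		Key:             destKey,
		UploadId:        uploadID,
		MultipartUpload: &s3.CompletedMultipartUpload{Parts: completed},
	})
	return err
}

Note also that transferCh is presumably created by the caller as a buffered channel, for example make(chan struct{}, maxConcurrency): each goroutine acquires a slot before transferring and releases it afterwards, so the channel's capacity is what bounds the number of parts in flight at once.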