func()

in go/pkg/client/cas.go [383:481]


func (c *Client) uploadNonUnified(ctx context.Context, data ...*uploadinfo.Entry) ([]digest.Digest, int64, error) {
	var dgs []digest.Digest
	ueList := make(map[digest.Digest]*uploadinfo.Entry)
	for _, ue := range data {
		dg := ue.Digest
		if dg.IsEmpty() {
			LogContextInfof(ctx, log.Level(2), "Skipping upload of empty blob %s", dg)
			continue
		}
		if _, ok := ueList[dg]; !ok {
			dgs = append(dgs, dg)
			ueList[dg] = ue
		}
	}

	missing, err := c.MissingBlobs(ctx, dgs)
	if err != nil {
		return nil, 0, err
	}
	LogContextInfof(ctx, log.Level(2), "%d items to store", len(missing))
	var batches [][]digest.Digest
	if c.useBatchOps {
		batches = c.makeBatches(ctx, missing, true)
	} else {
		LogContextInfof(ctx, log.Level(2), "Uploading them individually")
		for i := range missing {
			LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", missing[i])
			batches = append(batches, missing[i:i+1])
		}
	}

	totalBytesTransferred := int64(0)

	eg, eCtx := errgroup.WithContext(ctx)
	for i, batch := range batches {
		i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			if err := c.casUploaders.Acquire(eCtx, 1); err != nil {
				return err
			}
			defer c.casUploaders.Release(1)
			if i%logInterval == 0 {
				LogContextInfof(ctx, log.Level(2), "%d batches left to store", len(batches)-i)
			}
			if len(batch) > 1 {
				LogContextInfof(ctx, log.Level(3), "Uploading batch of %d blobs", len(batch))
				bchMap := make(map[digest.Digest][]byte)
				for _, dg := range batch {
					ue := ueList[dg]
					ch, err := chunker.New(ue, false, int(c.ChunkMaxSize))
					if err != nil {
						return err
					}

					data, err := ch.FullData()
					if err != nil {
						return err
					}

					if dg.Size != int64(len(data)) {
						return errors.Errorf("blob size changed while uploading, given:%d now:%d for %s", dg.Size, int64(len(data)), ue.Path)
					}

					bchMap[dg] = data
					atomic.AddInt64(&totalBytesTransferred, int64(len(data)))
				}
				if err := c.BatchWriteBlobs(eCtx, bchMap); err != nil {
					return err
				}
			} else {
				LogContextInfof(ctx, log.Level(3), "Uploading single blob with digest %s", batch[0])
				ue := ueList[batch[0]]
				dg := ue.Digest
				ch, err := chunker.New(ue, c.shouldCompress(dg.Size), int(c.ChunkMaxSize))
				if err != nil {
					return err
				}
				written, err := c.writeChunked(eCtx, c.writeRscName(dg), ch)
				if err != nil {
					return fmt.Errorf("failed to upload %s: %w", ue.Path, err)
				}
				atomic.AddInt64(&totalBytesTransferred, written)
			}
			if eCtx.Err() != nil {
				return eCtx.Err()
			}
			return nil
		})
	}

	LogContextInfof(ctx, log.Level(2), "Waiting for remaining jobs")
	err = eg.Wait()
	LogContextInfof(ctx, log.Level(2), "Done")
	if err != nil {
		LogContextInfof(ctx, log.Level(2), "Upload error: %v", err)
	}

	return missing, totalBytesTransferred, err
}