func newLayerWriter()

in ecr/layer_writer.go [53:135]


func newLayerWriter(base *ecrBase, tracker docker.StatusTracker, ref string, desc ocispec.Descriptor) (content.Writer, error) {
	ctx, cancel := context.WithCancel(context.Background())
	ctx = log.WithLogger(ctx, log.G(ctx).WithField("desc", desc))
	reader, writer := io.Pipe()
	lw := &layerWriter{
		ctx:     ctx,
		base:    base,
		desc:    desc,
		buf:     writer,
		tracker: tracker,
		ref:     ref,
		err:     make(chan error),
	}

	// call InitiateLayerUpload and get upload ID
	initiateLayerUploadInput := &ecr.InitiateLayerUploadInput{
		RegistryId:     aws.String(base.ecrSpec.Registry()),
		RepositoryName: aws.String(base.ecrSpec.Repository),
	}
	initiateLayerUploadOutput, err := base.client.InitiateLayerUpload(initiateLayerUploadInput)
	if err != nil {
		cancel()
		return nil, err
	}
	lw.uploadID = aws.StringValue(initiateLayerUploadOutput.UploadId)
	partSize := aws.Int64Value(initiateLayerUploadOutput.PartSize)
	log.G(ctx).
		WithField("digest", desc.Digest.String()).
		WithField("uploadID", lw.uploadID).
		WithField("partSize", partSize).
		Debug("ecr.blob.init")

	go func() {
		defer cancel()
		defer close(lw.err)
		_, err := stream.ChunkedProcessor(reader, partSize, layerQueueSize,
			func(layerChunk *stream.Chunk) error {
				begin := layerChunk.BytesBegin
				end := layerChunk.BytesEnd
				bytesRead := end - begin
				log.G(ctx).
					WithField("digest", desc.Digest.String()).
					WithField("part", layerChunk.Part).
					WithField("begin", begin).
					WithField("end", end).
					WithField("bytes", bytesRead).
					Debug("ecr.layer.callback")

				uploadLayerPartInput := &ecr.UploadLayerPartInput{
					RegistryId:     aws.String(base.ecrSpec.Registry()),
					RepositoryName: aws.String(base.ecrSpec.Repository),
					UploadId:       aws.String(lw.uploadID),
					PartFirstByte:  aws.Int64(begin),
					PartLastByte:   aws.Int64(end),
					LayerPartBlob:  layerChunk.Bytes,
				}

				_, err := base.client.UploadLayerPart(uploadLayerPartInput)
				log.G(ctx).
					WithField("digest", desc.Digest.String()).
					WithField("part", layerChunk.Part).
					WithField("begin", begin).
					WithField("end", end).
					WithField("bytes", bytesRead).
					Debug("ecr.layer.callback end")
				if err == nil {
					var status docker.Status
					status, err = lw.tracker.GetStatus(lw.ref)
					if err == nil {
						status.Offset += int64(bytesRead) + 1
						status.UpdatedAt = time.Now()
						lw.tracker.SetStatus(lw.ref, status)
					}
				}
				return err
			})
		if err != nil {
			lw.err <- err
		}
		log.G(ctx).WithField("digest", desc.Digest.String()).Debug("ecr.layer upload done")
	}()
	return lw, nil
}