in ecr/layer_writer.go [53:135]
func newLayerWriter(base *ecrBase, tracker docker.StatusTracker, ref string, desc ocispec.Descriptor) (content.Writer, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx = log.WithLogger(ctx, log.G(ctx).WithField("desc", desc))
reader, writer := io.Pipe()
lw := &layerWriter{
ctx: ctx,
base: base,
desc: desc,
buf: writer,
tracker: tracker,
ref: ref,
err: make(chan error),
}
// call InitiateLayerUpload and get upload ID
initiateLayerUploadInput := &ecr.InitiateLayerUploadInput{
RegistryId: aws.String(base.ecrSpec.Registry()),
RepositoryName: aws.String(base.ecrSpec.Repository),
}
initiateLayerUploadOutput, err := base.client.InitiateLayerUpload(initiateLayerUploadInput)
if err != nil {
cancel()
return nil, err
}
lw.uploadID = aws.StringValue(initiateLayerUploadOutput.UploadId)
partSize := aws.Int64Value(initiateLayerUploadOutput.PartSize)
log.G(ctx).
WithField("digest", desc.Digest.String()).
WithField("uploadID", lw.uploadID).
WithField("partSize", partSize).
Debug("ecr.blob.init")
go func() {
defer cancel()
defer close(lw.err)
_, err := stream.ChunkedProcessor(reader, partSize, layerQueueSize,
func(layerChunk *stream.Chunk) error {
begin := layerChunk.BytesBegin
end := layerChunk.BytesEnd
bytesRead := end - begin
log.G(ctx).
WithField("digest", desc.Digest.String()).
WithField("part", layerChunk.Part).
WithField("begin", begin).
WithField("end", end).
WithField("bytes", bytesRead).
Debug("ecr.layer.callback")
uploadLayerPartInput := &ecr.UploadLayerPartInput{
RegistryId: aws.String(base.ecrSpec.Registry()),
RepositoryName: aws.String(base.ecrSpec.Repository),
UploadId: aws.String(lw.uploadID),
PartFirstByte: aws.Int64(begin),
PartLastByte: aws.Int64(end),
LayerPartBlob: layerChunk.Bytes,
}
_, err := base.client.UploadLayerPart(uploadLayerPartInput)
log.G(ctx).
WithField("digest", desc.Digest.String()).
WithField("part", layerChunk.Part).
WithField("begin", begin).
WithField("end", end).
WithField("bytes", bytesRead).
Debug("ecr.layer.callback end")
if err == nil {
var status docker.Status
status, err = lw.tracker.GetStatus(lw.ref)
if err == nil {
status.Offset += int64(bytesRead) + 1
status.UpdatedAt = time.Now()
lw.tracker.SetStatus(lw.ref, status)
}
}
return err
})
if err != nil {
lw.err <- err
}
log.G(ctx).WithField("digest", desc.Digest.String()).Debug("ecr.layer upload done")
}()
return lw, nil
}