func()

in e2etest/newe2e_resource_managers_blob.go [470:630]


// CreateWithOptions uploads body to the blob at b.Path inside b.Container, applying the
// given properties and creation options, and then registers the resource through
// TrackResourceCreation. Failures are reported through a; nothing is returned.
//
// The entity type of b adjusts the request before upload:
//   - Folder: body is replaced with an empty container, common.POSIXFolderMeta is set on a
//     copy of the metadata, and the blob type is forced to block blob.
//   - Symlink: body is used as supplied (it is expected to already hold the link target),
//     common.POSIXSymlinkMeta is set on a copy of the metadata, and the blob type is forced
//     to block blob.
//   - File: properties are used unchanged.
//
// The resulting blob type selects the upload path: block blobs (also the default when no
// type is set) go through UploadStream; page and append blobs are created first and then
// filled one block at a time.
func (b *BlobObjectResourceManager) CreateWithOptions(a Asserter, body ObjectContentContainer, properties ObjectProperties, options *BlobObjectCreateOptions) {
	a.HelperMarker().Helper()
	opts := DerefOrZero(options)

	// copyMeta clones the metadata map and its values so that adding the POSIX markers
	// below never writes through to the caller's map.
	copyMeta := func() common.Metadata {
		out := make(common.Metadata, len(properties.Metadata)+1)

		for k, v := range properties.Metadata {
			if v == nil {
				// Carry nil values through rather than panicking on the dereference.
				out[k] = nil
				continue
			}
			out[k] = pointerTo(*v) // deep copy props
		}

		return out
	}

	switch b.entityType {
	case common.EEntityType.Folder():
		// Override body; must be empty
		body = NewZeroObjectContentContainer(0)

		// Set folder meta
		properties.Metadata = copyMeta()
		properties.Metadata[common.POSIXFolderMeta] = pointerTo("true")

		// Override blob type
		properties.BlobProperties.Type = pointerTo(blob.BlobTypeBlockBlob)
	case common.EEntityType.Symlink():
		// body should already be path

		// Set symlink meta
		properties.Metadata = copyMeta()
		properties.Metadata[common.POSIXSymlinkMeta] = pointerTo("true")

		// Override blob type
		properties.BlobProperties.Type = pointerTo(blob.BlobTypeBlockBlob)
	case common.EEntityType.File(): // no-op
	}

	// Snapshot the blob properties only after the entity-type overrides above, so the
	// forced block-blob type is the one used for dispatch below.
	blobProps := properties.BlobProperties

	switch DerefOrZero(blobProps.Type) {
	case "", blob.BlobTypeBlockBlob:
		blockSize := DerefOrDefault(opts.BlockSize, common.DefaultBlockBlobBlockSize)
		bodySize := body.Size()

		// Double the block size until the body fits within the per-blob block limit.
		// A non-positive block size is left alone because doubling it would never terminate.
		for blockSize > 0 && bodySize >= common.MaxNumberOfBlocksPerBlob*blockSize {
			blockSize *= 2
		}

		_, err := b.Container.InternalClient.NewBlockBlobClient(b.Path).UploadStream(ctx, body.Reader(), &blockblob.UploadStreamOptions{
			BlockSize:               blockSize,
			Concurrency:             runtime.NumCPU(),
			TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
			HTTPHeaders:             properties.HTTPHeaders.ToBlob(),
			Metadata:                properties.Metadata,
			AccessTier:              blobProps.BlockBlobAccessTier,
			Tags:                    blobProps.Tags,
			CPKInfo:                 opts.CpkOptions.GetCPKInfo(),
			CPKScopeInfo:            opts.CpkOptions.GetCPKScopeInfo(),
		})
		a.NoError("Block blob upload", err)
	case blob.BlobTypePageBlob:
		// TODO : Investigate bug in multistep uploader for PageBlob. (WI 28334208)
		client := b.Container.InternalClient.NewPageBlobClient(b.Path)
		blockSize := DerefOrDefault(opts.BlockSize, common.DefaultPageBlobChunkSize)
		size := body.Size()
		_, err := client.Create(
			ctx,
			size,
			&pageblob.CreateOptions{
				Tags:         blobProps.Tags,
				Metadata:     properties.Metadata,
				Tier:         blobProps.PageBlobAccessTier,
				HTTPHeaders:  properties.HTTPHeaders.ToBlob(),
				CPKInfo:      opts.CpkOptions.GetCPKInfo(),
				CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
			})
		a.NoError("Page blob create", err)

		msu := &MultiStepUploader{BlockSize: blockSize}
		blockCount := msu.GetBlockCount(size)
		reader := body.Reader()

		offset := int64(0)
		blockIndex := int64(0)

		for range blockCount {
			buf := make([]byte, blockSize)
			// A single Read may legally return fewer bytes than requested; ReadFull keeps
			// reading until the block is full or the content ends, so every block except
			// possibly the last one is exactly blockSize bytes.
			n, err := io.ReadFull(reader, buf)
			if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
				a.NoError(fmt.Sprintf("failed to read content (offset %d (block %d/%d), total %d)", offset, blockIndex, blockCount, size), err)
			}
			buf = buf[:n] // reduce buffer size for block

			_, err = client.UploadPages(
				ctx,
				streaming.NopCloser(bytes.NewReader(buf)),
				blob.HTTPRange{Offset: offset, Count: int64(n)},
				&pageblob.UploadPagesOptions{
					TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
					CPKInfo:                 opts.CpkOptions.GetCPKInfo(),
					CPKScopeInfo:            opts.CpkOptions.GetCPKScopeInfo(),
				})
			a.NoError("Page blob upload", err)
			offset += int64(n)
			blockIndex++
		}
	case blob.BlobTypeAppendBlob:
		// TODO : Investigate bug in multistep uploader for AppendBlob. (WI 28334208)
		blockSize := DerefOrDefault(opts.BlockSize, common.DefaultBlockBlobBlockSize)
		size := body.Size()

		// Double the block size until the body fits within the per-blob block limit.
		// A non-positive block size is left alone because doubling it would never terminate.
		for blockSize > 0 && size >= common.MaxNumberOfBlocksPerBlob*blockSize {
			blockSize *= 2
		}

		client := b.Container.InternalClient.NewAppendBlobClient(b.Path)

		_, err := client.Create(ctx, &appendblob.CreateOptions{
			HTTPHeaders:  properties.HTTPHeaders.ToBlob(),
			CPKInfo:      opts.CpkOptions.GetCPKInfo(),
			CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
			Tags:         blobProps.Tags,
			Metadata:     properties.Metadata,
		})
		a.NoError("Append blob create", err)

		msu := &MultiStepUploader{BlockSize: blockSize}
		blockCount := msu.GetBlockCount(size)
		reader := body.Reader()

		offset := int64(0)
		blockIndex := int64(0)

		for range blockCount {
			buf := make([]byte, blockSize)
			// ReadFull guards against short reads so each appended block, except possibly
			// the last one, carries exactly blockSize bytes.
			n, err := io.ReadFull(reader, buf)
			if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
				a.NoError(fmt.Sprintf("failed to read content (offset %d (block %d/%d), total %d)", offset, blockIndex, blockCount, size), err)
			}
			buf = buf[:n] // reduce buffer size for block

			_, err = client.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(buf)), &appendblob.AppendBlockOptions{
				TransactionalValidation: blob.TransferValidationTypeComputeCRC64(),
				// Pin the append to the expected offset and cap the blob at the size it
				// should have once this block lands.
				AppendPositionAccessConditions: &appendblob.AppendPositionAccessConditions{
					AppendPosition: pointerTo(offset),
					MaxSize:        pointerTo(offset + int64(n)),
				},
				CPKInfo:      opts.CpkOptions.GetCPKInfo(),
				CPKScopeInfo: opts.CpkOptions.GetCPKScopeInfo(),
			})
			a.NoError("Append blob upload", err)
			offset += int64(n)
			blockIndex++
		}
	}

	TrackResourceCreation(a, b)
}