in component/azstorage/block_blob.go [1042:1122]
// WriteFromFile uploads the contents of the open file fi to the blob called
// name (joined onto the configured prefix path) and attaches metadata to it.
//
// The upload block size is taken from bb.Config.blockSize; when that is zero
// it is derived from the file size through calculateBlockSize. When
// bb.Config.updateMD5 is set and the file is at least
// blockblob.MaxUploadBlobBytes long, an md5 of the file is computed and sent
// as the blob's content md5. A failure to compute the md5 is logged and the
// upload proceeds without one.
//
// It returns syscall.EIO when the upload error maps to BlobIsUnderLease,
// syscall.EACCES when it maps to InvalidPermission, the original error for
// any other failure (including Stat and block-size calculation failures), and
// nil on success.
func (bb *BlockBlob) WriteFromFile(name string, metadata map[string]*string, fi *os.File) (err error) {
	log.Trace("BlockBlob::WriteFromFile : name %s", name)
	//defer exectime.StatTimeCurrentBlock("WriteFromFile::WriteFromFile")()

	blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, name))
	defer log.TimeTrack(time.Now(), "BlockBlob::WriteFromFile", name)

	// Passed to trackUpload on every progress callback.
	uploadPtr := to.Ptr(int64(1))

	blockSize := bb.Config.blockSize

	// get the size of the file
	stat, err := fi.Stat()
	if err != nil {
		log.Err("BlockBlob::WriteFromFile : Failed to get file size %s [%s]", name, err.Error())
		return err
	}

	// if the block size is not set then we configure it based on file size
	if blockSize == 0 {
		blockSize, err = bb.calculateBlockSize(name, stat.Size())
		if err != nil {
			return err
		}
	}

	// Compute md5 of this file if requested by user.
	// If file is uploaded in one shot (no blocks created) then server is populating md5 on upload automatically,
	// hence we take cost of calculating md5 only for files which are bigger in size and which will be converted to blocks.
	md5sum := []byte{}
	if bb.Config.updateMD5 && stat.Size() >= blockblob.MaxUploadBlobBytes {
		md5sum, err = common.GetMD5(fi)
		if err != nil {
			// Md5 generation failed: fall back to the same empty value used
			// when no md5 is computed, rather than a one-byte placeholder
			// that would be sent as the blob's content md5.
			// NOTE(review): a non-16-byte md5 is presumably rejected by the
			// service - confirm against the Put Block List documentation.
			log.Warn("BlockBlob::WriteFromFile : Failed to generate md5 of %s", name)
			md5sum = []byte{}
		}
	}

	uploadOptions := &blockblob.UploadFileOptions{
		BlockSize:   blockSize,
		Concurrency: bb.Config.maxConcurrency,
		Metadata:    metadata,
		AccessTier:  bb.Config.defaultTier,
		HTTPHeaders: &blob.HTTPHeaders{
			BlobContentType: to.Ptr(getContentType(name)),
			BlobContentMD5:  md5sum,
		},
		CPKInfo: bb.blobCPKOpt,
	}

	// Report upload progress only when monitoring is enabled and there is
	// something to upload.
	if common.MonitorBfs() && stat.Size() > 0 {
		uploadOptions.Progress = func(bytesTransferred int64) {
			trackUpload(name, bytesTransferred, stat.Size(), uploadPtr)
		}
	}

	_, err = blobClient.UploadFile(context.Background(), fi, uploadOptions)
	if err != nil {
		// Translate the storage error into the errno-style value callers expect.
		switch storeBlobErrToErr(err) {
		case BlobIsUnderLease:
			log.Err("BlockBlob::WriteFromFile : %s is under a lease, can not update file [%s]", name, err.Error())
			return syscall.EIO
		case InvalidPermission:
			log.Err("BlockBlob::WriteFromFile : Insufficient permissions for %s [%s]", name, err.Error())
			return syscall.EACCES
		default:
			log.Err("BlockBlob::WriteFromFile : Failed to upload blob %s [%s]", name, err.Error())
			return err
		}
	}

	log.Debug("BlockBlob::WriteFromFile : Upload complete of blob %v", name)

	// store total bytes uploaded so far
	if stat.Size() > 0 {
		azStatsCollector.UpdateStats(stats_manager.Increment, bytesUploaded, stat.Size())
	}

	return nil
}