in vhdUploadCmdHandler.go [21:176]
// vhdUploadCmdHandler builds the "upload" CLI command, which uploads a local
// VHD file to an Azure storage account as a page blob.
//
// Required flags: --localvhdpath, --stgaccountname, --stgaccountkey and
// --blobname. Optional flags: --containername (defaults to "vhds"),
// --parallelism (defaults to 8*NumCPU) and --overwrite.
//
// When the destination blob already exists and --overwrite is not enabled,
// the command compares the blob's metadata with the local VHD's metadata and
// resumes the upload, skipping the ranges the blob already reports as
// uploaded. Otherwise the blob is (re)created before uploading.
func vhdUploadCmdHandler() cli.Command {
	return cli.Command{
		Name:  "upload",
		Usage: "Upload a local VHD to Azure storage as page blob",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:  "localvhdpath",
				Usage: "Path to source VHD in the local machine.",
			},
			cli.StringFlag{
				Name:  "stgaccountname",
				Usage: "Azure storage account name.",
			},
			cli.StringFlag{
				Name:  "stgaccountkey",
				Usage: "Azure storage account key.",
			},
			cli.StringFlag{
				Name:  "containername",
				Usage: "Name of the container holding destination page blob. (Default: vhds)",
			},
			cli.StringFlag{
				Name:  "blobname",
				Usage: "Name of the destination page blob.",
			},
			cli.StringFlag{
				Name:  "parallelism",
				Usage: "Number of concurrent goroutines to be used for upload",
			},
			cli.BoolFlag{
				Name:  "overwrite",
				Usage: "Overwrite the blob if already exists.",
			},
		},
		Action: func(c *cli.Context) error {
			// Size handed to upload.LocateUploadableRanges as the page size (2 MiB).
			const PageBlobPageSize int64 = 2 * 1024 * 1024

			// --- Argument validation -------------------------------------------------
			localVHDPath := c.String("localvhdpath")
			if localVHDPath == "" {
				return errors.New("Missing required argument --localvhdpath")
			}

			stgAccountName := c.String("stgaccountname")
			if stgAccountName == "" {
				return errors.New("Missing required argument --stgaccountname")
			}

			stgAccountKey := c.String("stgaccountkey")
			if stgAccountKey == "" {
				return errors.New("Missing required argument --stgaccountkey")
			}

			containerName := c.String("containername")
			if containerName == "" {
				containerName = "vhds"
				log.Println("Using default container 'vhds'")
			}

			blobName := c.String("blobname")
			if blobName == "" {
				return errors.New("Missing required argument --blobname")
			}
			// Ensure the destination blob name carries a ".vhd" extension
			// (checked case-insensitively).
			if !strings.HasSuffix(strings.ToLower(blobName), ".vhd") {
				blobName = blobName + ".vhd"
			}

			var parallelism int
			if c.IsSet("parallelism") {
				// A 32-bit signed parse guarantees the value fits in int on
				// every platform, so the conversion below cannot wrap.
				p, err := strconv.ParseInt(c.String("parallelism"), 10, 32)
				if err != nil {
					return fmt.Errorf("invalid value for --parallelism: %s", err)
				}
				// An upload needs at least one worker to make progress.
				if p <= 0 {
					return fmt.Errorf("invalid value for --parallelism: %d, must be greater than zero", p)
				}
				parallelism = int(p)
			} else {
				parallelism = 8 * runtime.NumCPU()
				log.Printf("Using default parallelism [8*NumCPU] : %d\n", parallelism)
			}

			// Read the flag's value rather than its presence, so that an
			// explicit --overwrite=false is honoured.
			overwrite := c.Bool("overwrite")

			// --- Open the local VHD --------------------------------------------------
			// NOTE(review): ensureVHDSanity returns nothing; presumably it aborts
			// the process itself when the VHD is invalid - confirm.
			ensureVHDSanity(localVHDPath)
			diskStream, err := diskstream.CreateNewDiskStream(localVHDPath)
			if err != nil {
				return err
			}
			defer diskStream.Close()

			// --- Prepare the destination container -----------------------------------
			storageClient, err := storage.NewBasicClient(stgAccountName, stgAccountKey)
			if err != nil {
				return err
			}
			blobServiceClient := storageClient.GetBlobService()
			if _, err = blobServiceClient.CreateContainerIfNotExists(containerName, storage.ContainerAccessTypePrivate); err != nil {
				return err
			}

			blobExists, err := blobServiceClient.BlobExists(containerName, blobName)
			if err != nil {
				return err
			}

			// --- Decide between resuming and a fresh upload --------------------------
			resume := false
			var blobMetaData *metadata.MetaData
			if blobExists && !overwrite {
				log.Printf("Blob with name '%s' already exists, checking upload can be resumed\n", blobName)
				blobMetaData = getBlobMetaData(blobServiceClient, containerName, blobName)
				resume = true
			}

			localMetaData := getLocalVHDMetaData(localVHDPath)
			var rangesToSkip []*common.IndexRange
			if resume {
				// Resuming is only attempted when the blob's recorded metadata
				// matches the local VHD's metadata.
				if errs := metadata.CompareMetaData(blobMetaData, localMetaData); len(errs) != 0 {
					printErrorsAndFatal(errs)
				}
				rangesToSkip = getAlreadyUploadedBlobRanges(blobServiceClient, containerName, blobName)
			} else {
				createBlob(blobServiceClient, containerName, blobName, diskStream.GetSize(), localMetaData)
			}

			// --- Compute what has to be uploaded -------------------------------------
			uploadableRanges, err := upload.LocateUploadableRanges(diskStream, rangesToSkip, PageBlobPageSize)
			if err != nil {
				return err
			}
			uploadableRanges, err = upload.DetectEmptyRanges(diskStream, uploadableRanges)
			if err != nil {
				return err
			}

			// --- Upload --------------------------------------------------------------
			cxt := &upload.DiskUploadContext{
				VhdStream:             diskStream,
				UploadableRanges:      uploadableRanges,
				AlreadyProcessedBytes: common.TotalRangeLength(rangesToSkip),
				BlobServiceClient:     blobServiceClient,
				ContainerName:         containerName,
				BlobName:              blobName,
				Parallelism:           parallelism,
				Resume:                resume,
				MD5Hash:               localMetaData.FileMetaData.MD5Hash,
			}
			if err = upload.Upload(cxt); err != nil {
				return err
			}

			// The MD5 hash is set on the blob only after the upload has succeeded.
			setBlobMD5Hash(blobServiceClient, containerName, blobName, localMetaData)
			fmt.Println("\nUpload completed")
			return nil
		},
	}
}