ste/xfer-setProperties.go (177 lines of code) (raw):

package ste import ( "errors" "fmt" "net/http" "strings" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-storage-azcopy/v10/common" ) func SetProperties(jptm IJobPartTransferMgr, _ pacer) { // If the transfer was cancelled, then reporting transfer as done and increasing the bytes transferred by the size of the source. if jptm.WasCanceled() { jptm.ReportTransferDone() return } // schedule the work as a chunk, so it will run on the main goroutine pool, instead of the // smaller "transfer initiation pool", where this code runs. id := common.NewChunkID(jptm.Info().Source, 0, 0) cf := createChunkFunc(true, jptm, id, func() { to := jptm.FromTo() switch to.From() { case common.ELocation.Blob(): setPropertiesBlob(jptm) case common.ELocation.BlobFS(): setPropertiesBlobFS(jptm) case common.ELocation.File(): setPropertiesFile(jptm) default: panic("Attempting set-properties on invalid location: " + to.From().String()) } }) jptm.ScheduleChunks(cf) } func setPropertiesBlob(jptm IJobPartTransferMgr) { info := jptm.Info() // Internal function which checks the transfer status and logs the msg respectively. // Sets the transfer status and Reports Transfer as Done. // Internal function is created to avoid redundancy of the above steps from several places in the api. transferDone := func(status common.TransferStatus, err error) { if status == common.ETransferStatus.Failed() { jptm.LogError(info.Source, "SET-PROPERTIES FAILED with error: ", err) } else { jptm.Log(common.LogInfo, fmt.Sprintf("SET-PROPERTIES SUCCESSFUL: %s", strings.Split(info.Destination, "?")[0])) } jptm.SetStatus(status) jptm.ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0) jptm.ReportTransferDone() } bsc, err := jptm.SrcServiceClient().BlobServiceClient() if err != nil { transferDone(common.ETransferStatus.Failed(), err) return } srcBlobClient := bsc.NewContainerClient(jptm.Info().SrcContainer).NewBlobClient(info.SrcFilePath) PropertiesToTransfer := jptm.PropertiesToTransfer() _, metadata, blobTags, _ := jptm.ResourceDstData(nil) if PropertiesToTransfer.ShouldTransferTier() { rehydratePriority := info.RehydratePriority blockBlobTier, pageBlobTier := jptm.BlobTiers() var err error = nil if jptm.Info().SrcBlobType == blob.BlobTypeBlockBlob && blockBlobTier != common.EBlockBlobTier.None() && ValidateTier(jptm, to.Ptr(blockBlobTier.ToAccessTierType()), srcBlobClient, jptm.Context(), true) { _, err = srcBlobClient.SetTier(jptm.Context(), blockBlobTier.ToAccessTierType(), &blob.SetTierOptions{RehydratePriority: &rehydratePriority}) } // cannot return true for >1, therefore only one of these will run if jptm.Info().SrcBlobType == blob.BlobTypePageBlob && pageBlobTier != common.EPageBlobTier.None() && ValidateTier(jptm, to.Ptr(pageBlobTier.ToAccessTierType()), srcBlobClient, jptm.Context(), true) { _, err = srcBlobClient.SetTier(jptm.Context(), pageBlobTier.ToAccessTierType(), &blob.SetTierOptions{RehydratePriority: &rehydratePriority}) } if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } // don't mark it a success just yet, because more properties might need to be changed } if PropertiesToTransfer.ShouldTransferMetaData() { _, err := srcBlobClient.SetMetadata(jptm.Context(), metadata, nil) //TODO the canonical thing in this is changing key value to upper case. How to go around it? if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } } if PropertiesToTransfer.ShouldTransferBlobTags() { _, err := srcBlobClient.SetTags(jptm.Context(), blobTags, nil) if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } } // marking it a successful flow, as no property has resulted in err != nil transferDone(common.ETransferStatus.Success(), nil) } func setPropertiesBlobFS(jptm IJobPartTransferMgr) { info := jptm.Info() // Internal function which checks the transfer status and logs the msg respectively. // Sets the transfer status and Report Transfer as Done. // Internal function is created to avoid redundancy of the above steps from several places in the api. transferDone := func(status common.TransferStatus, err error) { if status == common.ETransferStatus.Failed() { jptm.LogError(info.Source, "SET-PROPERTIES ERROR ", err) } else { jptm.Log(common.LogInfo, fmt.Sprintf("SET-PROPERTIES SUCCESSFUL: %s", strings.Split(info.Destination, "?")[0])) } jptm.SetStatus(status) jptm.ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0) jptm.ReportTransferDone() } bsc, err := jptm.SrcServiceClient().BlobServiceClient() if err != nil { transferDone(common.ETransferStatus.Failed(), err) return } srcBlobClient := bsc.NewContainerClient(info.SrcContainer).NewBlobClient(info.SrcFilePath) PropertiesToTransfer := jptm.PropertiesToTransfer() _, metadata, blobTags, _ := jptm.ResourceDstData(nil) if PropertiesToTransfer.ShouldTransferTier() { rehydratePriority := info.RehydratePriority _, pageBlobTier := jptm.BlobTiers() var err error = nil if ValidateTier(jptm, to.Ptr(pageBlobTier.ToAccessTierType()), srcBlobClient, jptm.Context(), false) { _, err = srcBlobClient.SetTier(jptm.Context(), pageBlobTier.ToAccessTierType(), &blob.SetTierOptions{RehydratePriority: &rehydratePriority}) } if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } // don't mark it a success just yet, because more properties might need to be changed } if PropertiesToTransfer.ShouldTransferMetaData() { _, err := srcBlobClient.SetMetadata(jptm.Context(), metadata, nil) if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } } if PropertiesToTransfer.ShouldTransferBlobTags() { _, err := srcBlobClient.SetTags(jptm.Context(), blobTags, nil) if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } } // marking it a successful flow, as no property has resulted in err != nil transferDone(common.ETransferStatus.Success(), nil) } func setPropertiesFile(jptm IJobPartTransferMgr) { info := jptm.Info() // Internal function which checks the transfer status and logs the msg respectively. // Sets the transfer status and Report Transfer as Done. // Internal function is created to avoid redundancy of the above steps from several places in the api. transferDone := func(status common.TransferStatus, err error) { if status == common.ETransferStatus.Failed() { jptm.LogError(info.Source, "SET-PROPERTIES ERROR ", err) } else { jptm.Log(common.LogInfo, fmt.Sprintf("SET-PROPERTIES SUCCESSFUL: %s", strings.Split(info.Destination, "?")[0])) } jptm.SetStatus(status) jptm.ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0) jptm.ReportTransferDone() } s, err := jptm.SrcServiceClient().FileServiceClient() if err != nil { transferDone(common.ETransferStatus.Failed(), err) return } srcFileClient := s.NewShareClient(jptm.Info().SrcContainer).NewRootDirectoryClient().NewFileClient(jptm.Info().SrcFilePath) PropertiesToTransfer := jptm.PropertiesToTransfer() _, metadata, _, _ := jptm.ResourceDstData(nil) if PropertiesToTransfer.ShouldTransferTier() { // this case should have been picked up by front end and given error (changing tier is not available for File Storage) err := fmt.Errorf("trying to change tier of file") transferDone(common.ETransferStatus.Failed(), err) } if PropertiesToTransfer.ShouldTransferMetaData() { _, err := srcFileClient.SetMetadata(jptm.Context(), &file.SetMetadataOptions{Metadata: metadata}) if err != nil { errorHandlerForXferSetProperties(err, jptm, transferDone) return } } // TAGS NOT AVAILABLE FOR FILES transferDone(common.ETransferStatus.Success(), nil) } func errorHandlerForXferSetProperties(err error, jptm IJobPartTransferMgr, transferDone func(status common.TransferStatus, err error)) { var respErr *azcore.ResponseError if errors.As(err, &respErr) && respErr.StatusCode == http.StatusForbidden { // If the status code was 403, it means there was an authentication error, and we exit. // User can resume the job if completely ordered with a new sas. errMsg := fmt.Sprintf("Authentication Failed. The SAS is not correct or expired or does not have the correct permission %s", err.Error()) jptm.Log(common.LogError, errMsg) common.GetLifecycleMgr().Error(errMsg) // TODO : Migrate on azfile } // in all other cases, make the transfer as failed transferDone(common.ETransferStatus.Failed(), err) }