ste/JobPartPlan.go (269 lines of code) (raw):
package ste
import (
"errors"
"sync/atomic"
"unsafe"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-storage-azcopy/v10/common"
)
// dataSchemaVersion defines the data schema version of JobPart order files supported by
// current version of azcopy
// To be Incremented every time when we release azcopy with changed dataSchema
const DataSchemaVersion common.Version = 19
const (
CustomHeaderMaxBytes = 256
MetadataMaxBytes = 1000 // If > 65536, then jobPartPlanBlobData's MetadataLength field's type must change
BlobTagsMaxByte = 4000
)
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type JobPartPlanMMF common.MMF
func (mmf *JobPartPlanMMF) Plan() *JobPartPlanHeader {
// getJobPartPlanPointer returns the memory map JobPartPlanHeader pointer
// casting the mmf slice's address to JobPartPlanHeader Pointer
slice := (*common.MMF)(mmf).Slice()
return (*JobPartPlanHeader)(unsafe.Pointer(&slice[0]))
}
func (mmf *JobPartPlanMMF) Unmap() { (*common.MMF)(mmf).Unmap() }
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type IJobPartPlanHeader interface {
CommandString() string
JobPartStatus() common.JobStatus
JobStatus() common.JobStatus
SetJobPartStatus(newJobStatus common.JobStatus)
SetJobStatus(newJobStatus common.JobStatus)
Transfer(transferIndex uint32) *JobPartPlanTransfer
TransferSrcDstRelatives(transferIndex uint32) (relSource string, relDest string)
TransferSrcDstStrings(transferIndex uint32) (source string, destination string, isFolder bool)
TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType blob.BlobType, blobTier blob.AccessTier, s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobSnapshotID string, blobTags common.BlobTags)
}
// JobPartPlanHeader represents the header of Job Part's memory-mapped file
type JobPartPlanHeader struct {
// Once set, the following fields are constants; they should never be modified
Version common.Version // The version of data schema format of header; see the dataSchemaVersion constant
StartTime int64 // The start time of this part
JobID common.JobID // Job Part's JobID
PartNum common.PartNumber // Job Part's part number (0+)
SourceRootLength uint16 // The length of the source root path
SourceRoot [1000]byte // The root directory of the source
SourceExtraQueryLength uint16
SourceExtraQuery [1000]byte // Extra query params applicable to the source
DestinationRootLength uint16 // The length of the destination root path
DestinationRoot [1000]byte // The root directory of the destination
DestExtraQueryLength uint16
DestExtraQuery [1000]byte // Extra query params applicable to the dest
IsFinalPart bool // True if this is the Job's last part; else false
ForceWrite common.OverwriteOption // True if the existing blobs needs to be overwritten.
ForceIfReadOnly bool // Supplements ForceWrite with an additional setting for Azure Files. If true, the read-only attribute will be cleared before we overwrite
AutoDecompress bool // if true, source data with encodings that represent compression are automatically decompressed when downloading
Priority common.JobPriority // The Job Part's priority
TTLAfterCompletion uint32 // Time to live after completion is used to persists the file on disk of specified time after the completion of JobPartOrder
FromTo common.FromTo // The location of the transfer's source & destination
Fpo common.FolderPropertyOption // option specifying how folders will be handled
CommandStringLength uint32
NumTransfers uint32 // The number of transfers in the Job part
LogLevel common.LogLevel // This Job Part's minimal log level
DstBlobData JobPartPlanDstBlob // Additional data for blob destinations
DstLocalData JobPartPlanDstLocal // Additional data for local destinations
DstFileData JobPartPlanDstFile // Additional data for file destinations
PreservePermissions common.PreservePermissionsOption
PreserveSMBInfo bool
PreservePOSIXProperties bool
// S2SGetPropertiesInBackend represents whether to enable get S3 objects' or Azure files' properties during s2s copy in backend.
S2SGetPropertiesInBackend bool
// S2SSourceChangeValidation represents whether user wants to check if source has changed after enumerating.
S2SSourceChangeValidation bool
// DestLengthValidation represents whether the user wants to check if the destination has a different content-length
DestLengthValidation bool
// S2SInvalidMetadataHandleOption represents how user wants to handle invalid metadata.
S2SInvalidMetadataHandleOption common.InvalidMetadataHandleOption
// BlobFSRecursiveDelete represents whether the user wants to make a recursive call to the DFS endpoint or not
BlobFSRecursiveDelete bool
// Any fields below this comment are NOT constants; they may change over as the job part is processed.
// Care must be taken to read/write to these fields in a thread-safe way!
// jobStatus_doNotUse represents the current status of JobPartPlan
// jobStatus_doNotUse is a private member whose value can be accessed by Status and SetJobStatus
// jobStatus_doNotUse should not be directly accessed anywhere except by the Status and SetJobStatus
atomicJobStatus common.JobStatus
atomicPartStatus common.JobStatus
// For delete operation specify what to do with snapshots
DeleteSnapshotsOption common.DeleteSnapshotsOption
// Determine what to do with soft-deleted snapshots
PermanentDeleteOption common.PermanentDeleteOption
RehydratePriority common.RehydratePriorityType
}
// Status returns the job status stored in JobPartPlanHeader in thread-safe manner
func (jpph *JobPartPlanHeader) JobStatus() common.JobStatus {
return jpph.atomicJobStatus.AtomicLoad()
}
// SetJobStatus sets the job status in JobPartPlanHeader in thread-safe manner
func (jpph *JobPartPlanHeader) SetJobStatus(newJobStatus common.JobStatus) {
jpph.atomicJobStatus.AtomicStore(newJobStatus)
}
func (jpph *JobPartPlanHeader) JobPartStatus() common.JobStatus {
return jpph.atomicPartStatus.AtomicLoad()
}
func (jpph *JobPartPlanHeader) SetJobPartStatus(newJobStatus common.JobStatus) {
jpph.atomicPartStatus.AtomicStore(newJobStatus)
}
// Transfer api gives memory map JobPartPlanTransfer header for given index
func (jpph *JobPartPlanHeader) Transfer(transferIndex uint32) *JobPartPlanTransfer {
// get memory map JobPartPlan Header Pointer
if transferIndex >= jpph.NumTransfers {
panic(errors.New("requesting a transfer index greater than what is available"))
}
// (Job Part Plan's file address) + (header size) + (padding to 8 bytes) --> beginning of transfers in file
// Add (transfer size) * (transfer index)
transfersOffset := unsafe.Sizeof(*jpph) + uintptr(jpph.CommandStringLength)
transfersOffset = (transfersOffset + 7) & ^uintptr(7)
return (*JobPartPlanTransfer)(unsafe.Pointer((uintptr(unsafe.Pointer(jpph)) + transfersOffset) + (unsafe.Sizeof(JobPartPlanTransfer{}) * uintptr(transferIndex))))
}
// CommandString returns the command string given by user when job was created
func (jpph *JobPartPlanHeader) CommandString() string {
// Calculate the start address of the command string
start := uintptr(unsafe.Pointer(jpph)) + unsafe.Sizeof(*jpph)
// Create a slice from the calculated start address
commandSlice := unsafe.Slice((*byte)(unsafe.Pointer(start)), int(jpph.CommandStringLength))
return string(commandSlice)
}
func (jpph *JobPartPlanHeader) TransferSrcDstRelatives(transferIndex uint32) (relSource, relDest string) {
jppt := jpph.Transfer(transferIndex)
srcData := unsafe.Pointer(uintptr(unsafe.Pointer(jpph)) + uintptr(jppt.SrcOffset)) // Address of Job Part Plan + this transfer's src string offset
dstData := unsafe.Pointer(uintptr(unsafe.Pointer(jpph)) + uintptr(jppt.SrcOffset) + uintptr(jppt.SrcLength)) // Address of Job Part Plan + this transfer's src string offset + length of this transfer's src string
return unsafe.String((*byte)(srcData), int(jppt.SrcLength)), unsafe.String((*byte)(dstData), int(jppt.DstLength))
}
// TransferSrcDstDetail returns the source and destination string for a transfer at given transferIndex in JobPartOrder
// Also indication of entity type since that's often necessary to avoid ambiguity about what the source and dest are
func (jpph *JobPartPlanHeader) TransferSrcDstStrings(transferIndex uint32) (source, destination string, isFolder bool) {
srcRoot := string(jpph.SourceRoot[:jpph.SourceRootLength])
srcExtraQuery := string(jpph.SourceExtraQuery[:jpph.SourceExtraQueryLength])
dstRoot := string(jpph.DestinationRoot[:jpph.DestinationRootLength])
dstExtraQuery := string(jpph.DestExtraQuery[:jpph.DestExtraQueryLength])
jppt := jpph.Transfer(transferIndex)
isFolder = jppt.EntityType == common.EEntityType.Folder()
srcRelative, dstRelative := jpph.TransferSrcDstRelatives(transferIndex)
return common.GenerateFullPathWithQuery(srcRoot, srcRelative, srcExtraQuery),
common.GenerateFullPathWithQuery(dstRoot, dstRelative, dstExtraQuery),
isFolder
}
func (jpph *JobPartPlanHeader) getString(offset int64, length int16) string {
data := unsafe.Pointer(uintptr(unsafe.Pointer(jpph)) + uintptr(offset)) // Address of Job Part Plan + this string's offset
return unsafe.String((*byte)(data), int(length))
}
// TransferSrcPropertiesAndMetadata returns the SrcHTTPHeaders, properties and metadata for a transfer at given transferIndex in JobPartOrder
// TODO: Refactor return type to an object
func (jpph *JobPartPlanHeader) TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType blob.BlobType, blobTier blob.AccessTier,
s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobSnapshotID string, blobTags common.BlobTags) {
var err error
t := jpph.Transfer(transferIndex)
s2sGetPropertiesInBackend = jpph.S2SGetPropertiesInBackend
s2sSourceChangeValidation = jpph.S2SSourceChangeValidation
s2sInvalidMetadataHandleOption = jpph.S2SInvalidMetadataHandleOption
DestLengthValidation = jpph.DestLengthValidation
offset := t.SrcOffset + int64(t.SrcLength) + int64(t.DstLength)
entityType = t.EntityType
if t.SrcContentTypeLength != 0 {
h.ContentType = jpph.getString(offset, t.SrcContentTypeLength)
offset += int64(t.SrcContentTypeLength)
}
if t.SrcContentEncodingLength != 0 {
h.ContentEncoding = jpph.getString(offset, t.SrcContentEncodingLength)
offset += int64(t.SrcContentEncodingLength)
}
if t.SrcContentLanguageLength != 0 {
h.ContentLanguage = jpph.getString(offset, t.SrcContentLanguageLength)
offset += int64(t.SrcContentLanguageLength)
}
if t.SrcContentDispositionLength != 0 {
h.ContentDisposition = jpph.getString(offset, t.SrcContentDispositionLength)
offset += int64(t.SrcContentDispositionLength)
}
if t.SrcCacheControlLength != 0 {
h.CacheControl = jpph.getString(offset, t.SrcCacheControlLength)
offset += int64(t.SrcCacheControlLength)
}
if t.SrcContentMD5Length != 0 {
h.ContentMD5 = []byte(jpph.getString(offset, t.SrcContentMD5Length))
offset += int64(t.SrcContentMD5Length)
}
if t.SrcMetadataLength != 0 {
tmpMetaData := jpph.getString(offset, t.SrcMetadataLength)
metadata, err = common.UnMarshalToCommonMetadata(tmpMetaData)
common.PanicIfErr(err)
offset += int64(t.SrcMetadataLength)
}
if t.SrcBlobTypeLength != 0 {
tmpBlobTypeStr := []byte(jpph.getString(offset, t.SrcBlobTypeLength))
blobType = blob.BlobType(tmpBlobTypeStr)
offset += int64(t.SrcBlobTypeLength)
}
if t.SrcBlobTierLength != 0 {
tmpBlobTierStr := []byte(jpph.getString(offset, t.SrcBlobTierLength))
blobTier = blob.AccessTier(tmpBlobTierStr)
offset += int64(t.SrcBlobTierLength)
}
if t.SrcBlobVersionIDLength != 0 {
blobVersionID = jpph.getString(offset, t.SrcBlobVersionIDLength)
offset += int64(t.SrcBlobVersionIDLength)
}
if t.SrcBlobSnapshotIDLength != 0 {
blobSnapshotID = jpph.getString(offset, t.SrcBlobSnapshotIDLength)
offset += int64(t.SrcBlobSnapshotIDLength)
}
if t.SrcBlobTagsLength != 0 {
blobTagsString := jpph.getString(offset, t.SrcBlobTagsLength)
blobTags = common.ToCommonBlobTagsMap(blobTagsString)
offset += int64(t.SrcBlobTagsLength) //nolint:ineffassign
}
return
}
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// JobPartPlanDstBlob holds additional settings required when the destination is a blob
type JobPartPlanDstBlob struct {
// Once set, the following fields are constants; they should never be modified
BlobType common.BlobType
// represents user decision to interpret the content-encoding from source file
NoGuessMimeType bool
// Specifies the length of MIME content type of the blob
ContentTypeLength uint16
// Specifies the MIME content type of the blob. The default type is application/octet-stream
ContentType [CustomHeaderMaxBytes]byte
// Specifies length of content encoding which have been applied to the blob.
ContentEncodingLength uint16
// Specifies which content encodings have been applied to the blob.
ContentEncoding [CustomHeaderMaxBytes]byte
// Specifies length of content language which has been applied to the blob.
ContentLanguageLength uint16
// Specifies which content language has been applied to the blob.
ContentLanguage [CustomHeaderMaxBytes]byte
// Specifies length of content disposition which has been applied to the blob.
ContentDispositionLength uint16
// Specifies the content disposition of the blob
ContentDisposition [CustomHeaderMaxBytes]byte
// Specifies the length of the cache control which has been applied to the blob.
CacheControlLength uint16
// Specifies the cache control of the blob
CacheControl [CustomHeaderMaxBytes]byte
// Specifies the tier if this is a block or page blob
BlockBlobTier common.BlockBlobTier
PageBlobTier common.PageBlobTier
// Controls uploading of MD5 hashes
PutMd5 bool
MetadataLength uint16
Metadata [MetadataMaxBytes]byte
BlobTagsLength uint16
BlobTags [BlobTagsMaxByte]byte
CpkInfo bool
IsSourceEncrypted bool
CpkScopeInfo [CustomHeaderMaxBytes]byte
CpkScopeInfoLength uint16
// Specifies the maximum size of block which determines the number of chunks and chunk size of a transfer
BlockSize int64
// Specifies the maximum size of a blob which can be uploaded by a single PUT request.
PutBlobSize int64
SetPropertiesFlags common.SetPropertiesFlags
DeleteDestinationFileIfNecessary bool
}
// JobPartPlanDstFile holds additional settings required when the destination is a file
type JobPartPlanDstFile struct {
TrailingDot common.TrailingDotOption
}
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// jobPartPlanDstLocal holds additional settings required when the destination is a local file
type JobPartPlanDstLocal struct {
// Once set, the following fields are constants; they should never be modified
// Specifies whether the timestamp of destination file has to be set to the modified time of source file
PreserveLastModifiedTime bool
// says how MD5 verification failures should be actioned
MD5VerificationOption common.HashValidationOption
}
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// JobPartPlanTransfer represent the header of Job Part's Transfer in Memory Map File
type JobPartPlanTransfer struct {
// Once set, the following fields are constants; they should never be modified
// SrcOffset represents the actual start offset transfer header written in JobPartOrder file
SrcOffset int64
// SrcLength represents the actual length of source string for specific transfer
SrcLength int16
// DstLength represents the actual length of destination string for specific transfer
DstLength int16
// ChunkCount represents the num of chunks a transfer is split into
// ChunkCount uint16 // TODO: Remove this, we need to determine it at runtime
// EntityType indicates whether this is a file or a folder
// We use a dedicated field for this because the alternative (of doing something fancy the names) was too complex and error-prone
EntityType common.EntityType
// ModifiedTime represents the last time at which source was modified before start of transfer stored as nanoseconds.
ModifiedTime int64
// SourceSize represents the actual size of the source on disk
SourceSize int64
// CompletionTime represents the time at which transfer was completed
CompletionTime uint64
// For S2S copy, per Transfer source's properties
// TODO: ensure the length is enough
SrcContentTypeLength int16
SrcContentEncodingLength int16
SrcContentLanguageLength int16
SrcContentDispositionLength int16
SrcCacheControlLength int16
SrcContentMD5Length int16
SrcMetadataLength int16
SrcBlobTypeLength int16
SrcBlobTierLength int16
SrcBlobVersionIDLength int16
SrcBlobSnapshotIDLength int16
SrcBlobTagsLength int16
// Any fields below this comment are NOT constants; they may change over as the transfer is processed.
// Care must be taken to read/write to these fields in a thread-safe way!
// atomicTransferStatus represents the status of current transfer (TransferInProgress, TransferFailed or TransfersCompleted)
// atomicTransferStatus should not be directly accessed anywhere except by transferStatus and setTransferStatus
atomicTransferStatus common.TransferStatus
// atomicErrorCode represents the storageError error code of the error with which the transfer got failed.
// atomicErrorCode has a default value (0) which means either there was no error or transfer failed because some non storageError.
// atomicErrorCode should not be directly accessed anywhere except by transferStatus and setTransferStatus
atomicErrorCode int32
}
// TransferStatus returns the transfer's status
func (jppt *JobPartPlanTransfer) TransferStatus() common.TransferStatus {
return jppt.atomicTransferStatus.AtomicLoad()
}
// SetTransferStatus sets the transfer's status
// overWrite flags if set to true overWrites the failed status.
// If overWrite flag is set to false, then status of transfer is set to failed won't be overWritten.
// overWrite flag is used while resuming the failed transfers where the errorCode are set to default i.e 0
func (jppt *JobPartPlanTransfer) SetTransferStatus(status common.TransferStatus, overWrite bool) {
if !overWrite {
common.AtomicMorphInt32((*int32)(&jppt.atomicTransferStatus),
func(startVal int32) (val int32, morphResult interface{}) {
// If current transfer status has some completed value, then it will not be changed.
return common.Iff(common.TransferStatus(startVal).StatusLocked(), startVal, int32(status)), nil
})
} else {
(&jppt.atomicTransferStatus).AtomicStore(status)
}
}
// ErrorCode returns the transfer's errorCode.
func (jppt *JobPartPlanTransfer) ErrorCode() int32 {
return atomic.LoadInt32(&jppt.atomicErrorCode)
}
// SetErrorCode sets the error code of the error if transfer failed.
// overWrite flags if set to true overWrites the atomicErrorCode.
// If overWrite flag is set to false, then errorCode won't be overwritten.
func (jppt *JobPartPlanTransfer) SetErrorCode(errorCode int32, overwrite bool) {
if !overwrite {
common.AtomicMorphInt32(&jppt.atomicErrorCode,
func(startErrorCode int32) (val int32, morphResult interface{}) {
// startErrorCode != 0 means that error code is already set.
// If current error code is already set to some error code, then it will not be changed.
return common.Iff(startErrorCode != 0, startErrorCode, errorCode), nil
})
} else {
atomic.StoreInt32(&jppt.atomicErrorCode, errorCode)
}
}