// ste/JobPartPlanFileName.go

package ste

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"reflect"
	"strings"
	"time"
	"unsafe"

	"github.com/Azure/azure-storage-azcopy/v10/common"
)

type JobPartPlanFileName string

func (jppfn *JobPartPlanFileName) Exists() bool {
	_, err := os.Stat(jppfn.GetJobPartPlanPath())
	return err == nil
}

func (jppfn *JobPartPlanFileName) GetJobPartPlanPath() string {
	if common.AzcopyJobPlanFolder != "" {
		return fmt.Sprintf("%s%s%s", common.AzcopyJobPlanFolder, common.AZCOPY_PATH_SEPARATOR_STRING, string(*jppfn))
	} else {
		return string(*jppfn)
	}
}

const JobPartPlanFileNameFormat = "%v--%05d.steV%d"

// TODO: This needs testing
func (jpfn JobPartPlanFileName) Parse() (jobID common.JobID, partNumber common.PartNumber, err error) {
	var dataSchemaVersion common.Version
	// n, err := fmt.Sscanf(string(jpfn), jobPartPlanFileNameFormat, &jobID, &partNumber, &dataSchemaVersion)
	// if err != nil || n != 3 {
	// 	panic(err)
	// }
	// if dataSchemaVersion != DataSchemaVersion {
	// 	err = fmt.Errorf("job part Plan file's data schema version ('%d') doesn't match what this app requires ('%d')", dataSchemaVersion, DataSchemaVersion)
	// }
	// TODO: confirm the alternative approach. fmt.Sscanf not working for reading back string into struct JobId.
	jpfnSplit := strings.Split(string(jpfn), "--")
	jobId, err := common.ParseJobID(jpfnSplit[0])
	if err != nil {
		err = fmt.Errorf("failed to parse the JobId from JobPartFileName %s. Failed with error %w", string(jpfn), err) //nolint:staticcheck
		common.GetLifecycleMgr().Warn(err.Error())
	}
	jobID = jobId
	n, err := fmt.Sscanf(jpfnSplit[1], "%05d.steV%d", &partNumber, &dataSchemaVersion)
	if err != nil || n != 2 {
		panic(err)
	}
	if dataSchemaVersion != DataSchemaVersion {
		err = fmt.Errorf("job part Plan file's data schema version ('%d') doesn't match what this app requires ('%d')", dataSchemaVersion, DataSchemaVersion)
	}
	return
}

func (jpfn JobPartPlanFileName) Delete() error {
	return os.Remove(string(jpfn))
}

func (jpfn JobPartPlanFileName) Map() *JobPartPlanMMF {
	// opening the file with given filename
	file, err := os.OpenFile(jpfn.GetJobPartPlanPath(), os.O_RDWR, common.DEFAULT_FILE_PERM)
	common.PanicIfErr(err)
	// Ensure the file gets closed (although we can continue to use the MMF)
	defer file.Close()
	fileInfo, err := file.Stat()
	common.PanicIfErr(err)
	mmf, err := common.NewMMF(file, true, 0, fileInfo.Size())
	common.PanicIfErr(err)
	return (*JobPartPlanMMF)(mmf)
}
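// Illustrative note (not in the original source): JobPartPlanFileNameFormat
// renders a plan file name from a job ID, part number, and schema version,
// and Parse reverses it. Assuming a schema version of 17, the round trip
// would look like:
//
//	name := JobPartPlanFileName(fmt.Sprintf(JobPartPlanFileNameFormat, jobID, 42, 17))
//	// name == "<jobID>--00042.steV17"
//	jobID, partNum, err := name.Parse()
//
// Parse splits on "--" instead of Sscanf-ing the whole name because, per the
// TODO above, fmt.Sscanf could not read the string back into the JobID struct.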
// createJobPartPlanFile creates the memory map JobPartPlanHeader using the given JobPartOrder and JobPartPlanBlobData
func (jpfn JobPartPlanFileName) Create(order common.CopyJobPartOrderRequest) {
	if jpfn.Exists() {
		panic(fmt.Sprint("Duplicate job created. You probably shouldn't ever see this, but if you do, try cleaning out ", jpfn.GetJobPartPlanPath()))
	}

	// Validate that the passed-in strings can fit in their respective fields
	if len(order.SourceRoot.Value) > len(JobPartPlanHeader{}.SourceRoot) {
		panic(fmt.Errorf("source root string is too large: %q", order.SourceRoot))
	}
	if len(order.SourceRoot.ExtraQuery) > len(JobPartPlanHeader{}.SourceExtraQuery) {
		panic(fmt.Errorf("source extra query strings too large: %q", order.SourceRoot.ExtraQuery))
	}
	if len(order.DestinationRoot.Value) > len(JobPartPlanHeader{}.DestinationRoot) {
		panic(fmt.Errorf("destination root string is too large: %q", order.DestinationRoot))
	}
	if len(order.DestinationRoot.ExtraQuery) > len(JobPartPlanHeader{}.DestExtraQuery) {
		panic(fmt.Errorf("destination extra query strings too large: %q", order.DestinationRoot.ExtraQuery))
	}
	if len(order.BlobAttributes.ContentType) > len(JobPartPlanDstBlob{}.ContentType) {
		panic(fmt.Errorf("content type string is too large: %q", order.BlobAttributes.ContentType))
	}
	if len(order.BlobAttributes.ContentEncoding) > len(JobPartPlanDstBlob{}.ContentEncoding) {
		panic(fmt.Errorf("content encoding string is too large: %q", order.BlobAttributes.ContentEncoding))
	}
	if len(order.BlobAttributes.ContentLanguage) > len(JobPartPlanDstBlob{}.ContentLanguage) {
		panic(fmt.Errorf("content language string is too large: %q", order.BlobAttributes.ContentLanguage))
	}
	if len(order.BlobAttributes.ContentDisposition) > len(JobPartPlanDstBlob{}.ContentDisposition) {
		panic(fmt.Errorf("content disposition string is too large: %q", order.BlobAttributes.ContentDisposition))
	}
	if len(order.BlobAttributes.CacheControl) > len(JobPartPlanDstBlob{}.CacheControl) {
		panic(fmt.Errorf("cache control string is too large: %q", order.BlobAttributes.CacheControl))
	}
	if len(order.BlobAttributes.Metadata) > len(JobPartPlanDstBlob{}.Metadata) {
		panic(fmt.Errorf("metadata string is too large: %q", order.BlobAttributes.Metadata))
	}
	if len(order.BlobAttributes.BlobTagsString) > len(JobPartPlanDstBlob{}.BlobTags) {
		panic(fmt.Errorf("blob tags string is too large: %q", order.BlobAttributes.BlobTagsString))
	}

	// This nested function writes a structure value to an io.Writer & returns the number of bytes written
	writeValue := func(writer io.Writer, v interface{}) int64 {
		rv := reflect.ValueOf(v)
		structSize := reflect.TypeOf(v).Elem().Size()
		byteSlice := unsafe.Slice((*byte)(rv.UnsafePointer()), int(structSize))
		err := binary.Write(writer, binary.LittleEndian, byteSlice)
		common.PanicIfErr(err)
		return int64(structSize)
	}
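	// Explanatory note (not in the original source): this raw-bytes approach
	// only round-trips safely because the plan structs are fixed-size and
	// contain no pointers, strings, or slices, so their in-memory layout IS
	// the on-disk format. Map() later reinterprets the same bytes in place by
	// casting the memory-mapped file to *JobPartPlanMMF.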
	eof := int64(0)
	/*
	 * Following Steps are executed:
	 *  1. Get File Name from JobId and Part Number
	 *  2. Create the File with filename
	 *  3. Create Job Part Plan From Job Part Order
	 *  4. Write Data to file
	 *  5. Close the file
	 *  6. Return File Name
	 */

	// create the Job Part Plan file
	// planPathname := planDir + "/" + string(jpfn)
	file, err := os.Create(jpfn.GetJobPartPlanPath())
	if err != nil {
		panic(fmt.Errorf("couldn't create job part plan file %q: %w", jpfn, err))
	}
	defer file.Close()

	// If the block size from the front-end is 0, store it as 0; auto-correction
	// logic will then apply when the transfer info is retrieved. If the stored
	// block size is not 0, the user supplied a block size and auto-correction
	// will not apply.
	blockSize := order.BlobAttributes.BlockSizeInBytes
	// if blockSize == 0 { // TODO: Fix below
	// 	blockSize = common.DefaultBlockBlobBlockSize
	// 	/*switch order.BlobAttributes.BlobType {
	// 	case common.BlobType{}.Block():
	// 		blockSize = common.DefaultBlockBlobBlockSize
	// 	case common.BlobType{}.Append():
	// 		blockSize = common.DefaultAppendBlobBlockSize
	// 	case common.BlobType{}.Page():
	// 		blockSize = common.DefaultPageBlobChunkSize
	// 	default:
	// 		panic(errors.New("unrecognized blob type"))
	// 	}*/
	// }
	putBlobSize := order.BlobAttributes.PutBlobSizeInBytes
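	// Layout note (explanatory, not in the original source): the plan file is
	// written as a fixed-size header, the command string, alignment padding,
	// one fixed-size JobPartPlanTransfer entry per transfer, and finally every
	// transfer's variable-length strings. Readers locate each string purely by
	// the offsets and lengths recorded in the header and transfer entries.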
	// Initialize the Job Part's Plan header
	jpph := JobPartPlanHeader{
		Version:                DataSchemaVersion,
		StartTime:              time.Now().UnixNano(),
		JobID:                  order.JobID,
		PartNum:                order.PartNum,
		SourceRootLength:       uint16(len(order.SourceRoot.Value)),
		SourceExtraQueryLength: uint16(len(order.SourceRoot.ExtraQuery)),
		DestinationRootLength:  uint16(len(order.DestinationRoot.Value)),
		DestExtraQueryLength:   uint16(len(order.DestinationRoot.ExtraQuery)),
		IsFinalPart:            order.IsFinalPart,
		ForceWrite:             order.ForceWrite,
		ForceIfReadOnly:        order.ForceIfReadOnly,
		AutoDecompress:         order.AutoDecompress,
		Priority:               order.Priority,
		TTLAfterCompletion:     uint32(time.Time{}.Nanosecond()),
		FromTo:                 order.FromTo,
		Fpo:                    order.Fpo,
		CommandStringLength:    uint32(len(order.CommandString)),
		NumTransfers:           uint32(len(order.Transfers.List)),
		LogLevel:               order.LogLevel,
		DstBlobData: JobPartPlanDstBlob{
			BlobType:                         order.BlobAttributes.BlobType,
			NoGuessMimeType:                  order.BlobAttributes.NoGuessMimeType,
			ContentTypeLength:                uint16(len(order.BlobAttributes.ContentType)),
			ContentEncodingLength:            uint16(len(order.BlobAttributes.ContentEncoding)),
			ContentDispositionLength:         uint16(len(order.BlobAttributes.ContentDisposition)),
			ContentLanguageLength:            uint16(len(order.BlobAttributes.ContentLanguage)),
			CacheControlLength:               uint16(len(order.BlobAttributes.CacheControl)),
			PutMd5:                           order.BlobAttributes.PutMd5, // here because it relates to uploads (blob destination)
			BlockBlobTier:                    order.BlobAttributes.BlockBlobTier,
			PageBlobTier:                     order.BlobAttributes.PageBlobTier,
			MetadataLength:                   uint16(len(order.BlobAttributes.Metadata)),
			BlockSize:                        blockSize,
			PutBlobSize:                      putBlobSize,
			BlobTagsLength:                   uint16(len(order.BlobAttributes.BlobTagsString)),
			CpkInfo:                          order.CpkOptions.CpkInfo,
			CpkScopeInfoLength:               uint16(len(order.CpkOptions.CpkScopeInfo)),
			IsSourceEncrypted:                order.CpkOptions.IsSourceEncrypted,
			SetPropertiesFlags:               order.SetPropertiesFlags,
			DeleteDestinationFileIfNecessary: order.BlobAttributes.DeleteDestinationFileIfNecessary,
		},
		DstLocalData: JobPartPlanDstLocal{
			PreserveLastModifiedTime: order.BlobAttributes.PreserveLastModifiedTime,
			MD5VerificationOption:    order.BlobAttributes.MD5ValidationOption, // here because it relates to downloads (file destination)
		},
		PreservePermissions:     order.PreserveSMBPermissions,
		PreserveSMBInfo:         order.PreserveSMBInfo,
		PreservePOSIXProperties: order.PreservePOSIXProperties,
		// For S2S copy, per JobPartPlan info
		S2SGetPropertiesInBackend:      order.S2SGetPropertiesInBackend,
		S2SSourceChangeValidation:      order.S2SSourceChangeValidation,
		S2SInvalidMetadataHandleOption: order.S2SInvalidMetadataHandleOption,
		DestLengthValidation:           order.DestLengthValidation,
		BlobFSRecursiveDelete:          order.BlobFSRecursiveDelete,
		atomicJobStatus:                common.EJobStatus.InProgress(), // We default to InProgress
		DeleteSnapshotsOption:          order.BlobAttributes.DeleteSnapshotsOption,
		PermanentDeleteOption:          order.BlobAttributes.PermanentDeleteOption,
		RehydratePriority:              order.BlobAttributes.RehydratePriority,
		DstFileData: JobPartPlanDstFile{
			TrailingDot: order.FileAttributes.TrailingDot,
		},
	}

	// Copy any strings into their respective fields
	// do NOT copy Source/DestinationRoot.SAS, since we do NOT persist SASs
	copy(jpph.SourceRoot[:], order.SourceRoot.Value)
	copy(jpph.SourceExtraQuery[:], order.SourceRoot.ExtraQuery)
	copy(jpph.DestinationRoot[:], order.DestinationRoot.Value)
	copy(jpph.DestExtraQuery[:], order.DestinationRoot.ExtraQuery)
	copy(jpph.DstBlobData.ContentType[:], order.BlobAttributes.ContentType)
	copy(jpph.DstBlobData.ContentEncoding[:], order.BlobAttributes.ContentEncoding)
	copy(jpph.DstBlobData.ContentLanguage[:], order.BlobAttributes.ContentLanguage)
	copy(jpph.DstBlobData.ContentDisposition[:], order.BlobAttributes.ContentDisposition)
	copy(jpph.DstBlobData.CacheControl[:], order.BlobAttributes.CacheControl)
	copy(jpph.DstBlobData.Metadata[:], order.BlobAttributes.Metadata)
	copy(jpph.DstBlobData.BlobTags[:], order.BlobAttributes.BlobTagsString)
	copy(jpph.DstBlobData.CpkScopeInfo[:], order.CpkOptions.CpkScopeInfo)

	eof += writeValue(file, &jpph)

	// write the command string in the JobPart Plan file
	bytesWritten, err := file.WriteString(order.CommandString)
	if err != nil {
		panic(err)
	}
	eof += int64(bytesWritten)
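	// Arithmetic sketch (explanatory, not in the original source): the next
	// write below must start on an 8-byte boundary. ((eof + 7) & ^7) rounds
	// eof up to the next multiple of 8; e.g. eof == 13 gives ((13+7) & ^7) == 16,
	// so paddingLen == 3, while eof == 16 gives 16 and paddingLen == 0.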
	// ensure 8 byte alignment so that Atomic fields of JobPartPlanTransfer can actually be accessed atomically
	paddingLen := ((eof + 7) & ^7) - eof
	if paddingLen != 0 {
		bytesWritten, err := file.Write(make([]byte, paddingLen))
		if err != nil {
			panic(err)
		}
		eof += int64(bytesWritten)
	}

	// srcDstStringsOffset points to after the header & all the transfers; this is where the src/dst strings go for each transfer
	srcDstStringsOffset := make([]int64, jpph.NumTransfers)

	// Initialize the offset for the 1st transfer's src/dst strings
	currentSrcStringOffset := eof + int64(unsafe.Sizeof(JobPartPlanTransfer{}))*int64(jpph.NumTransfers)

	// Write each transfer to the Job Part Plan file (except for the src/dst strings; those come later)
	for t := range order.Transfers.List {
		if len(order.Transfers.List[t].Source) > math.MaxInt16 || len(order.Transfers.List[t].Destination) > math.MaxInt16 {
			panic(fmt.Sprintf("The file %s exceeds azcopy's current maximum path length on either the source or the destination.", order.Transfers.List[t].Source))
		}

		// Prepare info for JobPartPlanTransfer
		// Sending Metadata type to Transfer could ensure strong type validation.
		// TODO: discuss the performance drop of marshaling metadata twice
		srcMetadataLength := 0
		if order.Transfers.List[t].Metadata != nil {
			metadataStr, err := order.Transfers.List[t].Metadata.Marshal()
			if err != nil {
				panic(err)
			}
			srcMetadataLength = len(metadataStr)
		}
		if srcMetadataLength > math.MaxInt16 {
			panic(fmt.Sprintf("The metadata on source file %s exceeds azcopy's current maximum metadata length, and cannot be processed.", order.Transfers.List[t].Source))
		}
		srcBlobTagsLength := 0
		if order.Transfers.List[t].BlobTags != nil {
			blobTagsStr := order.Transfers.List[t].BlobTags.ToString()
			srcBlobTagsLength = len(blobTagsStr)
		}
		if srcBlobTagsLength > math.MaxInt16 {
			panic(fmt.Sprintf("The length of tags %s exceeds maximum allowed length, and cannot be processed.", order.Transfers.List[t].BlobTags))
		}

		// Create & initialize this transfer's Job Part Plan Transfer
		jppt := JobPartPlanTransfer{
			SrcOffset:      currentSrcStringOffset, // SrcOffset of the src string
			SrcLength:      int16(len(order.Transfers.List[t].Source)),
			DstLength:      int16(len(order.Transfers.List[t].Destination)),
			EntityType:     order.Transfers.List[t].EntityType,
			ModifiedTime:   order.Transfers.List[t].LastModifiedTime.UnixNano(),
			SourceSize:     order.Transfers.List[t].SourceSize,
			CompletionTime: 0,
			// For S2S copy, per Transfer source's properties
			SrcContentTypeLength:        int16(len(order.Transfers.List[t].ContentType)),
			SrcContentEncodingLength:    int16(len(order.Transfers.List[t].ContentEncoding)),
			SrcContentLanguageLength:    int16(len(order.Transfers.List[t].ContentLanguage)),
			SrcContentDispositionLength: int16(len(order.Transfers.List[t].ContentDisposition)),
			SrcCacheControlLength:       int16(len(order.Transfers.List[t].CacheControl)),
			SrcContentMD5Length:         int16(len(order.Transfers.List[t].ContentMD5)),
			SrcMetadataLength:           int16(srcMetadataLength),
			SrcBlobTypeLength:           int16(len(order.Transfers.List[t].BlobType)),
			SrcBlobTierLength:           int16(len(order.Transfers.List[t].BlobTier)),
			SrcBlobVersionIDLength:      int16(len(order.Transfers.List[t].BlobVersionID)),
			SrcBlobSnapshotIDLength:     int16(len(order.Transfers.List[t].BlobSnapshotID)),
			SrcBlobTagsLength:           int16(srcBlobTagsLength),
			atomicTransferStatus:        common.ETransferStatus.Started(), // Default
			// ChunkNum: getNumChunks(uint64(order.Transfers.List[t].SourceSize), uint64(data.BlockSize)),
		}
		eof += writeValue(file, &jppt) // Write the transfer entry

		// The NEXT transfer's src/dst strings come after THIS transfer's src/dst strings
		srcDstStringsOffset[t] = currentSrcStringOffset
		currentSrcStringOffset += int64(jppt.SrcLength + jppt.DstLength + jppt.SrcContentTypeLength +
			jppt.SrcContentEncodingLength + jppt.SrcContentLanguageLength + jppt.SrcContentDispositionLength +
			jppt.SrcCacheControlLength + jppt.SrcContentMD5Length + jppt.SrcMetadataLength + jppt.SrcBlobTypeLength +
			jppt.SrcBlobTierLength + jppt.SrcBlobVersionIDLength + jppt.SrcBlobSnapshotIDLength + jppt.SrcBlobTagsLength)
	}
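	// Bookkeeping sketch (explanatory, not in the original source): with two
	// transfers A and B, srcDstStringsOffset holds [S, S+len(A's strings)],
	// where S is the end of the fixed-size transfer entries. The loop below
	// asserts eof against these values, so any drift between the offsets
	// recorded in the entries and the bytes actually emitted is caught at once.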
	// All the transfers were written; now write each transfer's src/dst strings
	for t := range order.Transfers.List {
		// Sanity check: Verify that we are where we think we are and that no bug has occurred
		if eof != srcDstStringsOffset[t] {
			panic(errors.New("job plan file's EOF and the transfer's offset didn't line up; filename: " + order.Transfers.List[t].Source))
		}

		// Write the src & dst strings to the job part plan file
		bytesWritten, err := file.WriteString(order.Transfers.List[t].Source)
		common.PanicIfErr(err)
		eof += int64(bytesWritten)
		// write the destination string in the memory map file
		bytesWritten, err = file.WriteString(order.Transfers.List[t].Destination)
		common.PanicIfErr(err)
		eof += int64(bytesWritten)

		// For S2S copy (and, in the case of Content-MD5, always), write the src properties
		if len(order.Transfers.List[t].ContentType) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].ContentType)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].ContentEncoding) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].ContentEncoding)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].ContentLanguage) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].ContentLanguage)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].ContentDisposition) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].ContentDisposition)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].CacheControl) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].CacheControl)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if order.Transfers.List[t].ContentMD5 != nil {
			// if non-nil but 0 len, will simply not be read by the consumer (since length is zero)
			bytesWritten, err = file.WriteString(string(order.Transfers.List[t].ContentMD5))
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		// For S2S copy, write the src metadata
		if order.Transfers.List[t].Metadata != nil {
			metadataStr, err := order.Transfers.List[t].Metadata.Marshal()
			common.PanicIfErr(err)
			bytesWritten, err = file.WriteString(metadataStr)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].BlobType) != 0 {
			bytesWritten, err = file.WriteString(string(order.Transfers.List[t].BlobType))
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].BlobTier) != 0 {
			bytesWritten, err = file.WriteString(string(order.Transfers.List[t].BlobTier))
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].BlobVersionID) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].BlobVersionID)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		if len(order.Transfers.List[t].BlobSnapshotID) != 0 {
			bytesWritten, err = file.WriteString(order.Transfers.List[t].BlobSnapshotID)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
		// For S2S copy, write the source tags in job part plan transfer
		if len(order.Transfers.List[t].BlobTags) != 0 {
			blobTagsStr := order.Transfers.List[t].BlobTags.ToString()
			bytesWritten, err = file.WriteString(blobTagsStr)
			common.PanicIfErr(err)
			eof += int64(bytesWritten)
		}
	}

	// the file is closed due to the defer above
}
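// Typical lifecycle (a minimal sketch, not in the original source; the symbols
// are real, but the flow shown is illustrative):
//
//	jpfn := JobPartPlanFileName(fmt.Sprintf(JobPartPlanFileNameFormat, jobID, partNum, DataSchemaVersion))
//	jpfn.Create(order) // serialize the part plan to disk
//	mmf := jpfn.Map()  // memory-map it for in-place access to header & transfers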