in cmd/zc_traverser_local.go [450:640]
func (t *localTraverser) prepareHashingThreads(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) (finalizer func(existingErr error) error, hashingProcessor func(obj StoredObject) error) {
if t.targetHashType == common.ESyncHashType.None() { // if no hashing is needed, do nothing.
return func(existingErr error) error {
return existingErr // nothing to overwrite with, no-op
}, processor
}
// set up for threaded hashing
t.hashTargetChannel = make(chan string, 1_000) // "reasonable" backlog
// Use half of the available CPU cores for hashing to prevent throttling the STE too hard if hashing is still occurring when the first job part gets sent out
hashingThreadCount := runtime.NumCPU() / 2
hashError := make(chan error, hashingThreadCount)
wg := &sync.WaitGroup{}
immediateStopHashing := int32(0)
// create return wrapper to handle hashing errors
finalizer = func(existingErr error) error {
if existingErr != nil {
close(t.hashTargetChannel) // stop sending hashes
atomic.StoreInt32(&immediateStopHashing, 1) // force the end of hashing
wg.Wait() // Await the finalization of all hashing
return existingErr // discard all hashing errors
} else {
close(t.hashTargetChannel) // stop sending hashes
wg.Wait() // Await the finalization of all hashing
close(hashError) // close out the error channel
for err := range hashError { // inspect all hashing errors
if err != nil {
return err
}
}
return nil
}
}
// wrap the processor, preventing a data race
commitMutex := &sync.Mutex{}
mutexProcessor := func(proc objectProcessor) objectProcessor {
return func(object StoredObject) error {
commitMutex.Lock() // prevent committing two objects at once to prevent a data race
defer commitMutex.Unlock()
err := proc(object)
return err
}
}
processor = mutexProcessor(processor)
// spin up hashing threads
for i := 0; i < hashingThreadCount; i++ {
wg.Add(1)
go func() {
defer wg.Done() // mark the hashing thread as completed
for relPath := range t.hashTargetChannel {
if atomic.LoadInt32(&immediateStopHashing) == 1 { // should we stop hashing?
return
}
fullPath := filepath.Join(t.fullPath, relPath)
fi, err := os.Stat(fullPath) // query LMT & if it's a directory
if err != nil {
err = fmt.Errorf("failed to get properties of file result %s: %w", relPath, err)
hashError <- err
return
}
if fi.IsDir() { // this should never happen
panic(relPath)
}
f, err := os.OpenFile(fullPath, os.O_RDONLY, 0644) // perm is not used here since it's RO
if err != nil {
err = fmt.Errorf("failed to open file for reading result %s: %w", relPath, err)
hashError <- err
return
}
var hasher hash.Hash // set up hasher
switch t.targetHashType {
case common.ESyncHashType.MD5():
hasher = md5.New()
}
// hash.Hash provides a writer type, allowing us to do a (small, 32MB to be precise) buffered write into the hasher and avoid memory concerns
_, err = io.Copy(hasher, f)
if err != nil {
err = fmt.Errorf("failed to read file into hasher result %s: %w", relPath, err)
hashError <- err
return
}
sum := hasher.Sum([]byte{})
hashData := common.SyncHashData{
Mode: t.targetHashType,
Data: base64.StdEncoding.EncodeToString(sum),
LMT: fi.ModTime(),
}
// failing to store hash data doesn't mean we can't transfer (e.g. RO directory)
err = t.hashAdapter.SetHashData(relPath, &hashData)
if err != nil {
common.LogHashStorageFailure()
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogError, fmt.Sprintf("failed to write hash data for %s: %s", relPath, err.Error()))
}
}
err = processIfPassedFilters(filters,
newStoredObject(
func(storedObject *StoredObject) {
// apply the hash data
// storedObject.hashData = hashData
switch hashData.Mode {
case common.ESyncHashType.MD5():
storedObject.md5 = sum
default: // no-op
}
if preprocessor != nil {
// apply the original preprocessor
preprocessor(storedObject)
}
},
fi.Name(),
strings.ReplaceAll(relPath, common.DeterminePathSeparator(t.fullPath), common.AZCOPY_PATH_SEPARATOR_STRING),
common.EEntityType.File(),
fi.ModTime(),
fi.Size(),
noContentProps, // Local MD5s are computed in the STE, and other props don't apply to local files
noBlobProps,
noMetadata,
"", // Local has no such thing as containers
),
processor, // the original processor is wrapped in the mutex processor.
)
_, err = getProcessingError(err)
if err != nil {
hashError <- err
return
}
}
}()
}
// wrap the processor, try to grab hashes, or defer processing to the goroutines
hashingProcessor = func(storedObject StoredObject) error {
if storedObject.entityType != common.EEntityType.File() {
// the original processor is wrapped in the mutex processor.
return processor(storedObject) // no process folders
}
if strings.HasSuffix(path.Base(storedObject.relativePath), common.AzCopyHashDataStream) {
return nil // do not process hash data files.
}
hashData, err := t.GetHashData(storedObject.relativePath)
if err != nil {
switch err {
case ErrorNoHashPresent, ErrorHashNoLongerValid, ErrorHashNotCompatible:
// the original processor is wrapped in the mutex processor.
return processor(storedObject) // There is no hash data, so this file will be overwritten (in theory).
case ErrorHashAsyncCalculation:
return nil // File will be processed later
default:
return err // Cannot get or create hash data for some reason
}
}
// storedObject.hashData = hashData
switch hashData.Mode {
case common.ESyncHashType.MD5():
md5data, _ := base64.StdEncoding.DecodeString(hashData.Data) // If decode fails, treat it like no hash is present.
storedObject.md5 = md5data
default: // do nothing, no hash is present.
}
// delay the mutex until after potentially long-running operations
// the original processor is wrapped in the mutex processor.
return processor(storedObject)
}
return finalizer, hashingProcessor
}