func (t *localTraverser) prepareHashingThreads()

in cmd/zc_traverser_local.go [450:640]


func (t *localTraverser) prepareHashingThreads(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) (finalizer func(existingErr error) error, hashingProcessor func(obj StoredObject) error) {
	if t.targetHashType == common.ESyncHashType.None() { // if no hashing is needed, do nothing.
		return func(existingErr error) error {
			return existingErr // nothing to overwrite with, no-op
		}, processor
	}

	// set up for threaded hashing
	t.hashTargetChannel = make(chan string, 1_000) // "reasonable" backlog
	// Use half of the available CPU cores for hashing to prevent throttling the STE too hard if hashing is still occurring when the first job part gets sent out
	hashingThreadCount := runtime.NumCPU() / 2
	if hashingThreadCount < 1 {
		hashingThreadCount = 1 // ensure at least one worker on single-core machines, so queued paths are always drained
	}
	hashError := make(chan error, hashingThreadCount)
	wg := &sync.WaitGroup{}
	immediateStopHashing := int32(0)
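	// immediateStopHashing is flipped to 1 (atomically) by the finalizer's error path below and polled by each hashing goroutine.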

	// create return wrapper to handle hashing errors
	finalizer = func(existingErr error) error {
		if existingErr != nil {
			close(t.hashTargetChannel)                  // stop sending hashes
			atomic.StoreInt32(&immediateStopHashing, 1) // force the end of hashing
			wg.Wait()                                   // Await the finalization of all hashing

			return existingErr // discard all hashing errors
		} else {
			close(t.hashTargetChannel) // stop sending hashes

			wg.Wait()                    // Await the finalization of all hashing
			close(hashError)             // close out the error channel
			for err := range hashError { // inspect all hashing errors
				if err != nil {
					return err
				}
			}

			return nil
		}
	}

	// wrap the processor, preventing a data race
	commitMutex := &sync.Mutex{}
	mutexProcessor := func(proc objectProcessor) objectProcessor {
		return func(object StoredObject) error {
			commitMutex.Lock() // prevent committing two objects at once to prevent a data race
			defer commitMutex.Unlock()
			err := proc(object)

			return err
		}
	}
	processor = mutexProcessor(processor)

	// spin up hashing threads
	for i := 0; i < hashingThreadCount; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done() // mark the hashing thread as completed

			for relPath := range t.hashTargetChannel {
				if atomic.LoadInt32(&immediateStopHashing) == 1 { // should we stop hashing?
					return
				}

				fullPath := filepath.Join(t.fullPath, relPath)
				fi, err := os.Stat(fullPath) // query LMT & if it's a directory
				if err != nil {
					err = fmt.Errorf("failed to get properties of file result %s: %w", relPath, err)
					hashError <- err
					return
				}

				if fi.IsDir() { // this should never happen
					panic(relPath)
				}

				f, err := os.OpenFile(fullPath, os.O_RDONLY, 0644) // perm is not used here since it's RO
				if err != nil {
					err = fmt.Errorf("failed to open file for reading result %s: %w", relPath, err)
					hashError <- err
					return
				}

				var hasher hash.Hash // set up hasher
				switch t.targetHashType {
				case common.ESyncHashType.MD5():
					hasher = md5.New()
				}
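				// NOTE: only MD5 is handled above; any other (non-None) hash type would leave hasher nil and panic in io.Copy below.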

				// hash.Hash provides a writer type, allowing us to do a (small, 32KB to be precise) buffered write into the hasher and avoid memory concerns
				_, err = io.Copy(hasher, f)
				f.Close() // hashing is finished with the file; close it explicitly rather than deferring, since we're inside a loop
				if err != nil {
					err = fmt.Errorf("failed to read file into hasher result %s: %w", relPath, err)
					hashError <- err
					return
				}

				sum := hasher.Sum(nil)

				hashData := common.SyncHashData{
					Mode: t.targetHashType,
					Data: base64.StdEncoding.EncodeToString(sum),
					LMT:  fi.ModTime(),
				}
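				// the LMT is stored alongside the hash so that a later GetHashData call can presumably detect a stale hash (see ErrorHashNoLongerValid below).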

				// failing to store hash data doesn't mean we can't transfer (e.g. RO directory)
				err = t.hashAdapter.SetHashData(relPath, &hashData)
				if err != nil {
					common.LogHashStorageFailure()
					if azcopyScanningLogger != nil {
						azcopyScanningLogger.Log(common.LogError, fmt.Sprintf("failed to write hash data for %s: %s", relPath, err.Error()))
					}
				}

				err = processIfPassedFilters(filters,
					newStoredObject(
						func(storedObject *StoredObject) {
							// apply the hash data
							// storedObject.hashData = hashData
							switch hashData.Mode {
							case common.ESyncHashType.MD5():
								storedObject.md5 = sum
							default: // no-op
							}

							if preprocessor != nil {
								// apply the original preprocessor
								preprocessor(storedObject)
							}
						},
						fi.Name(),
						strings.ReplaceAll(relPath, common.DeterminePathSeparator(t.fullPath), common.AZCOPY_PATH_SEPARATOR_STRING),

						common.EEntityType.File(),
						fi.ModTime(),
						fi.Size(),
						noContentProps, // Local MD5s are computed in the STE, and other props don't apply to local files
						noBlobProps,
						noMetadata,
						"", // Local has no such thing as containers
					),
					processor, // the original processor is wrapped in the mutex processor.
				)
				_, err = getProcessingError(err)
				if err != nil {
					hashError <- err
					return
				}
			}
		}()
	}
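
	// Note: this excerpt does not show the producer side; paths are presumably queued onto t.hashTargetChannel elsewhere in the traverser (see the ErrorHashAsyncCalculation case below), and the goroutines above drain that channel.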

	// wrap the processor, try to grab hashes, or defer processing to the goroutines
	hashingProcessor = func(storedObject StoredObject) error {
		if storedObject.entityType != common.EEntityType.File() {
			// folders are never hashed; pass them straight through.
			// the original processor is wrapped in the mutex processor.
			return processor(storedObject)
		}

		if strings.HasSuffix(path.Base(storedObject.relativePath), common.AzCopyHashDataStream) {
			return nil // do not process hash data files.
		}

		hashData, err := t.GetHashData(storedObject.relativePath)

		if err != nil {
			switch err {
			case ErrorNoHashPresent, ErrorHashNoLongerValid, ErrorHashNotCompatible:
				// the original processor is wrapped in the mutex processor.
				return processor(storedObject) // There is no hash data, so this file will be overwritten (in theory).
			case ErrorHashAsyncCalculation:
				return nil // File will be processed later
			default:
				return err // Cannot get or create hash data for some reason
			}
		}

		// storedObject.hashData = hashData
		switch hashData.Mode {
		case common.ESyncHashType.MD5():
			md5data, _ := base64.StdEncoding.DecodeString(hashData.Data) // If decode fails, treat it like no hash is present.
			storedObject.md5 = md5data
		default: // do nothing, no hash is present.
		}

		// delay the mutex until after potentially long-running operations
		// the original processor is wrapped in the mutex processor.
		return processor(storedObject)
	}

	return finalizer, hashingProcessor
}
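
// Illustrative sketch only (not part of cmd/zc_traverser_local.go): roughly how a caller
// such as localTraverser.Traverse might wire the pair returned by prepareHashingThreads.
// "enumerateLocalFiles" is a hypothetical stand-in for the traverser's walk logic; the
// point is the contract: every enumerated object goes through hashingProcessor, and the
// finalizer runs exactly once with the enumeration error, so the hashing goroutines are
// always stopped or drained before the traversal returns.
func exampleHashingUsage(t *localTraverser, preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error {
	finalizer, hashingProcessor := t.prepareHashingThreads(preprocessor, processor, filters)

	// Feed every enumerated object through hashingProcessor. Files whose hashes must be
	// recomputed are queued to the hashing goroutines; everything else is processed inline.
	walkErr := enumerateLocalFiles(t.fullPath, hashingProcessor) // hypothetical walk helper

	// The finalizer must always run: on error it halts the hashing goroutines and returns
	// that error; otherwise it waits for them and surfaces the first hashing error, if any.
	return finalizer(walkErr)
}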