func()

in src/go/pkg/cachewarmer/warmpathmanager.go [114:215]


func (m *WarmPathManager) processJob(ctx context.Context, warmPathJob *WarmPathJob) error {
	id, popReceipt := warmPathJob.GetQueueMessageInfo()
	log.Info.Printf("[WarmPathManager.processJob '%s' '%s'", id, popReceipt)
	defer log.Info.Printf("WarmPathManager.processJob '%s' '%s']", id, popReceipt)

	if len(warmPathJob.WarmTargetMountAddresses) == 0 {
		if err := m.Queues.DeleteWarmPathJob(warmPathJob); err != nil {
			log.Error.Printf("error removing job: %v", err)
		}
		return fmt.Errorf("there are no mount addresses specified in the job")
	}

	localMountPath := GetLocalMountPath(warmPathJob.WarmTargetMountAddresses[0], warmPathJob.WarmTargetExportPath)
	if err := MountPath(warmPathJob.WarmTargetMountAddresses[0], warmPathJob.WarmTargetExportPath, localMountPath); err != nil {
		return fmt.Errorf("error trying to mount %s:%s: %v", warmPathJob.WarmTargetMountAddresses[0], warmPathJob.WarmTargetExportPath, err)
	}

	log.Status.Printf("start processing %s", warmPathJob.WarmTargetPath)
	defer log.Status.Printf("stop processing %s", warmPathJob.WarmTargetPath)

	lastRefreshVisibility := time.Now()
	folderSlice := []string{warmPathJob.WarmTargetPath}
	for len(folderSlice) > 0 {
		// check for cancelation between files
		if isCancelled(ctx) {
			log.Info.Printf("cancelation occurred while processing job files")
			return nil
		}
		if time.Since(lastRefreshVisibility) > refreshWorkInterval {
			lastRefreshVisibility = time.Now()
			m.Queues.StillProcessingWarmPathJob(warmPathJob)
		}

		// dequeue the next folder
		warmFolder := folderSlice[len(folderSlice)-1]
		folderSlice[len(folderSlice)-1] = ""
		folderSlice = folderSlice[:len(folderSlice)-1]

		// queue up additional folders
		fullWarmPath := path.Join(localMountPath, warmFolder)
		dirEntries, err := ioutil.ReadDir(fullWarmPath)
		if err != nil {
			log.Error.Printf("error encountered reading directory '%s': %v", warmFolder, err)
			continue
		}

		files, largeFiles, dirs := processDirEntries(dirEntries, warmPathJob)

		// queue the directories
		for _, dir := range dirs {
			folderSlice = append(folderSlice, path.Join(warmFolder, dir.Name()))
		}

		// write a job for each large file
		for _, largeFile := range largeFiles {
			fullPath := path.Join(warmFolder, largeFile.Name())

			fileSize := largeFile.Size()
			for i := int64(0); i < fileSize; i += MaximumJobSize {
				end := i + MaximumJobSize
				if end > fileSize {
					end = fileSize
				}
				log.Info.Printf("queuing worker job for file %s [%d,%d)", fullPath, i, end)
				workerJob := InitializeWorkerJobForLargeFile(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, fullPath, i, end, warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
				if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
					log.Error.Printf("error encountered writing worker job '%s': %v", fullPath, err)
				}
			}
		}

		// write a job for each group of files
		if len(files) > 0 {
			if len(files) < MaximumFilesToRead {
				log.Info.Printf("queuing job for path %s", warmFolder)
				workerJob := InitializeWorkerJob(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, warmFolder, warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
				if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
					log.Error.Printf("error encountered writing worker job '%s': %v", warmFolder, err)
				}
			} else {
				for i := 0; i < len(files); i += MaximumFilesToRead {
					end := i + MaximumFilesToRead
					if end >= len(files) {
						end = len(files) - 1
					}
					log.Info.Printf("queuing job for path %s [%s,%s]", warmFolder, files[i].Name(), files[end].Name())
					workerJob := InitializeWorkerJobWithFilter(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, warmFolder, files[i].Name(), files[end].Name(), warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
					if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
						log.Error.Printf("error encountered writing worker job %s [%s,%s]: %v", warmFolder, files[i].Name(), files[end].Name(), err)
					}
				}
			}
		}
	}

	// remove the job file
	if err := m.Queues.DeleteWarmPathJob(warmPathJob); err != nil {
		log.Error.Printf("error removing job '%s' '%s' at end of processing: %v", id, popReceipt, err)
	}

	return nil
}