func()

in src/go/pkg/cachewarmer/worker.go [86:164]


// processWorkerJob expands a single worker job into work items.
//
// It mounts every working path for the job, picks one of the resulting local
// paths at random to read from, and then takes one of three routes:
//
//   - the path is a directory: the directory is read in batches of
//     MinimumJobsOnDirRead entries, each batch is filtered through
//     workerJob.FilterFiles, and any surviving entries are handed to
//     w.QueueWork together with all local paths. While this runs,
//     w.Queues.StillProcessingWorkerJob is called whenever more than
//     refreshWorkInterval has elapsed since the previous refresh.
//   - the path is not a directory and StartByte or StopByte equals
//     allFilesOrBytes: one work item is queued with both bounds set to
//     allFilesOrBytes.
//   - otherwise: one work item is queued per MinimumSingleFileSize-sized
//     slice of [StartByte, StopByte), with the last slice clamped to StopByte.
//
// An error is returned when mounting fails, when mounting yields no paths,
// when the path type cannot be determined, or when the directory cannot be
// opened. A read error part-way through a directory, a failed queue refresh,
// and cancellation of ctx are only logged (or silently stop the loop) and
// the function still returns nil.
func (w *Worker) processWorkerJob(ctx context.Context, workerJob *WorkerJob) error {
	log.Debug.Printf("[Worker.processWorkerJob")
	defer log.Debug.Printf("Worker.processWorkerJob]")

	localPaths, err := w.mountAllWorkingPaths(workerJob)
	if err != nil {
		return fmt.Errorf("error mounting working paths: %v", err)
	}
	// rand.Intn panics when its argument is <= 0, so an empty mount list has
	// to be rejected before a path is chosen.
	if len(localPaths) == 0 {
		return fmt.Errorf("no working paths were mounted for the worker job")
	}

	// randomly choose a mount path
	readPath := localPaths[rand.Intn(len(localPaths))]

	// is the workitem a file or directory
	isDirectory, err := IsDirectory(readPath)
	if err != nil {
		return fmt.Errorf("error determining type of path '%s': '%v'", readPath, err)
	}

	if isDirectory {
		log.Info.Printf("Queueing work items for directory %s", readPath)
		f, err := os.Open(readPath)
		if err != nil {
			return fmt.Errorf("error reading files from directory '%s': '%v'", readPath, err)
		}
		defer f.Close()
		lastRefreshVisibility := time.Now()
		workItemsQueued := 0
		for {
			// refresh the invisibility timer, so no-one steals it
			if time.Since(lastRefreshVisibility) > refreshWorkInterval {
				lastRefreshVisibility = time.Now()
				// a failed refresh is logged but does not stop the directory walk
				if err := w.Queues.StillProcessingWorkerJob(workerJob); err != nil {
					log.Error.Printf("error refreshing queue item: '%v'", err)
				}
			}

			// read the next batch of entries; batching keeps memory bounded
			// and lets the refresh and cancellation checks run between reads
			dirEntries, err := f.Readdir(MinimumJobsOnDirRead)

			if len(dirEntries) == 0 && err == io.EOF {
				log.Info.Printf("finished reading directory '%s'", readPath)
				break
			}

			// any other read error ends the walk; items queued so far are kept
			// and the job is still reported as successful
			if err != nil && err != io.EOF {
				log.Error.Printf("error reading directory from directory '%s': '%v'", readPath, err)
				break
			}

			filteredFilenames := workerJob.FilterFiles(dirEntries)
			if len(filteredFilenames) > 0 {
				workItemsQueued += w.QueueWork(localPaths, filteredFilenames)
			}

			// verify that cancellation has not occurred
			if isCancelled(ctx) {
				break
			}
		}
		if workItemsQueued > 0 {
			log.Info.Printf("add %d jobs to the work queue [%d mounts]", workItemsQueued, len(localPaths))
		}
	} else if workerJob.StartByte == allFilesOrBytes || workerJob.StopByte == allFilesOrBytes {
		// no explicit byte range: queue one item with both bounds set to the sentinel
		log.Info.Printf("Queueing work item for file %s", readPath)
		fileToWarm := InitializeFileToWarm(readPath, allFilesOrBytes, allFilesOrBytes)
		w.workQueue.AddWorkItem(fileToWarm)
	} else {
		// queue the file for read, one half-open [i, end) slice at a time
		for i := workerJob.StartByte; i < workerJob.StopByte; i += MinimumSingleFileSize {
			end := i + MinimumSingleFileSize
			// clamp the final slice so it never runs past StopByte
			if end > workerJob.StopByte {
				end = workerJob.StopByte
			}
			log.Info.Printf("Queueing work item for file %s [%d,%d)", readPath, i, end)
			fileToWarm := InitializeFileToWarm(readPath, i, end)
			w.workQueue.AddWorkItem(fileToWarm)
		}
	}
	return nil
}