in src/go/pkg/cachewarmer/worker.go [86:164]
func (w *Worker) processWorkerJob(ctx context.Context, workerJob *WorkerJob) error {
log.Debug.Printf("[Worker.processWorkerJob")
defer log.Debug.Printf("Worker.processWorkerJob]")
localPaths, err := w.mountAllWorkingPaths(workerJob)
if err != nil {
return fmt.Errorf("error mounting working paths: %v", err)
}
// randomly choose a mount path
readPath := localPaths[rand.Intn(len(localPaths))]
// is the workitem a file or directory
isDirectory, err := IsDirectory(readPath)
if err != nil {
return fmt.Errorf("error determining type of path '%s': '%v'", readPath, err)
}
if isDirectory {
log.Info.Printf("Queueing work items for directory %s", readPath)
f, err := os.Open(readPath)
if err != nil {
return fmt.Errorf("error reading files from directory '%s': '%v'", readPath, err)
}
defer f.Close()
lastRefreshVisibility := time.Now()
workItemsQueued := 0
for {
// refresh the invisibility timer, so no-one steals it
if time.Since(lastRefreshVisibility) > refreshWorkInterval {
lastRefreshVisibility = time.Now()
if err := w.Queues.StillProcessingWorkerJob(workerJob); err != nil {
log.Error.Printf("error refreshing queue item: '%v'", err)
}
}
dirEntries, err := f.Readdir(MinimumJobsOnDirRead)
if len(dirEntries) == 0 && err == io.EOF {
log.Info.Printf("finished reading directory '%s'", readPath)
break
}
if err != nil && err != io.EOF {
log.Error.Printf("error reading directory from directory '%s': '%v'", readPath, err)
break
}
filteredFilenames := workerJob.FilterFiles(dirEntries)
if len(filteredFilenames) > 0 {
workItemsQueued += w.QueueWork(localPaths, filteredFilenames)
}
// verify that cancellation has not occurred
if isCancelled(ctx) {
break
}
}
if workItemsQueued > 0 {
log.Info.Printf("add %d jobs to the work queue [%d mounts]", workItemsQueued, len(localPaths))
}
} else if workerJob.StartByte == allFilesOrBytes || workerJob.StopByte == allFilesOrBytes {
log.Info.Printf("Queueing work item for file %s", readPath)
fileToWarm := InitializeFileToWarm(readPath, allFilesOrBytes, allFilesOrBytes)
w.workQueue.AddWorkItem(fileToWarm)
} else {
// queue the file for read
for i := workerJob.StartByte; i < workerJob.StopByte; i += MinimumSingleFileSize {
end := i + MinimumSingleFileSize
if end > workerJob.StopByte {
end = workerJob.StopByte
}
log.Info.Printf("Queueing work item for file %s [%d,%d)", readPath, i, end)
fileToWarm := InitializeFileToWarm(readPath, i, end)
w.workQueue.AddWorkItem(fileToWarm)
}
}
return nil
}