in src/go/pkg/cachewarmer/warmpathmanager.go [114:215]
// processJob expands a single warm path job into worker jobs.
//
// It mounts the job's export path from the first mount address and then walks
// the tree rooted at WarmTargetPath using an explicit stack of folders. Each
// directory's entries are classified by processDirEntries, and for every
// directory that can be read it queues:
//   - one worker job per MaximumJobSize-sized byte range of each large file;
//   - one worker job for the whole folder when it holds fewer than
//     MaximumFilesToRead files, otherwise one name-filtered worker job per
//     group of at most MaximumFilesToRead files.
//
// Failures to read a directory or to write a worker job are logged and the walk
// continues with the remaining work.
//
// Return values:
//   - an error when the job lists no mount addresses (the job is deleted first);
//   - an error when the mount fails (the job is not deleted on this path);
//   - nil when ctx is cancelled mid-walk (the job is not deleted on this path);
//   - nil once the walk completes and deletion of the job has been attempted.
func (m *WarmPathManager) processJob(ctx context.Context, warmPathJob *WarmPathJob) error {
	id, popReceipt := warmPathJob.GetQueueMessageInfo()
	log.Info.Printf("[WarmPathManager.processJob '%s' '%s'", id, popReceipt)
	defer log.Info.Printf("WarmPathManager.processJob '%s' '%s']", id, popReceipt)

	// a job without mount addresses can never succeed, so remove it rather than retry it
	if len(warmPathJob.WarmTargetMountAddresses) == 0 {
		if err := m.Queues.DeleteWarmPathJob(warmPathJob); err != nil {
			log.Error.Printf("error removing job: %v", err)
		}
		return fmt.Errorf("there are no mount addresses specified in the job")
	}

	// the directory walk always goes through the first mount address
	mountAddress := warmPathJob.WarmTargetMountAddresses[0]
	localMountPath := GetLocalMountPath(mountAddress, warmPathJob.WarmTargetExportPath)
	if err := MountPath(mountAddress, warmPathJob.WarmTargetExportPath, localMountPath); err != nil {
		return fmt.Errorf("error trying to mount %s:%s: %v", mountAddress, warmPathJob.WarmTargetExportPath, err)
	}

	log.Status.Printf("start processing %s", warmPathJob.WarmTargetPath)
	defer log.Status.Printf("stop processing %s", warmPathJob.WarmTargetPath)

	lastRefreshVisibility := time.Now()
	// folderSlice is used as a stack: folders are pushed and popped at the tail
	folderSlice := []string{warmPathJob.WarmTargetPath}
	for len(folderSlice) > 0 {
		// check for cancelation between folders
		if isCancelled(ctx) {
			log.Info.Printf("cancelation occurred while processing job files")
			return nil
		}

		// periodically signal that this job is still being worked on
		if time.Since(lastRefreshVisibility) > refreshWorkInterval {
			lastRefreshVisibility = time.Now()
			m.Queues.StillProcessingWarmPathJob(warmPathJob)
		}

		// pop the next folder, clearing the vacated slot so the string can be collected
		last := len(folderSlice) - 1
		warmFolder := folderSlice[last]
		folderSlice[last] = ""
		folderSlice = folderSlice[:last]

		// warmFolder is relative to the export; read it through the local mount
		fullWarmPath := path.Join(localMountPath, warmFolder)
		dirEntries, err := ioutil.ReadDir(fullWarmPath)
		if err != nil {
			log.Error.Printf("error encountered reading directory '%s': %v", warmFolder, err)
			continue
		}
		files, largeFiles, dirs := processDirEntries(dirEntries, warmPathJob)

		// queue the directories
		for _, dir := range dirs {
			folderSlice = append(folderSlice, path.Join(warmFolder, dir.Name()))
		}

		// write a job for each MaximumJobSize range [i,end) of each large file
		for _, largeFile := range largeFiles {
			fullPath := path.Join(warmFolder, largeFile.Name())
			fileSize := largeFile.Size()
			for i := int64(0); i < fileSize; i += MaximumJobSize {
				end := i + MaximumJobSize
				if end > fileSize {
					end = fileSize
				}
				log.Info.Printf("queuing worker job for file %s [%d,%d)", fullPath, i, end)
				workerJob := InitializeWorkerJobForLargeFile(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, fullPath, i, end, warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
				if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
					log.Error.Printf("error encountered writing worker job '%s': %v", fullPath, err)
				}
			}
		}

		// write a job for each group of files
		if len(files) == 0 {
			continue
		}
		if len(files) < MaximumFilesToRead {
			// few enough files that a single unfiltered job covers the folder
			log.Info.Printf("queuing job for path %s", warmFolder)
			workerJob := InitializeWorkerJob(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, warmFolder, warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
			if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
				log.Error.Printf("error encountered writing worker job '%s': %v", warmFolder, err)
			}
			continue
		}
		for i := 0; i < len(files); i += MaximumFilesToRead {
			// The name range is handed over as inclusive [first,last], so each
			// group must stop one file short of the next group's first file;
			// otherwise every boundary file is queued in two jobs.
			end := i + MaximumFilesToRead - 1
			if end >= len(files) {
				end = len(files) - 1
			}
			firstName, lastName := files[i].Name(), files[end].Name()
			log.Info.Printf("queuing job for path %s [%s,%s]", warmFolder, firstName, lastName)
			workerJob := InitializeWorkerJobWithFilter(warmPathJob.WarmTargetMountAddresses, warmPathJob.WarmTargetExportPath, warmFolder, firstName, lastName, warmPathJob.InclusionList, warmPathJob.ExclusionList, warmPathJob.MaxFileSizeBytes)
			if err := m.Queues.WriteWorkerJob(workerJob); err != nil {
				log.Error.Printf("error encountered writing worker job %s [%s,%s]: %v", warmFolder, firstName, lastName, err)
			}
		}
	}

	// the whole tree has been expanded into worker jobs; remove the job
	if err := m.Queues.DeleteWarmPathJob(warmPathJob); err != nil {
		log.Error.Printf("error removing job '%s' '%s' at end of processing: %v", id, popReceipt, err)
	}
	return nil
}