in component/xload/lister.go [135:225]
// Process lists the remote directory named by item.Path, one page at a time,
// and fans the results out: each sub-directory is created under rl.path and
// scheduled back on this lister, and each file is scheduled on the next
// component with its size, mode, times and MD5.
//
// The first call waits via waitForListTimeout before issuing any list request.
//
// It returns the number of entries listed. If the initial wait fails, the count
// is 0 and that error is returned. If a remote list call fails, entries from
// the pages already processed stay scheduled, and the count for those pages is
// returned together with the listing error.
func (rl *remoteLister) Process(item *WorkItem) (int, error) {
	relPath := item.Path // TODO:: xload : check this for subdirectory mounting

	log.Debug("remoteLister::Process : Reading remote dir %s", relPath)

	// this block will be executed only in the first list call for the remote directory
	// so haven't made the listBlocked variable atomic
	if !rl.listBlocked {
		log.Debug("remoteLister::Process : Waiting for block-list-on-mount-sec before making the list call")
		err := waitForListTimeout()
		if err != nil {
			log.Err("remoteLister::Process : unable to unmarshal block-list-on-mount-sec [%s]", err.Error())
			return 0, err
		}
		rl.listBlocked = true
	}

	marker := ""
	var cnt, iteration int
	for {
		entries, nextMarker, err := rl.GetRemote().StreamDir(internal.StreamDirOptions{
			Name:  relPath,
			Token: marker,
		})
		if err != nil {
			log.Err("remoteLister::Process : Remote listing failed for %s [%s]", relPath, err.Error())
			// surface the failure instead of reporting a partial listing as success
			return cnt, err
		}

		marker = nextMarker
		cnt += len(entries)
		iteration++
		log.Debug("remoteLister::Process : count: %d , iterations: %d", cnt, iteration)

		// send number of items listed in current iteration to stats manager
		rl.GetStatsManager().AddStats(&StatsItem{
			Component:   LISTER,
			Name:        relPath,
			ListerCount: uint64(len(entries)),
		})

		for _, entry := range entries {
			log.Debug("remoteLister::Process : Iterating: %s, Is directory: %v", entry.Path, entry.IsDir())

			if entry.IsDir() {
				// create directory in local
				// spawn go routine for directory creation and then
				// adding to the input channel of the listing component
				// TODO:: xload : check how many threads can we spawn
				go func(name string) {
					localPath := filepath.Join(rl.path, name)
					// err must be local to this goroutine: sharing the loop's err
					// between concurrently running goroutines is a data race and
					// lets one directory observe another directory's mkdir result.
					// TODO:: xload : handle error
					if err := rl.mkdir(localPath); err != nil {
						log.Err("remoteLister::Process : Failed to create directory [%s]", err.Error())
						return
					}

					// push the directory to input pool for its listing
					rl.Schedule(&WorkItem{
						CompName: rl.GetName(),
						Path:     name,
					})
				}(entry.Path)
				continue
			}

			// fall back to the default permission when the entry carries no explicit mode
			fileMode := rl.defaultPermission
			if !entry.IsModeDefault() {
				fileMode = entry.Mode
			}

			// send file to the splitter's channel for chunking
			rl.GetNext().Schedule(&WorkItem{
				CompName: rl.GetNext().GetName(),
				Path:     entry.Path,
				DataLen:  uint64(entry.Size),
				Mode:     fileMode,
				Atime:    entry.Atime,
				Mtime:    entry.Mtime,
				MD5:      entry.MD5,
			})
		}

		// an empty continuation token means the last page has been consumed
		if len(nextMarker) == 0 {
			log.Debug("remoteLister::Process : remote listing done for %s", relPath)
			break
		}
	}

	return cnt, nil
}