func()

in component/xload/lister.go [135:225]


// Process lists the remote directory named by item.Path one page at a time and
// dispatches every entry it finds:
//   - a directory is created under rl.path and then scheduled back on this
//     lister so that it gets listed in turn;
//   - a file is scheduled on the next component (rl.GetNext()) together with
//     its size, mode, timestamps and MD5.
//
// The number of entries in each page is also reported to the stats manager.
//
// It returns the total number of entries listed. A non-nil error is returned
// when the initial wait fails (count is 0) or when a remote list call fails; in
// the latter case the count covers the pages that were listed and dispatched
// before the failure.
func (rl *remoteLister) Process(item *WorkItem) (int, error) {
	relPath := item.Path // TODO:: xload : check this for subdirectory mounting

	log.Debug("remoteLister::Process : Reading remote dir %s", relPath)

	// this block will be executed only in the first list call for the remote directory
	// so haven't made the listBlocked variable atomic
	if !rl.listBlocked {
		log.Debug("remoteLister::Process : Waiting for block-list-on-mount-sec before making the list call")
		err := waitForListTimeout()
		if err != nil {
			log.Err("remoteLister::Process : unable to unmarshal block-list-on-mount-sec [%s]", err.Error())
			return 0, err
		}
		rl.listBlocked = true
	}

	marker := ""
	var cnt, iteration int
	for {
		entries, newMarker, err := rl.GetRemote().StreamDir(internal.StreamDirOptions{
			Name:  relPath,
			Token: marker,
		})
		if err != nil {
			log.Err("remoteLister::Process : Remote listing failed for %s [%s]", relPath, err.Error())
			// Report the failure instead of returning a nil error: otherwise the
			// caller cannot tell a failed or partial listing from a complete one.
			return cnt, err
		}

		marker = newMarker
		cnt += len(entries)
		iteration++
		log.Debug("remoteLister::Process : count: %d , iterations: %d", cnt, iteration)

		// send number of items listed in current iteration to stats manager
		rl.GetStatsManager().AddStats(&StatsItem{
			Component:   LISTER,
			Name:        relPath,
			ListerCount: uint64(len(entries)),
		})

		for _, entry := range entries {
			log.Debug("remoteLister::Process : Iterating: %s, Is directory: %v", entry.Path, entry.IsDir())

			if entry.IsDir() {
				// create directory in local
				// spawn go routine for directory creation and then
				// adding to the input channel of the listing component
				// TODO:: xload : check how many threads can we spawn
				go func(name string) {
					localPath := filepath.Join(rl.path, name)
					// The error must be local to this goroutine. Assigning to the
					// enclosing loop's err would be a data race between all the
					// goroutines of this page and could make one directory act on
					// another directory's mkdir result.
					// TODO:: xload : handle error
					if err := rl.mkdir(localPath); err != nil {
						log.Err("remoteLister::Process : Failed to create directory [%s]", err.Error())
						return
					}

					// push the directory to input pool for its listing
					rl.Schedule(&WorkItem{
						CompName: rl.GetName(),
						Path:     name,
					})
				}(entry.Path)
			} else {
				// fall back to the lister's default permission when the entry
				// reports that its mode is the default one
				fileMode := rl.defaultPermission
				if !entry.IsModeDefault() {
					fileMode = entry.Mode
				}

				// send file to the splitter's channel for chunking
				rl.GetNext().Schedule(&WorkItem{
					CompName: rl.GetNext().GetName(),
					Path:     entry.Path,
					DataLen:  uint64(entry.Size),
					Mode:     fileMode,
					Atime:    entry.Atime,
					Mtime:    entry.Mtime,
					MD5:      entry.MD5,
				})
			}
		}

		// an empty continuation token means the last page has been consumed
		if len(newMarker) == 0 {
			log.Debug("remoteLister::Process : remote listing done for %s", relPath)
			break
		}
	}

	return cnt, nil
}