func()

in pipe/file.go [825:869]


// waitAndOpenNextFile loops until the next file for p.topic/p.name has been
// opened, an error has occurred, or there is nothing more to wait for.
//
// It returns true when the caller should look at the consumer state: either
// p.openFile was called for the next file, or one of the helper calls failed
// and the failure was stored in p.err. It returns false when no file is
// available and p.cfg.NonBlocking is set, or when p.waitForNextFile reports
// that the context is done.
//
// p.waitForNextFileFinish(p.watcher) is always invoked on return via defer,
// in addition to the explicit calls made before each loop restart.
func (p *fileConsumer) waitAndOpenNextFile() bool {
	defer func() { log.E(p.waitForNextFileFinish(p.watcher)) }()
	for {
		// Need to start watching before p.nextFile() to avoid race condition
		p.err = p.waitForNextFilePrepare()
		if log.E(p.err) {
			return true
		}

		nextFn, err := p.nextFile(p.topic, p.name)
		if log.E(err) {
			// Check the error nextFile actually returned and publish it
			// through p.err, like every other error exit of this function.
			p.err = err
			return true
		}

		// There were create file events while we were reading the directory
		// in nextFile, so we need to start over to guarantee file ordering
		if p.checkForNextFile(p.watcher) {
			log.Debugf("directory was modified, restarting check for next file")
			log.E(p.waitForNextFileFinish(p.watcher))
			continue
		}

		// Names ending in ".open" are skipped here.
		// NOTE(review): presumably these are files still being written — confirm.
		if nextFn != "" && !strings.HasSuffix(nextFn, ".open") {
			p.openFile(nextFn, p.offset)
			// The saved offset applies only to the first file opened.
			p.offset = 0
			return true
		}

		if p.cfg.NonBlocking {
			return false
		}

		var ctxDone bool
		ctxDone, p.err = p.waitForNextFile(p.watcher)
		// Check p.err (just assigned), not the stale local err from nextFile.
		if log.E(p.err) {
			return true
		}

		if ctxDone {
			return false
		}

		log.E(p.waitForNextFileFinish(p.watcher))
	}
}