in pipe/file.go [825:869]
func (p *fileConsumer) waitAndOpenNextFile() bool {
defer func() { log.E(p.waitForNextFileFinish(p.watcher)) }()
for {
//Need to start watching before p.nextFile() to avoid race condition
p.err = p.waitForNextFilePrepare()
if log.E(p.err) {
return true
}
nextFn, err := p.nextFile(p.topic, p.name)
if log.E(p.err) {
return true
}
// There was create file events while we were reading the directory in
// nextFile, so we need to start over to guarantee file ordering
if p.checkForNextFile(p.watcher) {
log.Debugf("directory was modified, restarting check for next file")
log.E(p.waitForNextFileFinish(p.watcher))
continue
}
if nextFn != "" && !strings.HasSuffix(nextFn, ".open") {
p.openFile(nextFn, p.offset)
p.offset = 0
return true
}
if p.cfg.NonBlocking {
return false
}
var ctxDone bool
ctxDone, p.err = p.waitForNextFile(p.watcher)
if log.E(err) {
return true
}
if ctxDone {
return false
}
log.E(p.waitForNextFileFinish(p.watcher))
}
}