in pipe/file.go [962:1011]
// openFile opens the file nextFn, located in the directory of the consumer's
// topic path, for reading starting at byte offset, and prepares the consumer
// to read from it.
//
// The outcome is reported through p.err instead of a return value. On
// success p.file, p.reader and p.name refer to the opened file and the
// FilesOpened metric is incremented. On any failure p.err is set, the file
// (if one was opened) is closed, and both p.file and p.reader are reset to
// nil, so every failure path leaves the consumer in the same state.
func (p *fileConsumer) openFile(nextFn string, offset int64) {
	// Keep the literal "/" concatenation so the path passed to p.fs and
	// stored in p.name stays exactly what it has always been.
	name := filepath.Dir(p.topicPath(p.topic)) + "/" + nextFn

	// Registered before the open so that a failed open is cleaned up the
	// same way as a failure in any later step.
	defer func() {
		if p.err == nil {
			return
		}
		p.reader = nil
		if p.file != nil {
			log.E(p.file.Close())
		}
		p.file = nil
	}()

	// Open directly at the requested offset. Nothing below reads from the
	// start of the file, so opening at 0 and reopening at offset would only
	// cost an extra open/close round trip.
	p.file, p.err = p.fs.OpenRead(name, offset)
	if log.E(p.err) {
		return
	}
	p.reader = bufio.NewReader(p.file)

	// Only delimited files can be consumed; the setting comes from config.
	p.header.Delimited = p.cfg.FileDelimited
	if !p.header.Delimited {
		p.err = fmt.Errorf("cannot consume non delimited file")
		log.E(p.err)
		return
	}

	// Flag text-based formats; p.text is written atomically.
	switch p.header.Format {
	case "json", "text":
		atomic.StoreInt64(&p.text, 1)
	}

	p.name = name

	p.err = p.openFileInitFilter()
	if p.err != nil {
		return
	}

	p.metrics.FilesOpened.Inc(1)
	log.Debugf("Consumer opened: %v, header: %+v", p.name, p.header)
}