func()

in pipe/file.go [962:1011]


func (p *fileConsumer) openFile(nextFn string, offset int64) {
	dir := filepath.Dir(p.topicPath(p.topic)) + "/"
	p.file, p.err = p.fs.OpenRead(dir+nextFn, 0)
	if log.E(p.err) {
		return
	}

	defer func() {
		if p.err != nil {
			p.reader = nil
			if p.file != nil {
				log.E(p.file.Close())
			}
			p.file = nil
		}
	}()

	p.reader = bufio.NewReader(p.file)

	p.header.Delimited = p.cfg.FileDelimited

	if !p.header.Delimited {
		p.err = fmt.Errorf("cannot consume non delimited file")
		log.E(p.err)
		return
	}

	if p.header.Format == "json" || p.header.Format == "text" {
		atomic.StoreInt64(&p.text, 1)
	}

	if offset != 0 {
		log.E(p.file.Close())
		p.file, p.err = p.fs.OpenRead(dir+nextFn, offset)
		if log.E(p.err) {
			return
		}
		p.reader = bufio.NewReader(p.file)
	}

	p.name = dir + nextFn

	p.err = p.openFileInitFilter()
	if p.err != nil {
		return
	}

	p.metrics.FilesOpened.Inc(1)
	log.Debugf("Consumer opened: %v, header: %+v", p.name, p.header)
}