in pipe/file.go [1035:1073]
func (p *fileConsumer) fetchNextLow() bool {
//reader and file can be nil when directory is empty during
//NewConsumer
if p.reader != nil {
p.writeMessage()
if p.err == nil {
return true
}
if p.err != io.EOF && (!p.cfg.Compression || p.err != io.ErrUnexpectedEOF) {
log.E(p.err)
return true
}
log.E(p.file.Close())
p.reader = nil
p.file = nil
if p.pgpMD != nil && p.pgpMD.IsSigned && p.pgpMD.SignedBy != nil {
if p.pgpMD.SignatureError != nil {
log.Errorf("signature error: %v", p.pgpMD.SignatureError)
} else {
if p.pgpMD.SignedBy.PublicKey != nil {
log.Debugf("valid signature, signed by: %x", p.pgpMD.SignedBy.PublicKey.Fingerprint)
}
}
}
log.Debugf("Consumer closed: %v", p.name)
if atomic.LoadInt64(&p.text) == 1 && p.cfg.FileDelimited && len(p.msg) != 0 {
p.err = fmt.Errorf("corrupted file. Not ending with delimiter: %v %v", p.name, string(p.msg))
return true
}
p.err = nil
}
return false
}