in pipe/file.go [462:512]
// newFile opens the output file for key, assembles the writer stack on top of
// it and registers the result as the current file for that key.
//
// The stack is built bottom-up: a hashWriter over the raw file (fed a SHA-256
// hash), then an optional encryption layer when p.cfg.Encryption.Enabled, then
// a bufio.Writer, then an optional gzip layer when p.cfg.Compression.
//
// Any file previously registered under key is handed to closeFile once the new
// one has been set up. If seeking or encryption setup fails, the freshly
// opened handle is closed before the error is returned, so it is not leaked.
func (p *fileProducer) newFile(key string) error {
	if err := p.fs.MkdirAll(filepath.Dir(p.topicPath(p.topic)), dirPerm); err != nil {
		return err
	}

	name := p.newFileName(key)
	w, seeker, err := p.fs.OpenWrite(name)
	if err != nil {
		return err
	}

	// Continue writing works for uncompressed unencrypted files only.
	// When the filesystem hands back a seeker, position at the end of the
	// file and remember where this session's data starts.
	var offset int64
	if seeker != nil {
		offset, err = seeker.Seek(0, io.SeekEnd)
		if err != nil {
			// Nothing references w yet; close it here or the handle leaks.
			// The seek error is the one worth reporting.
			_ = w.Close()
			return err
		}
	}

	h := sha256.New()
	// The back-pointer to the file (last field) is filled in below, once the
	// file struct exists.
	hw := &hashWriter{w, h, p.metrics, nil}
	var writer flushWriteCloser = hw

	if p.cfg.Encryption.Enabled {
		// Named cw so that it does not shadow the file handle w, which is
		// still needed for the file struct below.
		cw, err := p.initCrypterWriter(name, writer)
		if err != nil {
			// Same reasoning as the seek failure above: release the handle.
			_ = w.Close()
			return err
		}
		writer = &chainer{&noopFlusher{cw}, writer}
	}

	writer = &chainer{&flushClose{bufio.NewWriter(writer)}, writer}
	if p.cfg.Compression {
		writer = &chainer{gzip.NewWriter(writer), writer}
	}

	// The close error of the previous file for this key is intentionally not
	// propagated: a failure there does not abort opening the new file.
	_ = p.closeFile(p.files[key], true)

	log.Debugf("Opened: %v, %v compression: %v", key, name, p.cfg.Compression)

	// offset is stored twice: in the sixth field and again in the last one.
	f := &file{name, key, w, seeker, h, offset, 0, writer, p.flast, nil, offset}
	hw.f = f
	listInsert(p, f)
	p.files[key] = f
	p.metrics.FilesOpened.Inc(1)

	return nil
}