func()

in pipe/file.go [462:512]


// newFile opens a fresh output file for key and registers it with the
// producer, replacing whatever file was previously registered under key.
//
// It makes sure the directory derived from p.topicPath(p.topic) exists, opens
// the file named by p.newFileName(key) for writing and, when the filesystem
// returns a seeker, seeks to the end of the file to learn the starting offset.
// It then assembles the writer stack. Listed from the outermost writer (the
// one records are written to) down to the file:
//
//	gzip (only if p.cfg.Compression)
//	bufio
//	crypter (only if p.cfg.Encryption.Enabled)
//	hashWriter (constructed with the SHA-256 hash and p.metrics)
//	the file returned by p.fs.OpenWrite
//
// Any error from creating the directory, opening the file, seeking, or
// initializing the crypter is returned as is. If a failure happens after the
// file has been opened, the file handle is closed before returning so it is
// not leaked.
func (p *fileProducer) newFile(key string) error {
	if err := p.fs.MkdirAll(filepath.Dir(p.topicPath(p.topic)), dirPerm); err != nil {
		return err
	}

	name := p.newFileName(key)
	w, seeker, err := p.fs.OpenWrite(name)
	if err != nil {
		return err
	}

	// continue writing works for uncompressed unencrypted files only
	var offset int64
	if seeker != nil {
		if offset, err = seeker.Seek(0, io.SeekEnd); err != nil {
			// The seek error is the one worth reporting; the close error
			// is dropped on purpose. Closing here prevents a handle leak.
			_ = w.Close()
			return err
		}
	}

	h := sha256.New()
	hw := &hashWriter{w, h, p.metrics, nil}
	var writer flushWriteCloser = hw

	if p.cfg.Encryption.Enabled {
		// A distinct name keeps the raw file handle w, which is stored in
		// the file struct below, from being shadowed.
		cw, err := p.initCrypterWriter(name, writer)
		if err != nil {
			// Close the raw handle directly: hw.f is not set yet, so the
			// wrapper is not fully initialized at this point.
			_ = w.Close()
			return err
		}
		writer = &chainer{&noopFlusher{cw}, writer}
	}

	writer = &chainer{&flushClose{bufio.NewWriter(writer)}, writer}
	if p.cfg.Compression {
		writer = &chainer{gzip.NewWriter(writer), writer}
	}

	// The previous file for this key (if any) is closed only after the new
	// one has been opened successfully. Its close result is discarded.
	// NOTE(review): consider logging this error — confirm closeFile accepts
	// a nil *file for keys seen for the first time.
	_ = p.closeFile(p.files[key], true)

	log.Debugf("Opened: %v, %v compression: %v", key, name, p.cfg.Compression)

	f := &file{name, key, w, seeker, h, offset, 0, writer, p.flast, nil, offset}
	// hashWriter was created before f existed; link it back now.
	hw.f = f

	listInsert(p, f)
	p.files[key] = f

	p.metrics.FilesOpened.Inc(1)

	return nil
}