pipe/file.go:
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package pipe
import (
"bufio"
"bytes"
"compress/gzip"
"crypto"
"crypto/sha256"
"database/sql"
"encoding/binary"
"encoding/json"
"fmt"
"hash"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/ProtonMail/go-crypto/openpgp"
"github.com/ProtonMail/go-crypto/openpgp/armor"
"github.com/ProtonMail/go-crypto/openpgp/packet" //"context"
"golang.org/x/sys/unix"
"github.com/fsnotify/fsnotify"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/metrics"
)
//TODO: Support reading file currently open by producer
//TODO: Support offset persistence
const (
dirPerm = 0775
delimiter byte = '\n'
)
var signKeyPw = ""
var privateKeyPw = ""
type flushWriteCloser interface {
Write([]byte) (int, error)
Flush() error
Close() error
}
//fs abstracts filesystem calls so most of this code can be reused by the HDFS pipe
type fs interface {
MkdirAll(path string, perm os.FileMode) error
Rename(oldpath, newpath string) error
ReadDir(dirname string, listFrom string) ([]os.FileInfo, error)
OpenRead(name string, offset int64) (io.ReadCloser, error)
OpenWrite(name string) (flushWriteCloser, io.Seeker, error)
Remove(name string) error
Cancel(io.Closer) error
}
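//fileFS at the end of this file implements fs for the local filesystem;
//the HDFS pipe supplies its own implementation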
type filePipe struct {
datadir string
cfg config.PipeConfig
}
type file struct {
name string
key string
file io.WriteCloser //this is only used to cancel s3 streams, see fs.Cancel()
seek io.Seeker
hash hash.Hash
offset int64
nRecs int64
writer flushWriteCloser
//Doubly linked list in file-open order
prev *file
next *file
compressedSize int64
}
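//stat describes a finalized data file; dumpStat serializes the collected
//entries as JSON into the _DONE end-of-stream marker written by Close.
//An illustrative entry (values are made up):
//
//	[{"NumRecs":3,"Hash":"9f86d081...","FileName":"/data/orders1700000000.001.log"}]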
type stat struct {
NumRecs int64
Hash string
FileName string
}
// fileProducer synchronously pushes messages to File using topic specified during producer creation
type fileProducer struct {
*filePipe
header Header
topic string
files map[string]*file
//Linked list of files in open order, to be able to close in the same order
ffirst *file
flast *file
seqno int
fs fs
text int64 //Can be changed by SetFormat
metrics *metrics.FilePipeMetrics
stats map[string]*stat
}
// fileConsumer consumes messages from File using topic and partition specified during consumer creation
type fileConsumer struct {
*filePipe
baseConsumer
topic string
file io.ReadCloser
name string
reader *bufio.Reader
header Header
fs fs
text int64 //Determined by the Format field of the file header. See openFile
msg []byte
err error
metrics *metrics.FilePipeMetrics
pgpMD *openpgp.MessageDetails
watcher *fsnotify.Watcher
offset int64
}
type noopFlusher struct {
io.WriteCloser
}
func (f *noopFlusher) Flush() error {
return nil
}
type flushClose struct {
*bufio.Writer
}
func (f *flushClose) Close() error {
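//Flush only: the underlying writer is closed separately by chainer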
return f.Writer.Flush()
}
//hashWriter wraps the raw file writer to hash the bytes actually written,
//report them to metrics and track the compressed file size for rotation
type hashWriter struct {
flushWriteCloser
hash hash.Hash
metrics *metrics.FilePipeMetrics
f *file
}
func (w *hashWriter) Write(b []byte) (int, error) {
n, err := w.flushWriteCloser.Write(b)
if w.hash != nil {
_, _ = w.hash.Write(b)
}
w.metrics.BytesWritten.Inc(int64(n))
if w.f != nil {
w.f.compressedSize += int64(n)
}
return n, err
}
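//chainer stacks a filter writer (w1) on top of the writer it wraps (w2) so
//that Flush and Close cascade outermost-first. A sketch of composing one
//filter, as newFile does below (newSink is hypothetical; any
//flushWriteCloser, e.g. a *hashWriter, works as the sink):
//
//	var sink flushWriteCloser = newSink() //hypothetical
//	zw := &chainer{gzip.NewWriter(sink), sink}
//	_, _ = zw.Write(b) //compressed bytes land in sink
//	_ = zw.Flush()     //flushes gzip state first, then sink
//	_ = zw.Close()     //finalizes the gzip stream, then closes sink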
type chainer struct {
w1 flushWriteCloser
w2 flushWriteCloser
}
func (c *chainer) Write(b []byte) (int, error) {
return c.w1.Write(b) //w1 wraps w2, so the write cascades into w2
}
func (c *chainer) Close() error {
err1 := c.w1.Close()
err2 := c.w2.Close()
if err1 != nil {
return err1
}
return err2
}
func (c *chainer) Flush() error {
if err := c.w1.Flush(); err != nil {
return err
}
return c.w2.Flush()
}
func init() {
registerPlugin("file", initFilePipe)
}
func initFilePipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
return &filePipe{cfg.BaseDir, *cfg}, nil
}
// Type returns Pipe type as File
func (p *filePipe) Type() string {
return "file"
}
// Config returns pipe configuration
func (p *filePipe) Config() *config.PipeConfig {
return &p.cfg
}
// Close releases resources associated with the pipe
func (p *filePipe) Close() error {
return nil
}
//NewProducer registers a new sync producer
func (p *filePipe) NewProducer(topic string) (Producer, error) {
m := metrics.NewFilePipeMetrics("pipe_producer", map[string]string{"topic": topic, "pipeType": "file"})
return &fileProducer{filePipe: p, topic: topic, files: make(map[string]*file), fs: &fileFS{}, metrics: m, stats: make(map[string]*stat)}, nil
}
func (p *filePipe) initConsumer(c *fileConsumer, fn fetchFunc) (Consumer, error) {
fname, offset, err := c.seek(c.topic, InitialOffset)
if log.E(err) {
return nil, err
}
if fname != "" {
if strings.HasSuffix(fname, ".open") {
c.offset = offset
} else {
c.openFile(fname, offset)
}
}
c.initBaseConsumer(fn)
return c, nil
}
//NewConsumer registers a new file consumer
func (p *filePipe) NewConsumer(topic string) (Consumer, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
m := metrics.NewFilePipeMetrics("pipe_consumer", map[string]string{"topic": topic, "pipeType": "file"})
c := &fileConsumer{filePipe: p, topic: topic, fs: &fileFS{}, metrics: m, watcher: w}
return p.initConsumer(c, c.fetchNext)
}
func topicPath(datadir string, topic string) string {
var r string
if datadir != "" {
r += datadir + "/"
}
if topic != "" {
r += topic
}
return r
}
func (p *fileProducer) topicPath(topic string) string {
return topicPath(p.datadir, topic)
}
func (p *fileConsumer) topicPath(topic string) string {
return topicPath(p.datadir, topic)
}
func (p *fileConsumer) nextFile(topic string, curFile string) (string, error) {
tp := p.topicPath(topic)
dir := filepath.Dir(tp)
if curFile == "" {
curFile = tp
}
files, err := p.fs.ReadDir(dir, curFile)
if err != nil {
if os.IsNotExist(err) {
return "", nil
}
return "", err
}
if len(files) == 0 {
return "", nil
}
i := sort.Search(len(files), func(i int) bool {
fn := dir + "/" + files[i].Name()
return fn > curFile
})
if i < len(files) {
fn := dir + "/" + files[i].Name()
if strings.HasPrefix(fn, tp) && !files[i].IsDir() {
log.Debugf("%v NextFile: %v, CurFile: %v", topic, files[i].Name(), curFile)
return files[i].Name(), nil
}
}
return "", nil
}
func (p *fileConsumer) seek(topic string, offset int64) (string, int64, error) {
tp := p.topicPath(topic)
dir := filepath.Dir(tp)
files, err := p.fs.ReadDir(dir, tp)
if err != nil {
if os.IsNotExist(err) {
return "", 0, nil
}
return "", 0, err
}
if len(files) == 0 {
return "", 0, nil
}
if offset == OffsetOldest {
for _, f := range files {
if strings.HasPrefix(dir+"/"+f.Name(), tp) && !f.IsDir() {
if strings.HasSuffix(f.Name(), ".open") {
return "", 0, nil
}
return f.Name(), 0, nil
}
}
return "", 0, nil
}
if offset == OffsetNewest {
for i := len(files) - 1; i >= 0; i-- {
if strings.HasPrefix(dir+"/"+files[i].Name(), tp) && !files[i].IsDir() {
return files[i].Name(), files[i].Size(), nil
}
}
return "", 0, nil
}
return "", 0, fmt.Errorf("arbitrary offsets not supported, only OffsetOldest and OffsetNewest offsets supported")
}
func (p *fileProducer) newFileName(key string) string {
p.seqno++ //Precaution to avoid generating a file with the same name when timestamps are equal
format := "%s%010d.%03d.%s"
if p.cfg.Compression {
format += ".gz"
}
if p.cfg.Encryption.Enabled {
format += ".gpg"
}
return fmt.Sprintf(format+".open", p.topicPath(p.topic), time.Now().Unix(), p.seqno, key)
}
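//An illustrative name produced by newFileName above for datadir "/data",
//topic "orders", key "log", with compression enabled (timestamp and seqno
//are made up):
//
//	/data/orders1700000000.001.log.gz.open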
func (p *fileProducer) initCrypterWriter(filename string, writer io.WriteCloser) (io.WriteCloser, error) {
if len(p.cfg.Encryption.PublicKey) == 0 {
return nil, fmt.Errorf("public key is empty. required for producing encrypted stream")
}
block, err := armor.Decode(bytes.NewReader([]byte(p.cfg.Encryption.PublicKey)))
if log.E(err) {
return nil, err
}
if block.Type != openpgp.PublicKeyType {
return nil, fmt.Errorf("expected public key. got:%s", block.Type)
}
encEntity, err := openpgp.ReadEntity(packet.NewReader(block.Body))
if log.E(err) {
return nil, err
}
block, err = armor.Decode(bytes.NewReader([]byte(p.cfg.Encryption.SigningKey)))
if log.E(err) {
return nil, err
}
if block.Type != openpgp.PrivateKeyType {
return nil, fmt.Errorf("expected private key. got:%s", block.Type)
}
signEntity, err := openpgp.ReadEntity(packet.NewReader(block.Body))
if log.E(err) {
return nil, err
}
err = signEntity.PrivateKey.Decrypt([]byte(signKeyPw))
log.Debugf("signKeyPw %v", err)
if log.E(err) {
return nil, err
}
log.Debugf("pubKey: %x", encEntity.PrimaryKey.Fingerprint)
log.Debugf("signKey: %x", signEntity.PrimaryKey.Fingerprint)
hints := &openpgp.FileHints{
IsBinary: true,
FileName: filename,
ModTime: time.Now(),
}
writer, err = openpgp.Encrypt(writer, []*openpgp.Entity{encEntity}, signEntity, hints, nil)
if log.E(err) {
return nil, err
}
return writer, nil
}
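//The armored keys expected by initCrypterWriter above (and initCrypterReader
//below) can be generated for tests roughly as follows (a sketch against the
//go-crypto API; errors elided):
//
//	ent, _ := openpgp.NewEntity("tap", "", "tap@example.com", nil)
//	var pub, priv bytes.Buffer
//	aw, _ := armor.Encode(&pub, openpgp.PublicKeyType, nil)
//	_ = ent.Serialize(aw) //-> cfg.Encryption.PublicKey
//	_ = aw.Close()
//	aw, _ = armor.Encode(&priv, openpgp.PrivateKeyType, nil)
//	_ = ent.SerializePrivate(aw, nil) //-> cfg.Encryption.SigningKey / PrivateKey
//	_ = aw.Close()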
func listInsert(p *fileProducer, n *file) {
if p.flast != nil {
p.flast.next = n
}
if p.ffirst == nil {
p.ffirst = n
}
p.flast = n
}
func listRemove(p *fileProducer, d *file) {
if d.prev != nil {
d.prev.next = d.next
} else {
p.ffirst = d.next
}
if d.next != nil {
d.next.prev = d.prev
} else {
p.flast = d.prev
}
}
func (p *fileProducer) newFile(key string) error {
if err := p.fs.MkdirAll(filepath.Dir(p.topicPath(p.topic)), dirPerm); err != nil {
return err
}
n := p.newFileName(key)
w, seeker, err := p.fs.OpenWrite(n)
if err != nil {
return err
}
//Resuming an existing file works for uncompressed, unencrypted files only
var offset int64
if seeker != nil {
offset, err = seeker.Seek(0, io.SeekEnd)
if err != nil {
return err
}
}
h := sha256.New()
hw := &hashWriter{w, h, p.metrics, nil}
var writer flushWriteCloser = hw
if p.cfg.Encryption.Enabled {
w, err := p.initCrypterWriter(n, writer)
if err != nil {
return err
}
writer = &chainer{&noopFlusher{w}, writer}
}
writer = &chainer{&flushClose{bufio.NewWriter(writer)}, writer}
if p.cfg.Compression {
writer = &chainer{gzip.NewWriter(writer), writer}
}
_ = p.closeFile(p.files[key], true)
log.Debugf("Opened: %v, %v compression: %v", key, n, p.cfg.Compression)
f := &file{n, key, w, seeker, h, offset, 0, writer, p.flast, nil, offset}
hw.f = f
listInsert(p, f)
p.files[key] = f
p.metrics.FilesOpened.Inc(1)
return nil
}
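//With both compression and encryption enabled, the writer stack assembled
//above yields this write path (Flush/Close cascade in the same order):
//
//	push -> gzip.Writer -> bufio.Writer -> openpgp encrypt -> hashWriter -> file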
func (p *fileProducer) getFile(key string) (*file, error) {
f := p.files[key]
if f == nil {
if err := p.newFile(key); err != nil {
return nil, err
}
f = p.files[key]
}
return f, nil
}
func (p *fileProducer) cancel(f *file) {
err := p.fs.Remove(strings.TrimSuffix(f.name, ".open"))
if err != nil && !os.IsNotExist(err) {
log.E(err)
}
err = p.fs.Remove(f.name)
if err != nil && !os.IsNotExist(err) {
log.E(err)
}
log.E(p.fs.Cancel(f.file))
}
func syncFsMetadata() error {
_, _, errno := unix.Syscall(unix.SYS_SYNC, 0, 0, 0)
if errno != 0 {
return errno
}
return nil
}
func (p *fileProducer) closeFile(f *file, graceful bool) error {
log.Debugf("Closed: %v", f)
if f == nil {
return nil
}
var rerr error
defer func() {
listRemove(p, f)
delete(p.files, f.key)
if rerr != nil || !graceful {
p.cancel(f)
}
}()
if err := f.writer.Close(); log.E(err) {
rerr = err
}
fn := strings.TrimSuffix(f.name, ".open")
if graceful && rerr == nil {
if err := p.fs.Rename(f.name, fn); log.E(err) {
rerr = err
}
}
p.stats[fn] = &stat{NumRecs: f.nRecs, Hash: fmt.Sprintf("%x", f.hash.Sum(nil)), FileName: fn}
p.metrics.FilesClosed.Inc(1)
log.E(syncFsMetadata())
log.Debugf("Closed: %v", f.name)
return rerr
}
func (p *fileProducer) writeBinaryMsgLength(f *file, len int) error {
if atomic.LoadInt64(&p.text) == 1 || !p.cfg.FileDelimited {
return nil
}
sz := make([]byte, 4)
binary.LittleEndian.PutUint32(sz, uint32(len))
_, err := f.writer.Write(sz)
return err
}
func (p *fileProducer) writeTextMsgDelimiter(f *file) error {
if atomic.LoadInt64(&p.text) == 0 || !p.cfg.FileDelimited {
return nil
}
o := make([]byte, 0)
o = append(o, delimiter)
_, err := f.writer.Write(o)
return err
}
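//rotateOnSizeLimit closes (and thereby finalizes) the file once either
//limit is hit: MaxFileDataSize bounds the uncompressed payload written
//(f.offset), MaxFileSize bounds the bytes actually on disk (f.compressedSize)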
func (p *fileProducer) rotateOnSizeLimit(key string, f *file) {
if (p.cfg.MaxFileDataSize != 0 && f.offset >= p.cfg.MaxFileDataSize) || (p.cfg.MaxFileSize != 0 && f.compressedSize > p.cfg.MaxFileSize) {
_ = p.closeFile(p.files[key], true)
}
}
//push writes the message to the file selected by key, opening a new file if needed
func (p *fileProducer) push(key string, in interface{}, batch bool) error {
var bytes []byte
switch b := in.(type) {
case []byte:
bytes = b
default:
return fmt.Errorf("file pipe can handle binary arrays only")
}
f, err := p.getFile(key)
if err != nil {
return err
}
defer func() {
if err != nil {
p.cancel(f)
}
}()
//Prepend message with size in the case of binary delimited format
if err = p.writeBinaryMsgLength(f, len(bytes)); err != nil {
return err
}
if _, err = f.writer.Write(bytes); err != nil {
return err
}
//In the case of text format append delimiter after the message
if err = p.writeTextMsgDelimiter(f); err != nil {
return err
}
f.offset += int64(len(bytes)) + 1
f.nRecs++
if !batch {
if err = f.writer.Flush(); err != nil {
return err
}
p.rotateOnSizeLimit(key, f)
}
return nil
}
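//The on-disk record layout produced by push above, when cfg.FileDelimited
//is set, is:
//
//	binary formats: [uint32 little-endian length][payload] per message
//	text formats:   [payload]['\n'] per message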
//PushK sends a keyed message to File
func (p *fileProducer) PushK(key string, in interface{}) error {
return p.push(key, in, false)
}
//Push produces message to File topic
func (p *fileProducer) Push(in interface{}) error {
return p.push("default", in, false)
}
//PushBatch stashes a keyed message into a batch which will be sent to File by
//PushBatchCommit
func (p *fileProducer) PushBatch(key string, in interface{}) error {
return p.push(key, in, true)
}
//PushBatchCommit commits currently queued messages in the producer
func (p *fileProducer) PushBatchCommit() error {
//Flush, and maybe close, files in open order
f := p.ffirst
for f != nil {
if err := f.writer.Flush(); err != nil {
p.cancel(f)
return err
}
p.rotateOnSizeLimit(f.key, f)
f = f.next
}
return nil
}
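//A typical batched sequence (sketch; errors elided):
//
//	_ = p.PushBatch("log", msg1)
//	_ = p.PushBatch("log", msg2)
//	_ = p.PushBatchCommit() //single flush, then size-based rotation check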
func (p *fileProducer) PushSchema(key string, data []byte) error {
if err := p.PushBatchCommit(); err != nil {
return err
}
if key == "" {
key = "schema"
}
_ = p.closeFile(p.files[key], true)
if len(data) == 0 {
return nil
}
p.header.Schema = data
return p.push(key, data, false)
}
func (p *fileProducer) close(graceful bool) (err error) {
if len(p.files) == 0 {
return nil
}
for p.ffirst != nil {
if e := p.closeFile(p.ffirst, graceful); e != nil {
err = e
}
}
p.files = nil
return err
}
func (p *fileProducer) dumpStat(f io.Writer) error {
s := make([]*stat, 0)
for _, n := range p.stats {
s = append(s, n)
}
sort.Slice(s, func(i, j int) bool { return s[i].FileName < s[j].FileName })
l, err := json.Marshal(s)
if err != nil {
return err
}
if _, err := f.Write(l); err != nil {
return err
}
return nil
}
// Close gracefully finalizes open files and writes the end-of-stream marker if configured
func (p *fileProducer) Close() error {
err := p.close(true)
if err != nil {
return err
}
if p.cfg.EndOfStreamMark {
if err := p.fs.MkdirAll(filepath.Dir(p.topicPath(p.topic)), dirPerm); err != nil {
return err
}
name := filepath.Dir(p.topicPath(p.topic)) + "/_DONE"
f, _, err := p.fs.OpenWrite(name)
if err != nil {
return err
}
if err := p.dumpStat(f); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
}
return nil
}
// CloseOnFailure removes unfinished files
func (p *fileProducer) CloseOnFailure() error {
return p.close(false)
}
//PartitionKey transforms input row key into partition key
func (p *fileProducer) PartitionKey(source string, key string) string {
if source == "snapshot" {
return "snapshot"
}
return "log"
}
func (p *fileConsumer) waitForNextFilePrepare() error {
err := p.watcher.Add(filepath.Dir(p.topicPath(p.topic)))
if err != nil {
if os.IsNotExist(err) {
err = p.watcher.Add(p.datadir)
}
}
return err
}
func (p *fileConsumer) waitForNextFileFinish(watcher *fsnotify.Watcher) error {
_ = watcher.Remove(filepath.Dir(p.topicPath(p.topic)))
_ = watcher.Remove(p.datadir)
return nil
}
func (p *fileConsumer) checkForNextFile(watcher *fsnotify.Watcher) bool {
var e bool
for {
select {
case event := <-watcher.Events:
if event.Op&(fsnotify.Create|fsnotify.Remove|fsnotify.Rename) != 0 {
log.Debugf("modified file during readdir: %v %v", event.Name, event.Op)
e = true
}
default:
return e
}
}
}
func (p *fileConsumer) waitForNextFile(watcher *fsnotify.Watcher) (bool, error) {
log.Debugf("Waiting for directory events %v", p.topic)
for {
select {
case event := <-watcher.Events:
if event.Op&(fsnotify.Create|fsnotify.Rename) != 0 {
log.Debugf("modified file: %v", event.Name)
return false, nil
}
case err := <-watcher.Errors:
return false, err
case <-p.ctx.Done():
return true, nil
}
}
}
func (p *fileConsumer) waitAndOpenNextFile() bool {
defer func() { log.E(p.waitForNextFileFinish(p.watcher)) }()
for {
//Need to start watching before p.nextFile() to avoid race condition
p.err = p.waitForNextFilePrepare()
if log.E(p.err) {
return true
}
nextFn, err := p.nextFile(p.topic, p.name)
if log.E(err) {
p.err = err
return true
}
// There were file-create events while we were reading the directory in
// nextFile, so we need to start over to guarantee file ordering
if p.checkForNextFile(p.watcher) {
log.Debugf("directory was modified, restarting check for next file")
log.E(p.waitForNextFileFinish(p.watcher))
continue
}
if nextFn != "" && !strings.HasSuffix(nextFn, ".open") {
p.openFile(nextFn, p.offset)
p.offset = 0
return true
}
if p.cfg.NonBlocking {
return false
}
var ctxDone bool
ctxDone, p.err = p.waitForNextFile(p.watcher)
if log.E(p.err) {
return true
}
if ctxDone {
return false
}
log.E(p.waitForNextFileFinish(p.watcher))
}
}
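//waitAndOpenNextFilePoll is the polling counterpart of waitAndOpenNextFile,
//presumably for filesystems (e.g. HDFS) where fsnotify watches are not
//available; it pairs with fetchNextPoll below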
func (p *fileConsumer) waitAndOpenNextFilePoll() bool {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
nextFn, err := p.nextFile(p.topic, p.name)
if log.E(err) {
p.err = err
return true
}
if nextFn != "" && !strings.HasSuffix(nextFn, ".open") {
p.openFile(nextFn, 0)
return true
}
if p.cfg.NonBlocking {
return false
}
select {
case <-ticker.C:
case <-p.ctx.Done():
return false
}
}
}
func (p *fileConsumer) initCrypterReader(reader io.Reader) (io.Reader, *openpgp.MessageDetails, error) {
if len(p.cfg.Encryption.PrivateKey) == 0 {
return nil, nil, fmt.Errorf("private key is empty. required for consuming encrypted stream")
}
block, err := armor.Decode(bytes.NewReader([]byte(p.cfg.Encryption.PrivateKey)))
if log.E(err) {
return nil, nil, err
}
if block.Type != openpgp.PrivateKeyType {
err = fmt.Errorf("expected private key. got:%s", block.Type)
log.E(err)
return nil, nil, err
}
privEntity, err := openpgp.ReadEntity(packet.NewReader(block.Body))
if log.E(err) {
return nil, nil, err
}
err = privEntity.PrivateKey.Decrypt([]byte(privateKeyPw))
if log.E(err) {
return nil, nil, err
}
md, err := openpgp.ReadMessage(reader, openpgp.EntityList{privEntity}, nil, &packet.Config{DefaultHash: crypto.SHA256})
if log.E(err) {
return nil, nil, err
}
return md.UnverifiedBody, md, nil
}
func (p *fileConsumer) openFileInitFilter() (err error) {
//TODO: Get encryption and compression type from Filters field of the header
if p.cfg.Encryption.Enabled || p.cfg.Compression {
//The header reader cached more than just the header, so we need to reopen
log.E(p.file.Close())
p.file, err = p.fs.OpenRead(p.name, 0)
if log.E(err) {
return
}
var reader io.Reader = p.file
if p.cfg.Encryption.Enabled {
reader, p.pgpMD, err = p.initCrypterReader(reader)
if err != nil {
return
}
}
if p.cfg.Compression {
reader, err = gzip.NewReader(reader)
if log.E(err) {
return
}
}
p.reader = bufio.NewReader(reader)
}
return
}
func (p *fileConsumer) openFile(nextFn string, offset int64) {
dir := filepath.Dir(p.topicPath(p.topic)) + "/"
p.file, p.err = p.fs.OpenRead(dir+nextFn, 0)
if log.E(p.err) {
return
}
defer func() {
if p.err != nil {
p.reader = nil
if p.file != nil {
log.E(p.file.Close())
}
p.file = nil
}
}()
p.reader = bufio.NewReader(p.file)
p.header.Delimited = p.cfg.FileDelimited
if !p.header.Delimited {
p.err = fmt.Errorf("cannot consume non delimited file")
log.E(p.err)
return
}
if p.header.Format == "json" || p.header.Format == "text" {
atomic.StoreInt64(&p.text, 1)
}
if offset != 0 {
log.E(p.file.Close())
p.file, p.err = p.fs.OpenRead(dir+nextFn, offset)
if log.E(p.err) {
return
}
p.reader = bufio.NewReader(p.file)
}
p.name = dir + nextFn
p.err = p.openFileInitFilter()
if p.err != nil {
return
}
p.metrics.FilesOpened.Inc(1)
log.Debugf("Consumer opened: %v, header: %+v", p.name, p.header)
}
//readMessage reads the next message from the current file into p.msg
func (p *fileConsumer) readMessage() {
if atomic.LoadInt64(&p.text) == 0 {
sz := make([]byte, 4)
_, p.err = io.ReadFull(p.reader, sz)
if p.err == nil {
p.msg = make([]byte, binary.LittleEndian.Uint32(sz))
_, p.err = io.ReadFull(p.reader, p.msg)
}
} else {
p.msg, p.err = p.reader.ReadBytes(delimiter)
if p.err == nil {
//Strip the trailing delimiter
p.msg = p.msg[:len(p.msg)-1]
}
}
}
func (p *fileConsumer) fetchNextLow() bool {
//reader and file can be nil when directory is empty during
//NewConsumer
if p.reader != nil {
p.readMessage()
if p.err == nil {
return true
}
if p.err != io.EOF && (!p.cfg.Compression || p.err != io.ErrUnexpectedEOF) {
log.E(p.err)
return true
}
log.E(p.file.Close())
p.reader = nil
p.file = nil
if p.pgpMD != nil && p.pgpMD.IsSigned && p.pgpMD.SignedBy != nil {
if p.pgpMD.SignatureError != nil {
log.Errorf("signature error: %v", p.pgpMD.SignatureError)
} else {
if p.pgpMD.SignedBy.PublicKey != nil {
log.Debugf("valid signature, signed by: %x", p.pgpMD.SignedBy.PublicKey.Fingerprint)
}
}
}
log.Debugf("Consumer closed: %v", p.name)
if atomic.LoadInt64(&p.text) == 1 && p.cfg.FileDelimited && len(p.msg) != 0 {
p.err = fmt.Errorf("corrupted file. Not ending with delimiter: %v %v", p.name, string(p.msg))
return true
}
p.err = nil
}
return false
}
//fetchNext fetches next message from File and commits offset read
func (p *fileConsumer) fetchNext() (interface{}, error) {
for {
if p.fetchNextLow() {
return p.msg, p.err
}
if !p.waitAndOpenNextFile() {
return nil, p.err
}
if p.err != nil {
return p.msg, p.err
}
}
}
//fetchNextPoll fetches the next message from File by polling the directory
func (p *fileConsumer) fetchNextPoll() (interface{}, error) {
for {
if p.fetchNextLow() {
return p.msg, p.err
}
if !p.waitAndOpenNextFilePoll() {
return nil, p.err
}
if p.err != nil {
return p.msg, p.err
}
}
}
//close shuts down the consumer
func (p *fileConsumer) close(graceful bool) (err error) {
log.Debugf("Close consumer: %v", p.topic)
p.cancel()
p.wg.Wait()
if p.watcher != nil {
err = p.watcher.Close()
log.E(err)
}
if p.file != nil {
err = p.file.Close()
log.E(err)
}
return err
}
//Close closes consumer
func (p *fileConsumer) Close() error {
return p.close(true)
}
//CloseOnFailure closes the consumer on the failure path
func (p *fileConsumer) CloseOnFailure() error {
return p.close(false)
}
func (p *fileConsumer) SaveOffset() error {
return nil
}
func (p *fileConsumer) SetFormat(format string) {
p.header.Format = format
if format == "json" || format == "text" {
atomic.StoreInt64(&p.text, 1)
}
}
func (p *fileProducer) SetFormat(format string) {
p.header.Format = format
if format == "json" || format == "text" {
atomic.StoreInt64(&p.text, 1)
}
}
type fileFS struct {
}
func (p *fileFS) MkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}
func (p *fileFS) Rename(oldpath, newpath string) error {
return os.Rename(oldpath, newpath)
}
func (p *fileFS) ReadDir(dirname string, _ string) ([]os.FileInfo, error) {
return ioutil.ReadDir(dirname)
}
func (p *fileFS) OpenRead(name string, offset int64) (io.ReadCloser, error) {
f, err := os.OpenFile(name, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
log.E(f.Close()) //don't leak the descriptor on seek failure
return nil, err
}
return f, nil
}
type fileWriter struct {
*os.File
}
func (f *fileWriter) Flush() error {
return f.File.Sync()
}
func (p *fileFS) OpenWrite(name string) (flushWriteCloser, io.Seeker, error) {
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return nil, nil, err
}
return &fileWriter{f}, f, nil
}
func (p *fileFS) Remove(path string) error {
return os.Remove(path)
}
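//Cancel is a no-op for the local filesystem; stream-based backends (e.g.
//s3) use it to abort an in-flight upload, see the file.file field above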
func (p *fileFS) Cancel(f io.Closer) error {
return nil
}
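//An end-to-end sketch within this package (errors elided; cfg is a
//populated config.PipeConfig, and the Producer/Consumer interfaces are
//assumed to expose the methods defined above):
//
//	p := &filePipe{datadir: "/tmp/pipe", cfg: cfg}
//	pr, _ := p.NewProducer("orders")
//	pr.SetFormat("json")
//	_ = pr.Push([]byte(`{"id":1}`))
//	_ = pr.Close()
//	c, _ := p.NewConsumer("orders")
//	defer func() { log.E(c.Close()) }()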