in pipe/file.go [605:650]
func (p *fileProducer) push(key string, in interface{}, batch bool) error {
var bytes []byte
switch b := in.(type) {
case []byte:
bytes = b
default:
return fmt.Errorf("file pipe can handle binary arrays only")
}
f, err := p.getFile(key)
if err != nil {
return err
}
defer func() {
if err != nil {
p.cancel(f)
}
}()
//Prepend message with size in the case of binary delimited format
if err = p.writeBinaryMsgLength(f, len(bytes)); err != nil {
return err
}
if _, err = f.writer.Write(bytes); err != nil {
return err
}
//In the case of text format apppend delimiter after the message
if err = p.writeTextMsgDelimiter(f); err != nil {
return err
}
f.offset += int64(len(bytes)) + 1
f.nRecs++
if !batch {
if err = f.writer.Flush(); err != nil {
return err
}
p.rotateOnSizeLimit(key, f)
}
return nil
}