func()

in pipe/file.go [605:650]


func (p *fileProducer) push(key string, in interface{}, batch bool) error {
	var bytes []byte
	switch b := in.(type) {
	case []byte:
		bytes = b
	default:
		return fmt.Errorf("file pipe can handle binary arrays only")
	}

	f, err := p.getFile(key)
	if err != nil {
		return err
	}

	defer func() {
		if err != nil {
			p.cancel(f)
		}
	}()

	//Prepend message with size in the case of binary delimited format
	if err = p.writeBinaryMsgLength(f, len(bytes)); err != nil {
		return err
	}

	if _, err = f.writer.Write(bytes); err != nil {
		return err
	}

	//In the case of text format apppend delimiter after the message
	if err = p.writeTextMsgDelimiter(f); err != nil {
		return err
	}

	f.offset += int64(len(bytes)) + 1
	f.nRecs++

	if !batch {
		if err = f.writer.Flush(); err != nil {
			return err
		}
		p.rotateOnSizeLimit(key, f)
	}

	return nil
}