in pubsub.go [276:308]
func (p *Pubsub) read(cnx net.Conn) error {
var (
reader = bufio.NewReader(cnx)
buffer = bytes.NewBuffer(nil)
)
p.resubscribe(cnx)
// The only thing that
for {
buffer.Reset()
if err := ReadNextFull(buffer, reader); err != nil {
p.mu.Lock()
p.connection = nil
p.mu.Unlock()
return err
}
bytes := copyBytes(buffer.Bytes())
parsed, err := ParsePublishCommand(bytes)
if err != nil {
continue // expected, we can get replies from subscriptions, which we'll ignore
}
p.mu.Lock()
if parsed.IsPattern {
p.patterns.broadcast(parsed.ChannelOrPattern, bytes)
} else {
p.channels.broadcast(parsed.ChannelOrPattern, bytes)
}
p.mu.Unlock()
}
}