func()

in pubsub.go [276:308]


func (p *Pubsub) read(cnx net.Conn) error {
	var (
		reader = bufio.NewReader(cnx)
		buffer = bytes.NewBuffer(nil)
	)

	p.resubscribe(cnx)

	// The only thing that
	for {
		buffer.Reset()
		if err := ReadNextFull(buffer, reader); err != nil {
			p.mu.Lock()
			p.connection = nil
			p.mu.Unlock()
			return err
		}

		bytes := copyBytes(buffer.Bytes())
		parsed, err := ParsePublishCommand(bytes)
		if err != nil {
			continue // expected, we can get replies from subscriptions, which we'll ignore
		}

		p.mu.Lock()
		if parsed.IsPattern {
			p.patterns.broadcast(parsed.ChannelOrPattern, bytes)
		} else {
			p.channels.broadcast(parsed.ChannelOrPattern, bytes)
		}
		p.mu.Unlock()
	}
}