in server.go [67:118]
func (s *connection) Start() {
reader := bufio.NewReader(s.cnx)
defer func() {
s.toSendCond.L.Lock()
s.isClosed = true
s.toSendCond.Broadcast()
s.toSendCond.L.Unlock()
s.pubsub.UnsubscribeAll(s)
clientsCount.Dec()
}()
clientsCount.Inc()
logrus.Debug("redplex/server: accepted connection")
go s.loopWrite()
for {
method, args, err := ParseRequest(reader)
if err != nil {
logrus.WithError(err).Debug("redplex/server: error reading command, terminating client connection")
return
}
switch method {
case commandSubscribe:
for _, channel := range args {
s.pubsub.Subscribe(Listener{false, string(channel), s})
}
case commandPSubscribe:
for _, channel := range args {
s.pubsub.Subscribe(Listener{true, string(channel), s})
}
case commandUnsubscribe:
for _, channel := range args {
s.pubsub.Unsubscribe(Listener{false, string(channel), s})
}
case commandPUnsubscribe:
for _, channel := range args {
s.pubsub.Unsubscribe(Listener{true, string(channel), s})
}
case commandQuit:
logrus.Debug("redplex/server: terminating connection at client's request")
return
default:
s.cnx.Write([]byte(fmt.Sprintf("-ERR unknown command '%s'\r\n", method)))
continue
}
for _, channel := range args {
s.cnx.Write(SubscribeResponse(method, channel))
}
}
}