in conn.go [596:682]
// connReader is the connection's receive loop. It reads frames one at a time
// via c.readFrame and hands each frame body to the Session registered for the
// frame's channel.
//
// The loop ends when:
//   - c.readFrame returns an error,
//   - the peer sends a close performative, or
//   - a frame cannot be matched to a known session.
//
// Every terminal error is logged and stored in c.rxErr before returning; a
// close performative that carries no error leaves c.rxErr untouched. On the
// way out, c.rxDone is closed and then c.close() is called.
func (c *Conn) connReader() {
	defer func() {
		close(c.rxDone)
		c.close()
	}()

	// fail logs and records the reason the reader is stopping. Every call
	// site returns immediately afterwards.
	fail := func(cause error) {
		debug.Log(0, "RX (connReader %p): terminal error: %v", c, cause)
		c.rxErr = cause
	}

	// Sessions keyed by the channel number found on incoming frames. The map
	// is local to this function, so it needs no locking.
	sessionsByRemoteChannel := map[uint16]*Session{}

	for {
		var (
			fr      frames.Frame
			readErr error
		)
		if fr, readErr = c.readFrame(); readErr != nil {
			fail(readErr)
			return
		}

		debug.Log(0, "RX (connReader %p): %s", c, fr)

		// dest is the session that will receive this frame's body.
		var dest *Session

		switch body := fr.Body.(type) {
		case *frames.PerformClose:
			// The peer is closing the connection. connWriter sends the close
			// ack on its way out (a SHOULD, not a MUST). Only a close that
			// carries an error is recorded as the terminal error.
			if body.Error != nil {
				fail(body.Error)
			}
			return

		case *frames.PerformBegin:
			// For a begin frame the session is located through the frame's
			// RemoteChannel field rather than through fr.Channel.
			if body.RemoteChannel == nil {
				// Without a RemoteChannel there is nothing to look up in
				// c.sessionsByChannel, so this is fatal.
				// NOTE(review): this presumably corresponds to a session
				// initiated by the peer, which is not handled here - confirm.
				// TODO: it would be ideal to not have this kill the connection
				fail(fmt.Errorf("%T: nil RemoteChannel", fr.Body))
				return
			}
			ch := *body.RemoteChannel

			c.sessionsByChannelMu.RLock()
			s, known := c.sessionsByChannel[ch]
			c.sessionsByChannelMu.RUnlock()

			if !known {
				// Can happen when NewSession() gave up (context expired or
				// cancelled) before the begin ack arrived.
				fail(fmt.Errorf("unexpected remote channel number %d", ch))
				return
			}

			// Remember which incoming channel maps to this session so later
			// frames can be routed by fr.Channel alone.
			s.remoteChannel = fr.Channel
			sessionsByRemoteChannel[fr.Channel] = s
			dest = s

		case *frames.PerformEnd:
			s, known := sessionsByRemoteChannel[fr.Channel]
			if !known {
				fail(fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel (PerformEnd)", fr.Body, fr.Channel))
				return
			}

			// Channel numbers are recycled, so the mapping MUST be dropped as
			// soon as the end is received, i.e. before the frame is passed on
			// to the session.
			delete(sessionsByRemoteChannel, fr.Channel)
			c.deleteSession(s)
			dest = s

		default:
			// Any other performative is routed by the frame's channel.
			s, known := sessionsByRemoteChannel[fr.Channel]
			if !known {
				fail(fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel", fr.Body, fr.Channel))
				return
			}
			dest = s
		}

		// Hand the frame body to the owning session's receive queue.
		pending := dest.rxQ.Acquire()
		pending.Enqueue(fr.Body)
		dest.rxQ.Release(pending)

		debug.Log(2, "RX (connReader %p): mux frame to Session (%p): %s", c, dest, fr)
	}
}