func()

in conn.go [596:682]


func (c *Conn) connReader() {
	defer func() {
		close(c.rxDone)
		c.close()
	}()

	var sessionsByRemoteChannel = make(map[uint16]*Session)
	var err error
	for {
		if err != nil {
			debug.Log(0, "RX (connReader %p): terminal error: %v", c, err)
			c.rxErr = err
			return
		}

		var fr frames.Frame
		fr, err = c.readFrame()
		if err != nil {
			continue
		}

		debug.Log(0, "RX (connReader %p): %s", c, fr)

		var (
			session *Session
			ok      bool
		)

		switch body := fr.Body.(type) {
		// Server initiated close.
		case *frames.PerformClose:
			// connWriter will send the close performative ack on its way out.
			// it's a SHOULD though, not a MUST.
			if body.Error == nil {
				return
			}
			err = body.Error
			continue

		// RemoteChannel should be used when frame is Begin
		case *frames.PerformBegin:
			if body.RemoteChannel == nil {
				// since we only support remotely-initiated sessions, this is an error
				// TODO: it would be ideal to not have this kill the connection
				err = fmt.Errorf("%T: nil RemoteChannel", fr.Body)
				continue
			}
			c.sessionsByChannelMu.RLock()
			session, ok = c.sessionsByChannel[*body.RemoteChannel]
			c.sessionsByChannelMu.RUnlock()
			if !ok {
				// this can happen if NewSession() exits due to the context expiring/cancelled
				// before the begin ack is received.
				err = fmt.Errorf("unexpected remote channel number %d", *body.RemoteChannel)
				continue
			}

			session.remoteChannel = fr.Channel
			sessionsByRemoteChannel[fr.Channel] = session

		case *frames.PerformEnd:
			session, ok = sessionsByRemoteChannel[fr.Channel]
			if !ok {
				err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel (PerformEnd)", fr.Body, fr.Channel)
				continue
			}
			// we MUST remove the remote channel from our map as soon as we receive
			// the ack (i.e. before passing it on to the session mux) on the session
			// ending since the numbers are recycled.
			delete(sessionsByRemoteChannel, fr.Channel)
			c.deleteSession(session)

		default:
			// pass on performative to the correct session
			session, ok = sessionsByRemoteChannel[fr.Channel]
			if !ok {
				err = fmt.Errorf("%T: didn't find channel %d in sessionsByRemoteChannel", fr.Body, fr.Channel)
				continue
			}
		}

		q := session.rxQ.Acquire()
		q.Enqueue(fr.Body)
		session.rxQ.Release(q)
		debug.Log(2, "RX (connReader %p): mux frame to Session (%p): %s", c, session, fr)
	}
}