in conn.go [187:229]
func (c *Conn) handleReads() {
defer c.Close()
for {
if c.sessionCtx.Err() != nil {
return
}
dec, err := createDecoder(c.conn)
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // don't make further attempts to read from closed connection, close goroutine
}
if err != nil {
log.Printf("could not read response packet: %v", err)
return
}
replyHeader := &proto.ReplyHeader{}
if err = dec.ReadRecord(replyHeader); err != nil {
log.Printf("could not decode reply header: %v", err)
return
}
if replyHeader.Xid == pingXID {
continue // ignore ping responses
}
value, ok := c.reqs.LoadAndDelete(replyHeader.Xid)
if !ok {
log.Printf("no matching request found for xid %d", replyHeader.Xid)
continue
}
pending := value.(*pendingRequest)
if replyHeader.Err != 0 {
code := Error(replyHeader.Err)
pending.error = &code
} else if err = dec.ReadRecord(pending.reply); err != nil {
log.Printf("could not decode reply record: %v", err)
return
}
pending.done <- struct{}{}
}
}