in relay.go [293:369]
// Receive forwards a frame that belongs to an already-tracked relayed call
// onto this relayer's connection.
//
// The relay item for the frame's ID is looked up in the item set selected by
// fType (see receiverItems). If the frame finishes the call, the item's
// timeout is stopped as part of that lookup.
//
// Return values:
//   - (false, _relayErrorNotFound) when no relay item exists for the frame ID.
//   - (true, "") when the item has already timed out or is in the process of
//     timing out; the frame is silently discarded in that case even though
//     sent is reported as true.
//   - (false, reason) when the connection's send channel is full; the frame
//     is dropped and the relay item is failed with a "slow connection" reason.
//   - (true, "") when the frame was queued on the connection's send channel.
func (r *Relayer) Receive(f *Frame, fType frameType) (sent bool, failureReason string) {
	id := f.Header.ID

	// If we receive a response frame, we expect to find that ID in our outbound.
	// If we receive a request frame, we expect to find that ID in our inbound.
	items := r.receiverItems(fType)

	finished := finishesCall(f)

	// Stop the timeout if the call is finished.
	item, stopped, ok := items.Get(id, finished /* stopTimeout */)
	if !ok {
		r.logger.WithFields(
			LogField{"id", id},
		).Warn("Received a frame without a RelayItem.")
		return false, _relayErrorNotFound
	}

	if item.tomb || (finished && !stopped) {
		// Item has previously timed out, or is in the process of timing out.
		// TODO: metrics for late-arriving frames.
		return true, ""
	}

	// call res frames don't include the OK bit, so we can't wait until the last
	// frame of a relayed RPC to determine if the call succeeded.
	if fType == responseFrame || f.messageType() == messageTypeCancel {
		// If we've gotten a response frame, we're the originating relayer and
		// should handle stats.
		if succeeded, failMsg := determinesCallSuccess(f); succeeded {
			item.call.Succeeded()
		} else if len(failMsg) > 0 {
			item.call.Failed(failMsg)
		}
	}

	// Non-blocking send: never stall the caller on a slow peer.
	select {
	case r.conn.sendCh <- f:
	default:
		// Buffer is full, so drop this frame and cancel the call.
		// Since this is typically due to the send buffer being full, get send buffer
		// usage + limit and add that to the log.
		sendBuf, sendBufLimit, sendBufErr := r.conn.sendBufSize()
		now := r.conn.timeNow().UnixNano()

		// Load each activity timestamp exactly once so that the raw value and
		// the derived "since" duration in the log line agree with each other.
		lastRead := r.conn.lastActivityRead.Load()
		lastWrite := r.conn.lastActivityWrite.Load()

		logFields := []LogField{
			{"id", id},
			{"destConnSendBufferCurrent", sendBuf},
			{"destConnSendBufferLimit", sendBufLimit},
			{"sendChQueued", len(r.conn.sendCh)},
			{"sendChCapacity", cap(r.conn.sendCh)},
			{"lastActivityRead", lastRead},
			{"lastActivityWrite", lastWrite},
			{"sinceLastActivityRead", time.Duration(now - lastRead).String()},
			{"sinceLastActivityWrite", time.Duration(now - lastWrite).String()},
		}
		if sendBufErr != nil {
			logFields = append(logFields, LogField{"destConnSendBufferError", sendBufErr.Error()})
		}
		r.logger.WithFields(logFields...).Warn("Dropping call due to slow connection.")

		reason := _relayErrorDestConnSlow
		// If we're dealing with a response frame, then the client is slow.
		if fType == responseFrame {
			reason = _relayErrorSourceConnSlow
		}

		r.failRelayItem(items, id, reason, errFrameNotSent)
		return false, reason
	}

	if finished {
		r.finishRelayItem(items, id)
	}

	return true, ""
}