in relay.go [535:594]
// handleNonCallReq forwards a frame belonging to a call that is already
// tracked in one of the relay's item maps, rewriting the frame ID to the
// remapped ID recorded on the relay item before handing it to the item's
// destination.
//
// It returns errUnknownID when no relay item exists for the frame's ID.
// The boolean result is _relayNoRelease only when the destination reported
// the frame as sent; every other path returns _relayShouldRelease.
// NOTE(review): presumably the boolean tells the caller whether it still
// owns the frame and must release it — confirm against the caller.
func (r *Relayer) handleNonCallReq(f *Frame) (shouldRelease bool, _ error) {
	direction := frameTypeFor(f)
	lastFrame := finishesCall(f)

	// Response frames are routed via the inbound map; every other frame is
	// routed via the outbound map.
	table := r.outbound
	if direction == responseFrame {
		table = r.inbound
	}

	// Stop the timeout only if this frame finishes the call.
	entry, timerStopped, found := table.Get(f.Header.ID, lastFrame /* stopTimeout */)
	if !found {
		return _relayShouldRelease, errUnknownID
	}
	// TODO: metrics for late-arriving frames.
	if entry.tomb {
		// The item has previously timed out.
		return _relayShouldRelease, nil
	}
	if lastFrame && !timerStopped {
		// The timeout could not be stopped, so the item is in the process
		// of timing out.
		return _relayShouldRelease, nil
	}

	if msgType := f.messageType(); msgType == messageTypeCallRes {
		// Notify the call about the response, but only when the frame
		// parses as a valid call response; otherwise just log the problem.
		res, parseErr := newLazyCallRes(f)
		if parseErr != nil {
			r.logger.WithFields(
				ErrField(parseErr),
				LogField{"id", f.Header.ID},
			).Error("Malformed callRes frame.")
		} else {
			entry.call.CallResponse(res)
		}
	} else if msgType == messageTypeCallReqContinue && entry.mutatedChecksum != nil {
		// A non-nil mutatedChecksum means the call was mutated, so the
		// checksum of this continuation frame has to be recalculated.
		r.updateMutatedCallReqContinueChecksum(f, entry.mutatedChecksum)
	}

	// Byte accounting happens only after the timeout checks above, because
	// it must be called before call.End().
	entry.reportRelayBytes(direction, f.Header.FrameSize())

	// Remember the ID the frame arrived with: the maps are keyed by it,
	// while the forwarded frame carries the remapped ID.
	incomingID := f.Header.ID
	f.Header.ID = entry.remapID

	if delivered, reason := entry.destination.Receive(f, direction); !delivered {
		r.failRelayItem(table, incomingID, reason, errFrameNotSent)
		return _relayShouldRelease, nil
	}

	if lastFrame {
		r.finishRelayItem(table, incomingID)
	}
	return _relayNoRelease, nil
}