in relay.go [426:532]
func (r *Relayer) handleCallReq(f *lazyCallReq) (shouldRelease bool, _ error) {
	if handled := r.handleLocalCallReq(f); handled {
		return _relayNoRelease, nil
	}
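	// Ask the RelayHost to start tracking this call. Note that call may be
	// non-nil even when Start returns an error, so the error paths below
	// still record failure stats on it before ending the call.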
	call, err := r.relayHost.Start(f, r.relayConn)
	if err != nil {
		// If we have a RateLimitDropError we record the statistic, but
		// we *don't* send an error frame back to the client.
		if _, silentlyDrop := err.(relay.RateLimitDropError); silentlyDrop {
			if call != nil {
				call.Failed("relay-dropped")
				call.End()
			}
			return _relayShouldRelease, nil
		}
		if _, ok := err.(SystemError); !ok {
			err = NewSystemError(ErrCodeDeclined, err.Error())
		}
		if call != nil {
			call.Failed(GetSystemErrorCode(err).relayMetricsKey())
			call.End()
		}
		r.conn.SendSystemError(f.Header.ID, f.Span(), err)

		// If the RelayHost returns a protocol error, close the connection.
		if GetSystemErrorCode(err) == ErrCodeProtocol {
			return _relayShouldRelease, r.conn.close(LogField{"reason", "RelayHost returned protocol error"})
		}
		return _relayShouldRelease, nil
	}
	// Check that the current connection is in a valid state to handle a new call.
	if canHandle, state := r.canHandleNewCall(); !canHandle {
		call.Failed("relay-client-conn-inactive")
		call.End()
		err := errConnNotActive{"incoming", state}
		r.conn.SendSystemError(f.Header.ID, f.Span(), NewWrappedSystemError(ErrCodeDeclined, err))
		return _relayShouldRelease, err
	}
	// Get a remote connection and check whether it can handle this call.
	remoteConn, ok, err := r.getDestination(f, call)
	if err == nil && ok {
		if canHandle, state := remoteConn.relay.canHandleNewCall(); !canHandle {
			err = NewWrappedSystemError(ErrCodeNetwork, errConnNotActive{"selected remote", state})
			call.Failed("relay-remote-inactive")
			r.conn.SendSystemError(f.Header.ID, f.Span(), NewWrappedSystemError(ErrCodeDeclined, err))
		}
	}
	if err != nil || !ok {
		// Failed to get a remote connection, or the connection is not in the right
		// state to handle this call. Since we already incremented pending on
		// the current relay, we need to decrement it.
		r.decrementPending()
		call.End()
		return _relayShouldRelease, err
	}
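	// Allocate a new message ID on the destination connection; the relay items
	// added below map the original ID to the destination ID and back. The TTL
	// forwarded to the destination is capped at the relay's maximum timeout.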
	origID := f.Header.ID
	destinationID := remoteConn.NextMessageID()
	ttl := f.TTL()
	if ttl > r.maxTimeout {
		ttl = r.maxTimeout
		f.SetTTL(r.maxTimeout)
	}
	span := f.Span()
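	// Appending to arg2 changes the payload, so the rewritten frames need a
	// freshly computed checksum of the same type as the original frame.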
	var mutatedChecksum Checksum
	if len(f.arg2Appends) > 0 {
		mutatedChecksum = f.checksumType.New()
	}

	// The remote side of the relay doesn't need to track stats or call state.
	remoteConn.relay.addRelayItem(false /* isOriginator */, destinationID, f.Header.ID, r, ttl, span, call, nil /* mutatedChecksum */)
	relayToDest := r.addRelayItem(true /* isOriginator */, f.Header.ID, destinationID, remoteConn.relay, ttl, span, call, mutatedChecksum)
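	// Rewrite the frame so it carries the message ID expected by the destination connection.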
	f.Header.ID = destinationID

	// If we have appends, the size of the frame to be relayed will change, potentially going
	// over the max frame size. Do a fragmenting send, which is slightly more expensive but
	// handles fragmentation when it is needed.
	if len(f.arg2Appends) > 0 {
		if err := r.fragmentingSend(call, f, relayToDest, origID); err != nil {
			r.failRelayItem(r.outbound, origID, _relayArg2ModifyFailed, err)
			r.logger.WithFields(
				LogField{"id", origID},
				LogField{"err", err.Error()},
				LogField{"caller", string(f.Caller())},
				LogField{"dest", string(f.Service())},
				LogField{"method", string(f.Method())},
			).Warn("Failed to send call with modified arg2.")
		}
		// fragmentingSend always sends new frames in place of the old frame, so we must
		// release the original frame separately.
		return _relayShouldRelease, nil
	}
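	// No arg2 modifications: forward the original frame to the destination unchanged.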
	call.SentBytes(f.Frame.Header.FrameSize())
	sent, failure := relayToDest.destination.Receive(f.Frame, requestFrame)
	if !sent {
		r.failRelayItem(r.outbound, origID, failure, errFrameNotSent)
		return _relayShouldRelease, nil
	}
	return _relayNoRelease, nil
}