func()

in relay.go [426:532]


// handleCallReq relays an incoming call request frame towards its destination.
//
// The frame is first offered to handleLocalCallReq; if that reports it as
// handled, nothing else is done here. Otherwise the relay host is asked to
// start a call, the connection the frame arrived on and the selected remote
// connection are each checked for whether they can take a new call, relay
// items are registered on both relays, and the frame is forwarded under a
// message ID obtained from the remote connection.
//
// The first result is _relayNoRelease only when the frame was handled locally
// or was accepted by the destination's Receive; every other path returns
// _relayShouldRelease. The error result is non-nil when the incoming
// connection cannot take the call, when getDestination returns an error, when
// the selected remote cannot take the call, or when closing the connection
// after a protocol error from the relay host returns an error.
func (r *Relayer) handleCallReq(f *lazyCallReq) (shouldRelease bool, _ error) {
	if r.handleLocalCallReq(f) {
		return _relayNoRelease, nil
	}

	call, startErr := r.relayHost.Start(f, r.relayConn)
	if startErr != nil {
		switch startErr.(type) {
		case relay.RateLimitDropError:
			// Rate-limit drops are recorded on the call (when there is one),
			// but no error frame is sent back to the client.
			if call != nil {
				call.Failed("relay-dropped")
				call.End()
			}
			return _relayShouldRelease, nil
		case SystemError:
			// Already a system error; it is sent as-is below.
		default:
			startErr = NewSystemError(ErrCodeDeclined, startErr.Error())
		}

		if call != nil {
			call.Failed(GetSystemErrorCode(startErr).relayMetricsKey())
			call.End()
		}
		r.conn.SendSystemError(f.Header.ID, f.Span(), startErr)

		// Only a protocol error from the RelayHost closes the connection.
		if GetSystemErrorCode(startErr) != ErrCodeProtocol {
			return _relayShouldRelease, nil
		}
		return _relayShouldRelease, r.conn.close(LogField{"reason", "RelayHost returned protocol error"})
	}

	// The connection this frame arrived on must be in a state that allows a
	// new call.
	if active, connState := r.canHandleNewCall(); !active {
		call.Failed("relay-client-conn-inactive")
		call.End()
		inactiveErr := errConnNotActive{"incoming", connState}
		r.conn.SendSystemError(f.Header.ID, f.Span(), NewWrappedSystemError(ErrCodeDeclined, inactiveErr))
		return _relayShouldRelease, inactiveErr
	}

	// Pick a remote connection, then make sure it can take this call too.
	remoteConn, found, err := r.getDestination(f, call)
	if err == nil && found {
		if active, remoteState := remoteConn.relay.canHandleNewCall(); !active {
			err = NewWrappedSystemError(ErrCodeNetwork, errConnNotActive{"selected remote", remoteState})
			call.Failed("relay-remote-inactive")
			r.conn.SendSystemError(f.Header.ID, f.Span(), NewWrappedSystemError(ErrCodeDeclined, err))
		}
	}
	if err != nil || !found {
		// No remote connection was obtained, or the one selected is not in the
		// right state for this call. Pending was already incremented on the
		// current relay, so it has to be decremented again.
		r.decrementPending()
		call.End()
		return _relayShouldRelease, err
	}

	origID := f.Header.ID
	destID := remoteConn.NextMessageID()

	// Clamp the TTL to the relay's maximum, both locally and in the frame.
	ttl := f.TTL()
	if limit := r.maxTimeout; ttl > limit {
		ttl = limit
		f.SetTTL(limit)
	}
	span := f.Span()

	// A checksum is only created when there are arg2 appends.
	var appendChecksum Checksum
	if len(f.arg2Appends) > 0 {
		appendChecksum = f.checksumType.New()
	}

	// The remote side of the relay doesn't need to track stats or call state.
	remoteConn.relay.addRelayItem(false /* isOriginator */, destID, f.Header.ID, r, ttl, span, call, nil /* mutatedChecksum */)
	relayToDest := r.addRelayItem(true /* isOriginator */, f.Header.ID, destID, remoteConn.relay, ttl, span, call, appendChecksum)

	// From here on the frame carries the destination's message ID; origID
	// keeps the ID the frame arrived with.
	f.Header.ID = destID

	// With appends, the size of the relayed frame changes and may exceed the
	// max frame size. A fragmenting send is slightly more expensive but
	// handles fragmenting when it is needed.
	if len(f.arg2Appends) > 0 {
		if sendErr := r.fragmentingSend(call, f, relayToDest, origID); sendErr != nil {
			r.failRelayItem(r.outbound, origID, _relayArg2ModifyFailed, sendErr)
			r.logger.WithFields(
				LogField{"id", origID},
				LogField{"err", sendErr.Error()},
				LogField{"caller", string(f.Caller())},
				LogField{"dest", string(f.Service())},
				LogField{"method", string(f.Method())},
			).Warn("Failed to send call with modified arg2.")
		}

		// fragmentingSend always sends new frames in place of the old frame,
		// so the old frame must be released separately.
		return _relayShouldRelease, nil
	}

	call.SentBytes(f.Frame.Header.FrameSize())
	if sent, failure := relayToDest.destination.Receive(f.Frame, requestFrame); !sent {
		r.failRelayItem(r.outbound, origID, failure, errFrameNotSent)
		return _relayShouldRelease, nil
	}
	return _relayNoRelease, nil
}