func()

in relay.go [293:369]


func (r *Relayer) Receive(f *Frame, fType frameType) (sent bool, failureReason string) {
	id := f.Header.ID

	// If we receive a response frame, we expect to find that ID in our outbound.
	// If we receive a request frame, we expect to find that ID in our inbound.
	items := r.receiverItems(fType)
	finished := finishesCall(f)

	// Stop the timeout if the call if finished.
	item, stopped, ok := items.Get(id, finished /* stopTimeout */)
	if !ok {
		r.logger.WithFields(
			LogField{"id", id},
		).Warn("Received a frame without a RelayItem.")
		return false, _relayErrorNotFound
	}
	if item.tomb || (finished && !stopped) {
		// Item has previously timed out, or is in the process of timing out.
		// TODO: metrics for late-arriving frames.
		return true, ""
	}

	// call res frames don't include the OK bit, so we can't wait until the last
	// frame of a relayed RPC to determine if the call succeeded.
	if fType == responseFrame || f.messageType() == messageTypeCancel {
		// If we've gotten a response frame, we're the originating relayer and
		// should handle stats.
		if succeeded, failMsg := determinesCallSuccess(f); succeeded {
			item.call.Succeeded()
		} else if len(failMsg) > 0 {
			item.call.Failed(failMsg)
		}
	}
	select {
	case r.conn.sendCh <- f:
	default:
		// Buffer is full, so drop this frame and cancel the call.

		// Since this is typically due to the send buffer being full, get send buffer
		// usage + limit and add that to the log.
		sendBuf, sendBufLimit, sendBufErr := r.conn.sendBufSize()
		now := r.conn.timeNow().UnixNano()
		logFields := []LogField{
			{"id", id},
			{"destConnSendBufferCurrent", sendBuf},
			{"destConnSendBufferLimit", sendBufLimit},
			{"sendChQueued", len(r.conn.sendCh)},
			{"sendChCapacity", cap(r.conn.sendCh)},
			{"lastActivityRead", r.conn.lastActivityRead.Load()},
			{"lastActivityWrite", r.conn.lastActivityRead.Load()},
			{"sinceLastActivityRead", time.Duration(now - r.conn.lastActivityRead.Load()).String()},
			{"sinceLastActivityWrite", time.Duration(now - r.conn.lastActivityWrite.Load()).String()},
		}
		if sendBufErr != nil {
			logFields = append(logFields, LogField{"destConnSendBufferError", sendBufErr.Error()})
		}

		r.logger.WithFields(logFields...).Warn("Dropping call due to slow connection.")

		items := r.receiverItems(fType)

		err := _relayErrorDestConnSlow
		// If we're dealing with a response frame, then the client is slow.
		if fType == responseFrame {
			err = _relayErrorSourceConnSlow
		}

		r.failRelayItem(items, id, err, errFrameNotSent)
		return false, err
	}

	if finished {
		r.finishRelayItem(items, id)
	}

	return true, ""
}