func()

in inbound.go [44:133]


// handleCallReq processes an incoming call request frame. It checks the
// connection state, parses the initial fragment, registers a message exchange
// for the frame's ID, wires up the InboundCall / InboundCallResponse pair and
// finally hands the call to dispatchInbound on a new goroutine.
//
// It returns false only when the frame has been handed off to the dispatch
// goroutine; every early-exit path returns true.
// NOTE(review): the result presumably tells the caller whether it may release
// the frame — confirm against the caller.
func (c *Connection) handleCallReq(frame *Frame) bool {
	// Captured up front so the response records when the request arrived,
	// not when setup finished.
	now := c.timeNow()
	switch state := c.readState(); state {
	case connectionActive:
		// Only an active connection accepts new inbound calls.
	case connectionStartClose, connectionInboundClosed, connectionClosed:
		c.SendSystemError(frame.Header.ID, callReqSpan(frame), ErrChannelClosed)
		return true
	default:
		panic(fmt.Errorf("unknown connection state for call req: %v", state))
	}

	callReq := new(callReq)
	callReq.id = frame.Header.ID
	initialFragment, err := parseInboundFragment(c.opts.FramePool, frame, callReq)
	if err != nil {
		// TODO(mmihic): Probably want to treat this as a protocol error
		c.log.WithFields(
			LogField{"header", frame.Header},
			ErrField(err),
		).Error("Couldn't decode initial fragment.")
		return true
	}

	call := new(InboundCall)
	call.conn = c
	ctx, cancel := newIncomingContext(c.baseContext, call, callReq.TimeToLive)

	mex, err := c.inbound.newExchange(ctx, cancel, c.opts.FramePool, callReq.messageType(), frame.Header.ID, mexChannelBufferSize)
	if err != nil {
		if err == errDuplicateMex {
			err = errInboundRequestAlreadyActive
		}
		// No exchange was returned to take charge of the context created
		// above, so release it here. This assumes cancel is a
		// context.CancelFunc (safe to call more than once) — TODO confirm.
		cancel()
		c.log.WithFields(
			LogField{"header", frame.Header},
			ErrField(err),
		).Error("Couldn't register exchange.")
		// Report the actual failure; only a duplicate exchange is surfaced
		// as errInboundRequestAlreadyActive.
		c.protocolError(frame.Header.ID, err)
		return true
	}

	// Close may have been called between the time we checked the state and us creating the exchange.
	if c.readState() != connectionActive {
		mex.shutdown()
		return true
	}

	response := new(InboundCallResponse)
	response.call = call
	response.calledAt = now
	response.timeNow = c.timeNow
	response.span = c.extractInboundSpan(callReq)
	if response.span != nil {
		// Make the extracted span available through the exchange's context.
		mex.ctx = opentracing.ContextWithSpan(mex.ctx, response.span)
	}
	response.mex = mex
	response.conn = c
	response.cancel = cancel
	response.log = c.log.WithFields(LogField{"In-Response", callReq.ID()})
	response.contents = newFragmentingWriter(response.log, response, initialFragment.checksumType.New())
	response.headers = transportHeaders{}
	// The first response fragment is a callRes carrying the headers and the
	// response code; every later fragment is a callResContinue.
	response.messageForFragment = func(initial bool) message {
		if initial {
			callRes := new(callRes)
			callRes.Headers = response.headers
			callRes.ResponseCode = responseOK
			if response.applicationError {
				callRes.ResponseCode = responseApplicationError
			}
			return callRes
		}

		return new(callResContinue)
	}

	call.mex = mex
	call.initialFragment = initialFragment
	call.serviceName = string(callReq.Service)
	call.headers = callReq.Headers
	call.response = response
	call.log = c.log.WithFields(LogField{"In-Call", callReq.ID()})
	// The initial request fragment has already been parsed above, so every
	// fragment message created for the call is a callReqContinue.
	call.messageForFragment = func(initial bool) message { return new(callReqContinue) }
	call.contents = newFragmentingReader(call.log, call)
	call.statsReporter = c.statsReporter
	call.createStatsTags(c.commonStatsTags)

	response.statsReporter = c.statsReporter
	response.commonStatsTags = call.commonStatsTags

	setResponseHeaders(call.headers, response.headers)
	// The frame is passed to the dispatch goroutine, which is why this path
	// returns false rather than true.
	go c.dispatchInbound(c.connID, callReq.ID(), call, frame)
	return false
}