in inbound.go [44:133]
// handleCallReq handles an incoming call request frame. It parses the initial
// fragment, registers a message exchange under the frame's ID, builds the
// InboundCall and its InboundCallResponse, and starts dispatchInbound on a new
// goroutine.
//
// It returns false only when the call has been handed to dispatchInbound
// (which receives the frame); every early exit returns true.
// NOTE(review): presumably the return value tells the caller whether it may
// release the frame — confirm against the caller.
//
// It panics if the connection is in a state not listed in the switch below.
func (c *Connection) handleCallReq(frame *Frame) bool {
	// Taken before any other work; stored later as response.calledAt.
	now := c.timeNow()

	switch state := c.readState(); state {
	case connectionActive:
		// Only an active connection accepts new inbound calls.
	case connectionStartClose, connectionInboundClosed, connectionClosed:
		c.SendSystemError(frame.Header.ID, callReqSpan(frame), ErrChannelClosed)
		return true
	default:
		panic(fmt.Errorf("unknown connection state for call req: %v", state))
	}

	callReq := new(callReq)
	callReq.id = frame.Header.ID
	initialFragment, err := parseInboundFragment(c.opts.FramePool, frame, callReq)
	if err != nil {
		// TODO(mmihic): Probably want to treat this as a protocol error
		c.log.WithFields(
			LogField{"header", frame.Header},
			ErrField(err),
		).Error("Couldn't decode initial fragment.")
		return true
	}

	call := new(InboundCall)
	call.conn = c
	ctx, cancel := newIncomingContext(c.baseContext, call, callReq.TimeToLive)

	mex, err := c.inbound.newExchange(ctx, cancel, c.opts.FramePool, callReq.messageType(), frame.Header.ID, mexChannelBufferSize)
	if err != nil {
		// No exchange was returned, so release the context here instead of
		// leaving it alive until the request's TTL expires.
		// NOTE(review): assumes cancel is non-nil and safe to call more than
		// once (as context.CancelFunc is), in case newExchange already
		// invoked it — confirm.
		cancel()

		// Report a duplicate ID using the inbound-specific error; any other
		// registration failure is reported as-is.
		if err == errDuplicateMex {
			err = errInboundRequestAlreadyActive
		}
		c.log.WithFields(
			LogField{"header", frame.Header},
			ErrField(err),
		).Error("Couldn't register exchange.")
		c.protocolError(frame.Header.ID, err)
		return true
	}

	// Close may have been called between the time we checked the state and us creating the exchange.
	if c.readState() != connectionActive {
		mex.shutdown()
		return true
	}

	response := new(InboundCallResponse)
	response.call = call
	response.calledAt = now
	response.timeNow = c.timeNow
	response.span = c.extractInboundSpan(callReq)
	if response.span != nil {
		// Attach the extracted span to the exchange's context.
		mex.ctx = opentracing.ContextWithSpan(mex.ctx, response.span)
	}
	response.mex = mex
	response.conn = c
	response.cancel = cancel
	response.log = c.log.WithFields(LogField{"In-Response", callReq.ID()})
	// The response writer is created with the checksum type of the request's
	// initial fragment.
	response.contents = newFragmentingWriter(response.log, response, initialFragment.checksumType.New())
	response.headers = transportHeaders{}
	response.messageForFragment = func(initial bool) message {
		if initial {
			// The first response fragment carries the transport headers and
			// the response code; later fragments are plain continuations.
			callRes := new(callRes)
			callRes.Headers = response.headers
			callRes.ResponseCode = responseOK
			if response.applicationError {
				callRes.ResponseCode = responseApplicationError
			}
			return callRes
		}
		return new(callResContinue)
	}

	call.mex = mex
	call.initialFragment = initialFragment
	call.serviceName = string(callReq.Service)
	call.headers = callReq.Headers
	call.response = response
	call.log = c.log.WithFields(LogField{"In-Call", callReq.ID()})
	// The initial request fragment is already parsed above, so every message
	// this factory produces is a continuation.
	call.messageForFragment = func(initial bool) message { return new(callReqContinue) }
	call.contents = newFragmentingReader(call.log, call)
	call.statsReporter = c.statsReporter
	call.createStatsTags(c.commonStatsTags)

	response.statsReporter = c.statsReporter
	response.commonStatsTags = call.commonStatsTags

	setResponseHeaders(call.headers, response.headers)
	go c.dispatchInbound(c.connID, callReq.ID(), call, frame)
	return false
}