in outbound.go [38:138]
// beginCall sets up a new outbound call to methodName on serviceName over
// this connection and writes the method name as the first piece of the
// request.
//
// It returns:
//   - ErrConnectionClosed if the connection is closing or closed, either on
//     entry or by the time the message exchange has been created;
//   - errConnectionUnknownState if the connection state is not recognised;
//   - ErrTimeoutRequired if ctx carries no deadline;
//   - ErrTimeout if less than one millisecond remains before the deadline;
//   - the result of GetContextError if ctx is already done;
//   - any error from creating the exchange or from writing the method name.
//
// On success the returned OutboundCall has its response object attached.
func (c *Connection) beginCall(ctx context.Context, serviceName, methodName string, callOptions *CallOptions) (*OutboundCall, error) {
	startedAt := c.timeNow()

	// Calls may only be started while the connection is active.
	if connState := c.readState(); connState != connectionActive {
		switch connState {
		case connectionStartClose, connectionInboundClosed, connectionClosed:
			return nil, ErrConnectionClosed
		}
		return nil, errConnectionUnknownState{"beginCall", connState}
	}

	callDeadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		// validateCall is expected to have rejected deadline-less contexts
		// already, so this is a defensive check only.
		return nil, ErrTimeoutRequired
	}

	// A time-to-live under one millisecond would go out on the wire as 0,
	// so report the timeout right away instead of sending the call.
	ttl := callDeadline.Sub(startedAt)
	if ttl < time.Millisecond {
		return nil, ErrTimeout
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, GetContextError(ctxErr)
	}

	msgID := c.NextMessageID()
	exch, exchErr := c.outbound.newExchange(ctx, c.outboundCtxCancel, c.opts.FramePool, messageTypeCallReq, msgID, mexChannelBufferSize)
	if exchErr != nil {
		return nil, exchErr
	}

	// The connection may have started closing after the first state check
	// but before the exchange existed; re-check and release the exchange.
	if c.readState() != connectionActive {
		exch.shutdown()
		return nil, ErrConnectionClosed
	}

	// Note: the number of transport headers is not validated because the
	// library does not permit arbitrary headers to be added. Make sure that
	// fewer than 256 headers are ever added here.
	txHeaders := transportHeaders{
		CallerName: c.localPeerInfo.ServiceName,
	}
	callOptions.setHeaders(txHeaders)
	if ctxOpts := currentCallOptions(ctx); ctxOpts != nil {
		ctxOpts.overrideHeaders(txHeaders)
	}

	// Request side of the call.
	outCall := &OutboundCall{}
	outCall.mex = exch
	outCall.conn = c
	outCall.callReq = callReq{
		id:         msgID,
		Headers:    txHeaders,
		Service:    serviceName,
		TimeToLive: ttl,
	}
	outCall.statsReporter = c.statsReporter
	outCall.createStatsTags(c.commonStatsTags, callOptions, methodName)
	outCall.log = c.log.WithFields(LogField{"Out-Call", msgID})

	// TODO(mmihic): It'd be nice to do this without an fptr
	// The initial fragment carries the call request itself; every later
	// fragment uses a fresh continuation message.
	outCall.messageForFragment = func(initial bool) message {
		if !initial {
			return new(callReqContinue)
		}
		return &outCall.callReq
	}
	outCall.contents = newFragmentingWriter(outCall.log, outCall, c.opts.ChecksumType.New())

	// Response side of the call, sharing the same exchange.
	outRes := &OutboundCallResponse{}
	outRes.startedAt = startedAt
	outRes.timeNow = c.timeNow
	outRes.requestState = callOptions.RequestState
	outRes.mex = exch
	outRes.log = c.log.WithFields(LogField{"Out-Response", msgID})
	outRes.span = c.startOutboundSpan(ctx, serviceName, methodName, outCall, startedAt)
	outRes.messageForFragment = func(initial bool) message {
		if !initial {
			return new(callResContinue)
		}
		return &outRes.callRes
	}
	outRes.contents = newFragmentingReader(outRes.log, outRes)
	outRes.statsReporter = outCall.statsReporter
	outRes.commonStatsTags = outCall.commonStatsTags

	outCall.response = outRes

	if writeErr := outCall.writeMethod([]byte(methodName)); writeErr != nil {
		return nil, writeErr
	}
	return outCall, nil
}