func()

in outbound.go [38:138]


// beginCall starts a new outbound call to methodName on serviceName over this
// connection and returns the OutboundCall used to send the request.
//
// The call is rejected up front when the connection is not active
// (ErrConnectionClosed, or errConnectionUnknownState for an unrecognised
// state), when ctx has no deadline (ErrTimeoutRequired), when less than a
// millisecond remains until that deadline (ErrTimeout), or when ctx already
// reports an error. Errors from creating the message exchange or from writing
// the method name are returned unchanged.
func (c *Connection) beginCall(ctx context.Context, serviceName, methodName string, callOptions *CallOptions) (*OutboundCall, error) {
	startedAt := c.timeNow()

	// Only an active connection may start new calls.
	if state := c.readState(); state != connectionActive {
		switch state {
		case connectionStartClose, connectionInboundClosed, connectionClosed:
			return nil, ErrConnectionClosed
		}
		return nil, errConnectionUnknownState{"beginCall", state}
	}

	deadline, hasDeadline := ctx.Deadline()
	if !hasDeadline {
		// validateCall is expected to have rejected deadline-less contexts
		// already, so this branch should never be taken.
		return nil, ErrTimeoutRequired
	}

	// A time-to-live below one millisecond would be encoded as 0 on the wire,
	// so report the timeout right away instead of sending the call.
	ttl := deadline.Sub(startedAt)
	if ttl < time.Millisecond {
		return nil, ErrTimeout
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, GetContextError(ctxErr)
	}

	msgID := c.NextMessageID()
	exchange, err := c.outbound.newExchange(ctx, c.outboundCtxCancel, c.opts.FramePool, messageTypeCallReq, msgID, mexChannelBufferSize)
	if err != nil {
		return nil, err
	}

	// Close may have been called after the first state check but before the
	// exchange existed; re-check and release the exchange if so.
	if c.readState() != connectionActive {
		exchange.shutdown()
		return nil, ErrConnectionClosed
	}

	// Note: the number of transport headers is not verified because the
	// library does not allow adding arbitrary headers. Make sure this code
	// never adds >= 256 headers.
	hdrs := transportHeaders{
		CallerName: c.localPeerInfo.ServiceName,
	}
	callOptions.setHeaders(hdrs)
	if ctxOpts := currentCallOptions(ctx); ctxOpts != nil {
		ctxOpts.overrideHeaders(hdrs)
	}

	// Request side of the call.
	out := &OutboundCall{}
	out.mex = exchange
	out.conn = c
	out.callReq = callReq{
		id:         msgID,
		Headers:    hdrs,
		Service:    serviceName,
		TimeToLive: ttl,
	}
	out.statsReporter = c.statsReporter
	out.createStatsTags(c.commonStatsTags, callOptions, methodName)
	out.log = c.log.WithFields(LogField{"Out-Call", msgID})

	// TODO(mmihic): It'd be nice to do this without an fptr
	out.messageForFragment = func(initial bool) message {
		if !initial {
			return new(callReqContinue)
		}

		return &out.callReq
	}

	out.contents = newFragmentingWriter(out.log, out, c.opts.ChecksumType.New())

	// Response side of the call; it shares the exchange with the request.
	res := &OutboundCallResponse{}
	res.startedAt = startedAt
	res.timeNow = c.timeNow
	res.requestState = callOptions.RequestState
	res.mex = exchange
	res.log = c.log.WithFields(LogField{"Out-Response", msgID})
	res.span = c.startOutboundSpan(ctx, serviceName, methodName, out, startedAt)
	res.messageForFragment = func(initial bool) message {
		if !initial {
			return new(callResContinue)
		}

		return &res.callRes
	}
	res.contents = newFragmentingReader(res.log, res)
	res.statsReporter = out.statsReporter
	res.commonStatsTags = out.commonStatsTags

	out.response = res

	// The method name is written before the call is handed to the caller.
	if writeErr := out.writeMethod([]byte(methodName)); writeErr != nil {
		return nil, writeErr
	}
	return out, nil
}