func()

in runtime/tchannel_client.go [176:292]


// call sends one outbound TChannel Thrift request described by call, running
// it through c.ch.RunWithRetry, and decodes the reply into resp.
//
// reqHeaders is written to when ctx carries a request UUID: the UUID is stored
// under c.requestUUIDHeaderKey (a map is allocated if reqHeaders is nil).
//
// It returns call.success, the response headers and an error:
//   - on success, or when the error is a tchannel.SystemError, the headers are
//     call.resHeaders and the error is returned unwrapped;
//   - for any other error the headers are nil and the error is wrapped with
//     the client and method names.
func (c *TChannelClient) call(
	ctx context.Context,
	call *tchannelOutboundCall,
	reqHeaders map[string]string,
	req, resp RWTStruct,
) (success bool, resHeaders map[string]string, err error) {
	// Runs after the deferred cancel below (defers are LIFO). The closure reads
	// the ctx and err variables at exit, so it sees the ctx reassigned by
	// ctxBuilder.Build() and the final value of the named result err.
	defer func() {
		call.finish(ctx, err)
		if call.resHeaders == nil {
			call.resHeaders = map[string]string{}
		}
		// call.resHeaders is the same map handed back on the header-returning
		// paths, so this entry is visible to the caller there.
		call.resHeaders[ClientResponseDurationKey] = call.duration.String()
	}()

	endpointOpts := GetTimeoutAndRetryOptions(ctx)

	call.start()

	if uuid := RequestUUIDFromCtx(ctx); uuid != "" {
		if reqHeaders == nil {
			reqHeaders = map[string]string{}
		}
		reqHeaders[c.requestUUIDHeaderKey] = uuid
	}

	// Client-level defaults. MaxAttempts is forwarded as configured; existing
	// clients pass 0, which leaves the retry layer on its default of 5 attempts.
	// More details can be found at https://t3.uberinternal.com/browse/EDGE-8526
	overallTimeout := c.timeout
	retryOpts := tchannel.RetryOptions{
		TimeoutPerAttempt: c.timeoutPerAttempt,
		MaxAttempts:       c.maxAttempts,
	}

	// Endpoint-level configuration replaces the client-level values. A
	// MaxAttempts of 0 is treated as "no endpoint-level config provided".
	if endpointOpts != nil && endpointOpts.MaxAttempts != 0 {
		overallTimeout = endpointOpts.OverallTimeoutInMs
		retryOpts.TimeoutPerAttempt = endpointOpts.RequestTimeoutPerAttemptInMs
		retryOpts.MaxAttempts = endpointOpts.MaxAttempts
	}

	ctxBuilder := tchannel.NewContextBuilder(overallTimeout).
		SetParentContext(ctx).
		SetRetryOptions(&retryOpts)
	if c.routingKey != nil {
		ctxBuilder.SetRoutingKey(*c.routingKey)
	}
	if delegate := GetRoutingDelegateFromCtx(ctx); delegate != "" {
		ctxBuilder.SetRoutingDelegate(delegate)
	}
	if shardKey := GetShardKeyFromCtx(ctx); shardKey != "" {
		ctxBuilder.SetShardKey(shardKey)
	}

	// Deliberately reassigns the ctx parameter (only cancel is new here): the
	// deferred closure above must observe the built context.
	ctx, cancel := ctxBuilder.Build()
	defer cancel()

	err = c.ch.RunWithRetry(ctx, func(attemptCtx netContext.Context, state *tchannel.RequestState) error {
		// Reset per-attempt response state so a retry starts clean.
		call.resHeaders = map[string]string{}
		call.success = false

		sc, attemptCtx := c.getDynamicChannelWithFallback(reqHeaders, c.sc, attemptCtx)

		var beginErr error
		call.call, beginErr = sc.BeginCall(attemptCtx, call.serviceMethod, &tchannel.CallOptions{
			Format:          tchannel.Thrift,
			ShardKey:        GetShardKeyFromCtx(attemptCtx),
			RequestState:    state,
			RoutingDelegate: GetRoutingDelegateFromCtx(attemptCtx),
		})
		if beginErr != nil {
			return errors.Wrapf(
				beginErr, "Could not begin outbound %s.%s (%s %s) request",
				call.client.ClientID, call.methodName, call.client.serviceName, call.serviceMethod,
			)
		}

		// trace request; the outer reqHeaders variable is updated, so later
		// attempts start from the headers returned here.
		reqHeaders = tchannel.InjectOutboundSpan(call.call.Response(), reqHeaders)

		if writeErr := call.writeReqHeaders(reqHeaders); writeErr != nil {
			return writeErr
		}
		if writeErr := call.writeReqBody(attemptCtx, req); writeErr != nil {
			return writeErr
		}

		response := call.call.Response()
		if readErr := call.readResHeaders(response); readErr != nil {
			return readErr
		}
		return call.readResBody(attemptCtx, response, resp)
	})

	switch err.(type) {
	case nil:
		return call.success, call.resHeaders, nil
	case tchannel.SystemError:
		// Do not wrap system errors.
		return call.success, call.resHeaders, err
	default:
		return call.success, nil, errors.Wrapf(
			err, "Could not make outbound %s.%s (%s %s) response",
			call.client.ClientID, call.methodName, call.client.serviceName, call.serviceMethod,
		)
	}
}