func()

in go/protocol/command_invoker.go [216:291]


// Invoke sends req as a command request and blocks until either the matching
// response is delivered or ctx is done.
//
// Options supplied through opt are applied to an InvokeOptions value; when no
// timeout is set there, DefaultTimeout is used. The resulting timeout is
// validated before anything is published. Every error returned from Invoke is
// passed through errutil.Return on the way out.
func (ci *CommandInvoker[Req, Res]) Invoke(
	ctx context.Context,
	req Req,
	opt ...InvokeOption,
) (res *CommandResponse[Res], err error) {
	var options InvokeOptions
	options.Apply(opt)

	// Remains true until just before the publish call; the deferred function
	// reads whatever value it holds at the moment Invoke returns.
	shallowErr := true
	defer func() { err = errutil.Return(err, ci.log, shallowErr) }()

	duration := options.Timeout
	if duration == 0 {
		duration = DefaultTimeout
	}

	expiry := &internal.Timeout{
		Name:     "MessageExpiry",
		Text:     commandInvokerErrStr,
		Duration: duration,
	}
	if invalid := expiry.Validate(); invalid != nil {
		return nil, invalid
	}

	correlation, err := errutil.NewUUID()
	if err != nil {
		return nil, err
	}

	request, err := ci.publisher.build(
		&Message[Req]{
			Payload:         req,
			Metadata:        options.Metadata,
			CorrelationData: correlation,
		},
		options.TopicTokens,
		expiry,
	)
	if err != nil {
		return nil, err
	}

	// Tag the request with this client's ID and tell the receiver where the
	// response should be sent.
	request.UserProperties[constants.Partition] = ci.publisher.client.ID()
	request.ResponseTopic, err = ci.responseTopic.Topic(options.TopicTokens)
	if err != nil {
		return nil, err
	}

	// The response channel is set up before the request goes out; release is
	// deferred so it runs on every exit path from here on.
	responses, release := ci.initPending(string(request.CorrelationData))
	defer release()

	shallowErr = false
	if err = ci.publisher.publish(ctx, request); err != nil {
		return nil, err
	}

	ci.log.Debug(
		ctx,
		"request sent",
		slog.String("topic", request.Topic),
		slog.Any("correlation_data", request.CorrelationData),
	)

	// Derive a context from the message expiry so that, if one was specified,
	// we give up listening once no response can arrive any more.
	ctx, stop := expiry.Context(ctx)
	defer stop()

	select {
	case <-ctx.Done():
		return nil, errutil.Context(ctx, commandInvokerErrStr)
	case reply := <-responses:
		ci.log.Debug(
			ctx,
			"response received",
			slog.String("topic", request.ResponseTopic),
			slog.Any("correlation_data", request.CorrelationData),
		)
		return reply.res, reply.err
	}
}