func New[K, V Bytes]()

in go/services/statestore/client.go [87:158]


func New[K, V Bytes](
	app *protocol.Application,
	client MqttClient,
	opt ...ClientOption,
) (c *Client[K, V], err error) {
	ctx := context.Background()

	var opts ClientOptions
	opts.Apply(opt)

	c = &Client[K, V]{
		client:    client,
		notify:    map[string]map[chan Notify[K, V]]chan struct{}{},
		keynotify: map[string]uint{},
		log:       log.Wrap(opts.Logger),
	}

	defer func() {
		if err != nil {
			c.log.Warn(ctx, err)
			c.listeners.Close()
			c = nil
		}
	}()

	c.manualAck = opts.ManualAck

	tokens := protocol.WithTopicTokens{
		"clientId": strings.ToUpper(hex.EncodeToString([]byte(client.ID()))),
	}

	c.invoker, err = protocol.NewCommandInvoker(
		app,
		client,
		protocol.Raw{},
		protocol.Raw{},
		"statestore/v1/FA9AE35F-2F64-47CD-9BFF-08E2B32A0FE8/command/invoke",
		opts.invoker(),
		protocol.WithResponseTopicPrefix("clients/{clientId}"),
		protocol.WithResponseTopicSuffix("response"),
		tokens,
	)
	if err != nil {
		return c, err
	}
	c.listeners = append(c.listeners, c.invoker)

	c.receiver, err = protocol.NewTelemetryReceiver(
		app,
		client,
		protocol.Raw{},
		"clients/statestore/v1/FA9AE35F-2F64-47CD-9BFF-08E2B32A0FE8/{clientId}/command/notify/{keyName}",
		c.notifyReceive,
		opts.receiver(),
		tokens,
	)
	if err != nil {
		return c, err
	}
	c.listeners = append(c.listeners, c.invoker)

	ctx, cancel := context.WithCancel(ctx)
	done := client.RegisterConnectEventHandler(func(*mqtt.ConnectEvent) {
		c.reconnect(ctx)
	})
	c.done = func() {
		done()
		cancel()
	}

	return c, nil
}