in go/services/statestore/client.go [87:158]
func New[K, V Bytes](
app *protocol.Application,
client MqttClient,
opt ...ClientOption,
) (c *Client[K, V], err error) {
ctx := context.Background()
var opts ClientOptions
opts.Apply(opt)
c = &Client[K, V]{
client: client,
notify: map[string]map[chan Notify[K, V]]chan struct{}{},
keynotify: map[string]uint{},
log: log.Wrap(opts.Logger),
}
defer func() {
if err != nil {
c.log.Warn(ctx, err)
c.listeners.Close()
c = nil
}
}()
c.manualAck = opts.ManualAck
tokens := protocol.WithTopicTokens{
"clientId": strings.ToUpper(hex.EncodeToString([]byte(client.ID()))),
}
c.invoker, err = protocol.NewCommandInvoker(
app,
client,
protocol.Raw{},
protocol.Raw{},
"statestore/v1/FA9AE35F-2F64-47CD-9BFF-08E2B32A0FE8/command/invoke",
opts.invoker(),
protocol.WithResponseTopicPrefix("clients/{clientId}"),
protocol.WithResponseTopicSuffix("response"),
tokens,
)
if err != nil {
return c, err
}
c.listeners = append(c.listeners, c.invoker)
c.receiver, err = protocol.NewTelemetryReceiver(
app,
client,
protocol.Raw{},
"clients/statestore/v1/FA9AE35F-2F64-47CD-9BFF-08E2B32A0FE8/{clientId}/command/notify/{keyName}",
c.notifyReceive,
opts.receiver(),
tokens,
)
if err != nil {
return c, err
}
c.listeners = append(c.listeners, c.invoker)
ctx, cancel := context.WithCancel(ctx)
done := client.RegisterConnectEventHandler(func(*mqtt.ConnectEvent) {
c.reconnect(ctx)
})
c.done = func() {
done()
cancel()
}
return c, nil
}