go/services/statestore/notify.go (82 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package statestore
import (
"context"
"encoding/hex"
"sync"
"github.com/Azure/iot-operations-sdks/go/protocol"
"github.com/Azure/iot-operations-sdks/go/protocol/hlc"
"github.com/Azure/iot-operations-sdks/go/services/statestore/internal/resp"
)
// Notify represents a notification event.
type Notify[K, V Bytes] struct {
Key K
Operation string
Value V
Version hlc.HybridLogicalClock
// Ack provides a function to manually ack if enabled; it will be nil
// otherwise.
Ack func()
}
// Notify requests a notification channel for a key. It returns the channel and
// a function to remove and close that channel. Note that KeyNotify must be
// called to actually perform the notification request (though notifications may
// be received on this channel if KeyNotify had already been called previously).
// Also please note that the state store does not queue messages when the client
// is disconnected, therefore notifications received on this channel are not
// guaranteed, may be duplicated, and may come out-of-order during reconnection.
func (c *Client[K, V]) Notify(key K) (<-chan Notify[K, V], func()) {
k := string(key)
// Give the channel a buffer of 1 so we can iterate through them quickly.
ch := make(chan Notify[K, V], 1)
done := make(chan struct{})
c.notifyMu.Lock()
defer c.notifyMu.Unlock()
kn, ok := c.notify[k]
if !ok {
kn = map[chan Notify[K, V]]chan struct{}{}
c.notify[k] = kn
}
kn[ch] = done
return ch, sync.OnceFunc(func() {
close(done)
c.notifyMu.Lock()
defer c.notifyMu.Unlock()
close(ch)
delete(kn, ch)
if len(kn) == 0 {
delete(c.notify, k)
}
})
}
// Receive a NOTIFY message.
func (c *Client[K, V]) notifyReceive(
ctx context.Context,
msg *protocol.TelemetryMessage[[]byte],
) error {
hexKey, ok := msg.TopicTokens["keyName"]
if !ok {
return resp.PayloadError("missing key name")
}
bytKey, err := hex.DecodeString(hexKey)
if err != nil {
return resp.PayloadError("invalid key name %q", hexKey)
}
data, err := resp.BlobArray[[]byte](msg.Payload)
if err != nil {
return err
}
opOnly := len(data) == 2
hasValue := len(data) == 4
if (!opOnly && !hasValue) ||
(string(data[0]) != "NOTIFY") ||
(hasValue && string(data[2]) != "VALUE") {
return resp.PayloadError("invalid payload %q", string(msg.Payload))
}
key := K(bytKey)
op := string(data[1])
var val V
if hasValue {
val = V(data[3])
}
c.notifySend(ctx, &Notify[K, V]{key, op, val, msg.Timestamp, msg.Ack})
return nil
}
func (c *Client[K, V]) notifySend(ctx context.Context, notify *Notify[K, V]) {
// TODO: Lock less globally if possible, but keep it simple for now.
c.notifyMu.RLock()
defer c.notifyMu.RUnlock()
for ch, done := range c.notify[string(notify.Key)] {
select {
case ch <- *notify:
case <-done:
case <-ctx.Done():
}
}
}