in go/services/statestore/notify.go [67:105]
func (c *Client[K, V]) notifyReceive(
ctx context.Context,
msg *protocol.TelemetryMessage[[]byte],
) error {
hexKey, ok := msg.TopicTokens["keyName"]
if !ok {
return resp.PayloadError("missing key name")
}
bytKey, err := hex.DecodeString(hexKey)
if err != nil {
return resp.PayloadError("invalid key name %q", hexKey)
}
data, err := resp.BlobArray[[]byte](msg.Payload)
if err != nil {
return err
}
opOnly := len(data) == 2
hasValue := len(data) == 4
if (!opOnly && !hasValue) ||
(string(data[0]) != "NOTIFY") ||
(hasValue && string(data[2]) != "VALUE") {
return resp.PayloadError("invalid payload %q", string(msg.Payload))
}
key := K(bytKey)
op := string(data[1])
var val V
if hasValue {
val = V(data[3])
}
c.notifySend(ctx, &Notify[K, V]{key, op, val, msg.Timestamp, msg.Ack})
return nil
}