func()

in go/services/statestore/notify.go [67:105]


func (c *Client[K, V]) notifyReceive(
	ctx context.Context,
	msg *protocol.TelemetryMessage[[]byte],
) error {
	hexKey, ok := msg.TopicTokens["keyName"]
	if !ok {
		return resp.PayloadError("missing key name")
	}

	bytKey, err := hex.DecodeString(hexKey)
	if err != nil {
		return resp.PayloadError("invalid key name %q", hexKey)
	}

	data, err := resp.BlobArray[[]byte](msg.Payload)
	if err != nil {
		return err
	}

	opOnly := len(data) == 2
	hasValue := len(data) == 4

	if (!opOnly && !hasValue) ||
		(string(data[0]) != "NOTIFY") ||
		(hasValue && string(data[2]) != "VALUE") {
		return resp.PayloadError("invalid payload %q", string(msg.Payload))
	}

	key := K(bytKey)
	op := string(data[1])
	var val V
	if hasValue {
		val = V(data[3])
	}

	c.notifySend(ctx, &Notify[K, V]{key, op, val, msg.Timestamp, msg.Ack})

	return nil
}