go/services/schemaregistry/client.go (91 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package schemaregistry
import (
"context"
"encoding/json"
"log/slog"
"github.com/Azure/iot-operations-sdks/go/internal/options"
"github.com/Azure/iot-operations-sdks/go/protocol"
"github.com/Azure/iot-operations-sdks/go/protocol/errors"
"github.com/Azure/iot-operations-sdks/go/services/schemaregistry/schemaregistry"
)
type (
// Client represents a client of the schema registry. It wraps a
// schemaregistry.SchemaRegistryClient and is constructed with New.
Client struct {
// client is the underlying schema registry client that Start and Close
// delegate to.
client *schemaregistry.SchemaRegistryClient
// invID holds the ID of the MQTT client that was passed to New.
// TODO: Remove when no longer necessary for compat.
invID string
}
// ClientOption represents a single option for the client. Its only method
// is unexported, so it can only be implemented by types in this package.
ClientOption interface{ client(*ClientOptions) }
// ClientOptions are the resolved options for the client.
ClientOptions struct {
// Logger is forwarded to the command invoker options built for the
// underlying client.
Logger *slog.Logger
}
// Error represents an error returned by the schema registry.
Error struct {
// Message is the text returned by the Error method.
Message string
// PropertyName and PropertyValue are copied from the remote error kind
// when a protocol error is translated into an Error.
PropertyName string
PropertyValue any
}
)
// New creates a new schema registry client.
//
// The supplied options are resolved into a ClientOptions value, which is then
// used to build the invoker options for the underlying
// schemaregistry.SchemaRegistryClient. Any error from constructing that
// underlying client is returned unchanged together with a nil *Client.
func New(
	app *protocol.Application,
	client protocol.MqttClient,
	opt ...ClientOption,
) (*Client, error) {
	var resolved ClientOptions
	resolved.Apply(opt)

	inner, err := schemaregistry.NewSchemaRegistryClient(
		app,
		client,
		resolved.invoker(),
	)
	if err != nil {
		return nil, err
	}

	// The MQTT client ID is kept alongside the wrapped client (see the
	// compat TODO on Client.invID).
	return &Client{
		client: inner,
		invID:  client.ID(),
	}, nil
}
// Start listening to all underlying MQTT topics. It delegates to the wrapped
// schema registry client and returns whatever error that client reports.
func (c *Client) Start(ctx context.Context) error {
return c.client.Start(ctx)
}
// Close all underlying MQTT topics and free resources. It delegates to the
// wrapped schema registry client and does not report an error.
func (c *Client) Close() {
c.client.Close()
}
// Error returns the error message. Together with the pointer receiver, this
// makes *Error satisfy the built-in error interface.
func (e *Error) Error() string {
return e.Message
}
// translateError maps errors from the protocol layer onto the errors this
// package exposes:
//
//   - a *errors.Remote whose kind is an errors.UnknownError with a non-empty
//     property name becomes an *Error carrying that property name and value;
//   - a *errors.Client whose kind is errors.PayloadInvalid and whose nested
//     error is a *json.SyntaxError at offset 0 becomes nil;
//   - anything else (including nil) is returned unchanged.
//
// Only the dynamic type of err itself is inspected; wrapped errors are not
// unwrapped.
//
//nolint:staticcheck // schemaregistry compat.
func translateError(err error) error {
	if remote, isRemote := err.(*errors.Remote); isRemote {
		unknown, isUnknown := remote.Kind.(errors.UnknownError)
		if !isUnknown || unknown.PropertyName == "" {
			return err
		}
		return &Error{
			Message:       err.Error(),
			PropertyName:  unknown.PropertyName,
			PropertyValue: unknown.PropertyValue,
		}
	}

	local, isClient := err.(*errors.Client)
	if !isClient {
		return err
	}
	if _, invalid := local.Kind.(errors.PayloadInvalid); !invalid {
		return err
	}

	syntaxErr, isSyntax := local.Nested.(*json.SyntaxError)
	if isSyntax && syntaxErr.Offset == 0 {
		// We're already returning a nil schema (because of the error),
		// so just treat the 404 case as not an error.
		return nil
	}
	return err
}
// Apply resolves the provided list of options by letting each option yielded
// by options.Apply modify the receiver in turn. The slice opts and the
// variadic rest are both handed to options.Apply.
func (o *ClientOptions) Apply(
	opts []ClientOption,
	rest ...ClientOption,
) {
	resolved := options.Apply[ClientOption](opts, rest...)
	for option := range resolved {
		option.client(o)
	}
}
// client lets a *ClientOptions be used as a ClientOption: it overwrites every
// field of opt with the receiver's values. A nil receiver leaves opt untouched.
func (o *ClientOptions) client(opt *ClientOptions) {
	if o == nil {
		return
	}
	*opt = *o
}
// client applies the logger option by copying its Logger into opt, replacing
// any logger that was set there before.
func (o withLogger) client(opt *ClientOptions) {
opt.Logger = o.Logger
}
// invoker builds the command invoker options for the underlying client from
// the resolved client options. Only the Logger is carried over.
func (o *ClientOptions) invoker() *protocol.CommandInvokerOptions {
return &protocol.CommandInvokerOptions{
Logger: o.Logger,
}
}