servicebus/Servicebus.go (126 lines of code) (raw):

package servicebus import ( "context" "errors" "github.com/Azure/aks-middleware/grpc/server/ctxlogger" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) type ServiceBus struct { // In the future, we can add more types of service bus here. Client *azservicebus.Client } type ServiceBusReceiver struct { Receiver *azservicebus.Receiver } type ServiceBusSender struct { Sender *azservicebus.Sender } func CreateServiceBusClient(ctx context.Context, clientUrl string, credential azcore.TokenCredential, options *azservicebus.ClientOptions) (ServiceBusClientInterface, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Creating Service Bus!") if credential == nil { var err error credential, err = azidentity.NewDefaultAzureCredential(nil) if err != nil { logger.Error("Error getting token credential") return nil, err } } client, err := azservicebus.NewClient(clientUrl, credential, options) if err != nil { logger.Error("Error getting client.") return nil, err } servicebus := &ServiceBus{ Client: client, } return servicebus, nil } func CreateServiceBusClientFromConnectionString(ctx context.Context, connectionString string, options *azservicebus.ClientOptions) (ServiceBusClientInterface, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Creating Service Bus from Connection String!") client, err := azservicebus.NewClientFromConnectionString(connectionString, options) if err != nil { logger.Error("Error getting client.") return nil, err } servicebus := &ServiceBus{ Client: client, } return servicebus, nil } func (sb *ServiceBus) NewServiceBusReceiver(ctx context.Context, topicOrQueue string, options *azservicebus.ReceiverOptions) (ReceiverInterface, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Creating new service bus receiver.") receiver, err := sb.Client.NewReceiverForQueue(topicOrQueue, options) if err != nil { logger.Error("Error getting receiver.") return nil, err } serviceBusReceiver := &ServiceBusReceiver{ Receiver: receiver, } return serviceBusReceiver, nil } func (sb *ServiceBus) NewServiceBusSender(ctx context.Context, queue string, options *azservicebus.NewSenderOptions) (SenderInterface, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Creating new service bus sender.") sender, err := sb.Client.NewSender(queue, options) if err != nil { logger.Error("Error getting the sender") return nil, err } serviceBusSender := &ServiceBusSender{ Sender: sender, } return serviceBusSender, nil } func (s *ServiceBusSender) SendMessage(ctx context.Context, message []byte) error { logger := ctxlogger.GetLogger(ctx) logger.Info("Sending message through service bus sender.") packagedMessage := &azservicebus.Message{ Body: message, } err := s.Sender.SendMessage(ctx, packagedMessage, nil) if err != nil { logger.Error("Error Sending message") return err } logger.Info("Message sent successfully!") return nil } func (s *ServiceBusSender) GetAzureSender() (*azservicebus.Sender, error) { if s.Sender != nil { return s.Sender, nil } else { return nil, errors.New("No Sender was found.") } } func (s *ServiceBusReceiver) GetAzureReceiver() (*azservicebus.Receiver, error) { if s.Receiver != nil { return s.Receiver, nil } else { return nil, errors.New("No Receiver was found.") } } func (r *ServiceBusReceiver) ReceiveMessage(ctx context.Context) ([]byte, error) { logger := ctxlogger.GetLogger(ctx) logger.Info("Receiving message") messages, err := r.Receiver.ReceiveMessages(ctx, 1, nil) if err != nil { logger.Info("Error receiving message!") return nil, err } var body []byte for _, message := range messages { body = message.Body logger.Info("%s\n" + string(body)) err = r.Receiver.CompleteMessage(ctx, message, nil) if err != nil { logger.Info("Error completing message!") return nil, err } } return body, nil }