pkg/queue/message.go (113 lines of code) (raw):
package queue
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/Azure/moby-packaging/pkg/archive"
)
const (
defaultAccountName = "moby"
defaultQueueName = "moby-packaging-signing-and-publishing"
)
var (
twoMinutesInSeconds int32 = 60 * 2
)
type Message struct {
Artifact ArtifactInfo `json:"artifact"`
Spec archive.Spec `json:"spec"`
}
type ArtifactInfo struct {
Name string `json:"name"`
URI string `json:"uri"`
Sha256Sum string `json:"sha256sum"`
}
type Messages struct {
Messages []*azqueue.DequeuedMessage
}
type Client struct {
c *azqueue.QueueClient
}
func (c *Client) GetAllMessages(ctx context.Context) (*Messages, error) {
var (
allMessages = []*azqueue.DequeuedMessage{}
max int32 = 32 // maximum number of messages for request
failures int = 0
totalFailures int = 0
errs error
allErrs error
dqOpts = azqueue.DequeueMessagesOptions{
NumberOfMessages: &max,
VisibilityTimeout: &twoMinutesInSeconds,
}
)
// Temporarily dequeue all the messages to ensure we don't enqueue a duplicate
for m, err := c.c.DequeueMessages(ctx, &dqOpts); len(m.Messages) != 0; m, err = c.c.DequeueMessages(ctx, &dqOpts) {
if err != nil {
errs = errors.Join(errs, err)
allErrs = errors.Join(allErrs, err)
totalFailures++
failures++
if failures > 4 || totalFailures > 10 {
fmt.Fprintf(os.Stderr, "##vso[task.logissue type=error;]failed to examine messages: %s\n", errs)
break
}
continue
}
allMessages = append(allMessages, m.Messages...)
errs = nil
failures = 0
}
return &Messages{Messages: allMessages}, allErrs
}
// used by trigger
func (m *Messages) ContainsBuild(spec archive.Spec) (bool, error) {
failures := 0
for _, rawMessage := range m.Messages {
if failures > 4 {
return false, fmt.Errorf("too many failures inspecting builds")
}
messageID := "unknown"
if rawMessage.MessageID != nil {
messageID = *rawMessage.MessageID
}
if rawMessage.MessageText == nil {
failures++
fmt.Fprintf(os.Stderr, "##vso[task.logissue type=error;]nil message with ID: %s\n", messageID)
continue
}
b, err := base64.StdEncoding.DecodeString(*rawMessage.MessageText)
if err != nil {
failures++
fmt.Fprintf(os.Stderr, "##vso[task.logissue type=error;]error decoding base64 string for message with ID: %s\n", messageID)
continue
}
var m Message
if err := json.Unmarshal(b, &m); err != nil {
failures++
fmt.Fprintf(os.Stderr, "##vso[task.logissue type=error;]error unmarshaling message with ID: %s\n", messageID)
continue
}
if m.Spec == spec {
return true, nil
}
}
return false, nil
}
func NewDefaultSignQueueClient() (*Client, error) {
return NewClient(defaultAccountName, defaultQueueName)
}
func NewClient(accountName, queueName string) (*Client, error) {
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, err
}
serviceURL := fmt.Sprintf("https://%s.queue.core.windows.net", accountName)
sClient, err := azqueue.NewServiceClient(serviceURL, credential, nil)
if err != nil {
return nil, err
}
return &Client{c: sClient.NewQueueClient(queueName)}, nil
}