queue_manager.go (322 lines of code) (raw):

package ali_mns import ( "fmt" "net/http" "strconv" "strings" "github.com/gogap/errors" ) type AliQueueManager interface { CreateSimpleQueue(queueName string) (err error) CreateQueue(queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32, slices int32) (err error) CreateQueueWithOptions(queueName string, options ...QueueOption) (err error) SetQueueAttributes(queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32, slices int32) (err error) SetQueueAttributesWithOptions(queueName string, options ...QueueOption) (err error) GetQueueAttributes(queueName string) (attr QueueAttribute, err error) DeleteQueue(queueName string) (err error) ListQueue(nextMarker string, retNumber int32, prefix string) (queues Queues, err error) ListQueueDetail(nextMarker string, retNumber int32, prefix string) (queueDetails QueueDetails, err error) } type MNSQueueManager struct { cli MNSClient decoder MNSDecoder } type QueueOptions struct { delaySeconds int32 maxMessageSize int32 messageRetentionPeriod int32 visibilityTimeout int32 pollingWaitSeconds int32 loggingEnabled bool } type QueueOption func(*QueueOptions, map[string]bool) func WithDelaySeconds(delay int32) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.delaySeconds = delay tracker["delaySeconds"] = true } } func WithMaxMessageSize(size int32) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.maxMessageSize = size tracker["maxMessageSize"] = true } } func WithMessageRetentionPeriod(period int32) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.messageRetentionPeriod = period tracker["messageRetentionPeriod"] = true } } func WithVisibilityTimeout(timeout int32) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.visibilityTimeout = timeout tracker["visibilityTimeout"] = true } } func WithPollingWaitSeconds(seconds int32) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.pollingWaitSeconds = seconds tracker["pollingWaitSeconds"] = true } } func WithLoggingEnabled(enabled bool) QueueOption { return func(o *QueueOptions, tracker map[string]bool) { o.loggingEnabled = enabled tracker["loggingEnabled"] = true } } func checkQueueName(queueName string) (err error) { if len(queueName) > 256 { err = ERR_MNS_QUEUE_NAME_IS_TOO_LONG.New() return } return } func checkDelaySeconds(seconds int32) (err error) { if seconds > 60480 || seconds < 0 { err = ERR_MNS_DELAY_SECONDS_RANGE_ERROR.New() return } return } func checkMessageRetentionPeriod(retentionPeriod int32) (err error) { if retentionPeriod < 60 || retentionPeriod > 1296000 { err = ERR_MNS_MSG_RETENTION_PERIOD_RANGE_ERROR.New() return } return } func checkVisibilityTimeout(visibilityTimeout int32) (err error) { if visibilityTimeout < 1 || visibilityTimeout > 43200 { err = ERR_MNS_MSG_VISIBILITY_TIMEOUT_RANGE_ERROR.New() return } return } func checkPollingWaitSeconds(pollingWaitSeconds int32) (err error) { if pollingWaitSeconds < 0 || pollingWaitSeconds > 30 { err = ERR_MNS_MSG_POOLLING_WAIT_SECONDS_RANGE_ERROR.New() return } return } func NewMNSQueueManager(client MNSClient) AliQueueManager { return &MNSQueueManager{ cli: client, decoder: NewAliMNSDecoder(), } } func checkAttributes(delaySeconds int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error) { if err = checkDelaySeconds(delaySeconds); err != nil { return } if err = checkMessageRetentionPeriod(messageRetentionPeriod); err != nil { return } if err = checkVisibilityTimeout(visibilityTimeout); err != nil { return } if err = checkPollingWaitSeconds(pollingWaitSeconds); err != nil { return } return } func (p *MNSQueueManager) CreateSimpleQueue(queueName string) (err error) { return p.CreateQueue(queueName, 0, 65536, 345600, 30, 0, 2) } func (p *MNSQueueManager) CreateQueue(queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32, slices int32) (err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } if err = checkAttributes(delaySeconds, messageRetentionPeriod, visibilityTimeout, pollingWaitSeconds); err != nil { return } message := CreateQueueRequest{ DelaySeconds: delaySeconds, MaxMessageSize: maxMessageSize, MessageRetentionPeriod: messageRetentionPeriod, VisibilityTimeout: visibilityTimeout, PollingWaitSeconds: pollingWaitSeconds, LoggingEnabled: false, } var code int code, err = send(p.cli, p.decoder, PUT, nil, &message, "queues/"+queueName, nil) if code == http.StatusNoContent { err = ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR.New(errors.Params{"name": queueName}) return } return } func (p *MNSQueueManager) CreateQueueWithOptions(queueName string, options ...QueueOption) (err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } opts := defaultQueueOptions() tracker := make(map[string]bool) for _, opt := range options { opt(&opts, tracker) } if err = checkAttributes(opts.delaySeconds, opts.messageRetentionPeriod, opts.visibilityTimeout, opts.pollingWaitSeconds); err != nil { return } message := CreateQueueRequest{ DelaySeconds: opts.delaySeconds, MaxMessageSize: opts.maxMessageSize, MessageRetentionPeriod: opts.messageRetentionPeriod, VisibilityTimeout: opts.visibilityTimeout, PollingWaitSeconds: opts.pollingWaitSeconds, LoggingEnabled: opts.loggingEnabled, } var code int code, err = send(p.cli, p.decoder, PUT, nil, &message, "queues/"+queueName, nil) if code == http.StatusNoContent { err = ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR.New(errors.Params{"name": queueName}) return } return } func (p *MNSQueueManager) SetQueueAttributes(queueName string, delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32, slices int32) (err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } if err = checkAttributes(delaySeconds, messageRetentionPeriod, visibilityTimeout, pollingWaitSeconds); err != nil { return } message := CreateQueueRequest{ DelaySeconds: delaySeconds, MaxMessageSize: maxMessageSize, MessageRetentionPeriod: messageRetentionPeriod, VisibilityTimeout: visibilityTimeout, PollingWaitSeconds: pollingWaitSeconds, } _, err = send(p.cli, p.decoder, PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil) return } func (p *MNSQueueManager) SetQueueAttributesWithOptions(queueName string, options ...QueueOption) (err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } opts := QueueOptions{} tracker := make(map[string]bool) for _, opt := range options { opt(&opts, tracker) } message := CreateQueueRequest{} if tracker["delaySeconds"] { if err = checkDelaySeconds(opts.delaySeconds); err != nil { return } message.DelaySeconds = opts.delaySeconds } if tracker["maxMessageSize"] { message.MaxMessageSize = opts.maxMessageSize } if tracker["messageRetentionPeriod"] { if err = checkMessageRetentionPeriod(opts.messageRetentionPeriod); err != nil { return } message.MessageRetentionPeriod = opts.messageRetentionPeriod } if tracker["visibilityTimeout"] { if err = checkVisibilityTimeout(opts.visibilityTimeout); err != nil { return } message.VisibilityTimeout = opts.visibilityTimeout } if tracker["pollingWaitSeconds"] { if err = checkPollingWaitSeconds(opts.pollingWaitSeconds); err != nil { return } message.PollingWaitSeconds = opts.pollingWaitSeconds } if tracker["loggingEnabled"] { message.LoggingEnabled = opts.loggingEnabled } _, err = send(p.cli, p.decoder, PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil) return } func (p *MNSQueueManager) GetQueueAttributes(queueName string) (attr QueueAttribute, err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } _, err = send(p.cli, p.decoder, GET, nil, nil, "queues/"+queueName, &attr) return } func (p *MNSQueueManager) DeleteQueue(queueName string) (err error) { queueName = strings.TrimSpace(queueName) if err = checkQueueName(queueName); err != nil { return } _, err = send(p.cli, p.decoder, DELETE, nil, nil, "queues/"+queueName, nil) return } func (p *MNSQueueManager) ListQueue(nextMarker string, retNumber int32, prefix string) (queues Queues, err error) { header := map[string]string{} marker := strings.TrimSpace(nextMarker) if len(marker) > 0 { if marker != "" { header["x-mns-marker"] = marker } } if retNumber > 0 { if retNumber >= 1 && retNumber <= 1000 { header["x-mns-ret-number"] = strconv.Itoa(int(retNumber)) } else { err = ERR_MNS_RET_NUMBER_RANGE_ERROR.New() return } } prefix = strings.TrimSpace(prefix) if prefix != "" { header["x-mns-prefix"] = prefix } _, err = send(p.cli, p.decoder, GET, header, nil, "queues", &queues) return } func (p *MNSQueueManager) ListQueueDetail(nextMarker string, retNumber int32, prefix string) (queueDetails QueueDetails, err error) { header := map[string]string{} marker := strings.TrimSpace(nextMarker) if len(marker) > 0 { if marker != "" { header["x-mns-marker"] = marker } } if retNumber > 0 { if retNumber >= 1 && retNumber <= 1000 { header["x-mns-ret-number"] = strconv.Itoa(int(retNumber)) } else { err = ERR_MNS_RET_NUMBER_RANGE_ERROR.New() return } } prefix = strings.TrimSpace(prefix) if prefix != "" { header["x-mns-prefix"] = prefix } header["x-mns-with-meta"] = "true" _, err = send(p.cli, p.decoder, GET, header, nil, "queues", &queueDetails) return } func defaultQueueOptions() QueueOptions { return QueueOptions{ delaySeconds: 0, maxMessageSize: 65536, messageRetentionPeriod: 345600, visibilityTimeout: 30, pollingWaitSeconds: 0, loggingEnabled: false, } }