queue.go (175 lines of code) (raw):
package ali_mns
import (
"fmt"
"net/url"
)
var (
DefaultNumOfMessages int32 = 16
)
type AliMNSQueue interface {
QPSMonitor() *QPSMonitor
Name() string
SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error)
BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error)
ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64)
BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64)
PeekMessage(respChan chan MessageReceiveResponse, errChan chan error)
BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32)
DeleteMessage(receiptHandle string) (err error)
BatchDeleteMessage(receiptHandles ...string) (resp BatchMessageDeleteErrorResponse, err error)
ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error)
}
type MNSQueue struct {
name string
client MNSClient
decoder MNSDecoder
qpsMonitor *QPSMonitor
}
func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue {
if name == "" {
panic("ali_mns: queue name could not be empty")
}
queue := new(MNSQueue)
queue.client = client
queue.name = name
queue.decoder = NewAliMNSDecoder()
qpsLimit := DefaultQueueQPSLimit
if qps != nil && len(qps) == 1 && qps[0] > 0 {
qpsLimit = qps[0]
}
queue.qpsMonitor = NewQPSMonitor(5, qpsLimit)
return queue
}
func (p *MNSQueue) QPSMonitor() *QPSMonitor {
return p.qpsMonitor
}
func (p *MNSQueue) Name() string {
return p.name
}
func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error) {
p.qpsMonitor.checkQPS()
_, err = send(p.client, p.decoder, POST, nil, message, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
return
}
func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchMessageSendResponse, err error) {
if messages == nil || len(messages) == 0 {
return
}
batchRequest := BatchMessageSendRequest{}
for _, message := range messages {
batchRequest.Messages = append(batchRequest.Messages, message)
}
p.qpsMonitor.checkQPS()
_, err = send(p.client, NewBatchOpDecoder(&resp), POST, nil, batchRequest, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
return
}
func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan chan error, waitseconds ...int64) {
resource := fmt.Sprintf("queues/%s/%s", p.name, "messages")
if waitseconds != nil {
for _, waitsecond := range waitseconds {
if waitsecond <= 0 {
continue
}
resource = fmt.Sprintf("queues/%s/%s?waitseconds=%d", p.name, "messages", waitsecond)
p.qpsMonitor.checkQPS()
resp := MessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
// if no
errChan <- err
} else {
respChan <- resp
// return if success, may be too much msg accumulated
return
}
}
} else {
p.qpsMonitor.checkQPS()
resp := MessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
errChan <- err
} else {
respChan <- resp
}
}
// if no message after waitsecond loop or after once try if no waitsecond offered
return
}
func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32, waitseconds ...int64) {
if numOfMessages <= 0 {
numOfMessages = DefaultNumOfMessages
}
resource := fmt.Sprintf("queues/%s/%s?numOfMessages=%d", p.name, "messages", numOfMessages)
if waitseconds != nil {
for _, waitsecond := range waitseconds {
if waitsecond <= 0 {
continue
}
resource = fmt.Sprintf("queues/%s/%s?numOfMessages=%d&waitseconds=%d", p.name, "messages", numOfMessages, waitsecond)
p.qpsMonitor.checkQPS()
resp := BatchMessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
errChan <- err
} else {
respChan <- resp
return
}
}
} else {
p.qpsMonitor.checkQPS()
resp := BatchMessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
errChan <- err
} else {
respChan <- resp
}
}
return
}
func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error) {
p.qpsMonitor.checkQPS()
resp := MessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?peekonly=true", p.name, "messages"), &resp)
if err != nil {
errChan <- err
} else {
respChan <- resp
}
return
}
func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, errChan chan error, numOfMessages int32) {
if numOfMessages <= 0 {
numOfMessages = DefaultNumOfMessages
}
p.qpsMonitor.checkQPS()
resp := BatchMessageReceiveResponse{}
_, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?numOfMessages=%d&peekonly=true", p.name, "messages", numOfMessages), &resp)
if err != nil {
errChan <- err
} else {
respChan <- resp
}
return
}
func (p *MNSQueue) DeleteMessage(receiptHandle string) (err error) {
p.qpsMonitor.checkQPS()
_, err = send(p.client, p.decoder, DELETE, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s", p.name, "messages", url.QueryEscape(receiptHandle)), nil)
return
}
func (p *MNSQueue) BatchDeleteMessage(receiptHandles ...string) (resp BatchMessageDeleteErrorResponse, err error) {
if receiptHandles == nil || len(receiptHandles) == 0 {
return
}
handlers := ReceiptHandles{}
for _, handler := range receiptHandles {
handlers.ReceiptHandles = append(handlers.ReceiptHandles, handler)
}
p.qpsMonitor.checkQPS()
_, err = send(p.client, NewBatchOpDecoder(&resp), DELETE, nil, handlers, fmt.Sprintf("queues/%s/%s", p.name, "messages"), nil)
return
}
func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error) {
p.qpsMonitor.checkQPS()
_, err = send(p.client, p.decoder, PUT, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s&VisibilityTimeout=%d", p.name, "messages", url.QueryEscape(receiptHandle), visibilityTimeout), &resp)
return
}