topic.go (144 lines of code) (raw):

package ali_mns import ( "fmt" "net/http" "strconv" "strings" "github.com/gogap/errors" ) type AliMNSTopic interface { Name() string GenerateQueueEndpoint(queueName string) string GenerateMailEndpoint(mailAddress string) string PublishMessage(message MessagePublishRequest) (resp MessageSendResponse, err error) Subscribe(subscriptionName string, message MessageSubsribeRequest) (err error) SetSubscriptionAttributes(subscriptionName string, notifyStrategy NotifyStrategyType) (err error) GetSubscriptionAttributes(subscriptionName string) (attr SubscriptionAttribute, err error) Unsubscribe(subscriptionName string) (err error) ListSubscriptionByTopic(nextMarker string, retNumber int32, prefix string) (subscriptions Subscriptions, err error) ListSubscriptionDetailByTopic(nextMarker string, retNumber int32, prefix string) (subscriptionDetails SubscriptionDetails, err error) } type MNSTopic struct { name string client MNSClient decoder MNSDecoder qpsMonitor *QPSMonitor } func NewMNSTopic(name string, client MNSClient, qps ...int32) AliMNSTopic { if name == "" { panic("ali_mns: topic name could not be empty") } topic := new(MNSTopic) topic.client = client topic.name = name topic.decoder = NewAliMNSDecoder() qpsLimit := DefaultTopicQPSLimit if qps != nil && len(qps) == 1 && qps[0] > 0 { qpsLimit = qps[0] } topic.qpsMonitor = NewQPSMonitor(5, qpsLimit) return topic } func (p *MNSTopic) Name() string { return p.name } func (p *MNSTopic) GenerateQueueEndpoint(queueName string) string { return "acs:mns:" + p.client.getRegion() + ":" + p.client.getAccountId() + ":queues/" + queueName } func (p *MNSTopic) GenerateMailEndpoint(mailAddress string) string { return "mail:directmail:" + mailAddress } func (p *MNSTopic) PublishMessage(message MessagePublishRequest) (resp MessageSendResponse, err error) { p.qpsMonitor.checkQPS() _, err = send(p.client, p.decoder, POST, nil, message, fmt.Sprintf("topics/%s/%s", p.name, "messages"), &resp) return } func (p *MNSTopic) Subscribe(subscriptionName string, message MessageSubsribeRequest) (err error) { subscriptionName = strings.TrimSpace(subscriptionName) if err = checkTopicName(subscriptionName); err != nil { return } p.qpsMonitor.checkQPS() var code int code, err = send(p.client, p.decoder, PUT, nil, message, fmt.Sprintf("topics/%s/subscriptions/%s", p.name, subscriptionName), nil) if code == http.StatusNoContent { err = ERR_MNS_SUBSCRIPTION_ALREADY_EXIST_AND_HAVE_SAME_ATTR.New(errors.Params{"name": subscriptionName}) return } return } func (p *MNSTopic) SetSubscriptionAttributes(subscriptionName string, notifyStrategy NotifyStrategyType) (err error) { subscriptionName = strings.TrimSpace(subscriptionName) if err = checkTopicName(subscriptionName); err != nil { return } message := SetSubscriptionAttributesRequest{ NotifyStrategy: notifyStrategy, } p.qpsMonitor.checkQPS() _, err = send(p.client, p.decoder, PUT, nil, message, fmt.Sprintf("topics/%s/subscriptions/%s?metaoverride=true", p.name, subscriptionName), nil) return } func (p *MNSTopic) GetSubscriptionAttributes(subscriptionName string) (attr SubscriptionAttribute, err error) { subscriptionName = strings.TrimSpace(subscriptionName) if err = checkTopicName(subscriptionName); err != nil { return } _, err = send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("topics/%s/subscriptions/%s", p.name, subscriptionName), &attr) return } func (p *MNSTopic) Unsubscribe(subscriptionName string) (err error) { subscriptionName = strings.TrimSpace(subscriptionName) if err = checkTopicName(subscriptionName); err != nil { return } _, err = send(p.client, p.decoder, DELETE, nil, nil, fmt.Sprintf("topics/%s/subscriptions/%s", p.name, subscriptionName), nil) return } func (p *MNSTopic) ListSubscriptionByTopic(nextMarker string, retNumber int32, prefix string) (subscriptions Subscriptions, err error) { header := map[string]string{} marker := strings.TrimSpace(nextMarker) if len(marker) > 0 { if marker != "" { header["x-mns-marker"] = marker } } if retNumber > 0 { if retNumber >= 1 && retNumber <= 1000 { header["x-mns-ret-number"] = strconv.Itoa(int(retNumber)) } else { err = ERR_MNS_RET_NUMBER_RANGE_ERROR.New() return } } prefix = strings.TrimSpace(prefix) if prefix != "" { header["x-mns-prefix"] = prefix } _, err = send(p.client, p.decoder, GET, header, nil, fmt.Sprintf("topics/%s/subscriptions", p.name), &subscriptions) return } func (p *MNSTopic) ListSubscriptionDetailByTopic(nextMarker string, retNumber int32, prefix string) (subscriptionDetails SubscriptionDetails, err error) { header := map[string]string{} marker := strings.TrimSpace(nextMarker) if len(marker) > 0 { if marker != "" { header["x-mns-marker"] = marker } } if retNumber > 0 { if retNumber >= 1 && retNumber <= 1000 { header["x-mns-ret-number"] = strconv.Itoa(int(retNumber)) } else { err = ERR_MNS_RET_NUMBER_RANGE_ERROR.New() return } } prefix = strings.TrimSpace(prefix) if prefix != "" { header["x-mns-prefix"] = prefix } header["x-mns-with-meta"] = "true" _, err = send(p.client, p.decoder, GET, header, nil, fmt.Sprintf("topics/%s/subscriptions", p.name), &subscriptionDetails) return }