client.go (311 lines of code) (raw):
package ali_mns
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/xml"
"fmt"
"net/http"
neturl "net/url"
"os"
"regexp"
"runtime"
"strings"
"sync"
"time"
"github.com/aliyun/credentials-go/credentials"
"github.com/gogap/errors"
"github.com/valyala/fasthttp"
)
const (
DefaultQueueQPSLimit int32 = 2000
DefaultTopicQPSLimit int32 = 2000
DefaultDNSTTL int32 = 10
)
const (
GlobalProxy = "MNS_GLOBAL_PROXY"
)
const (
version = "2015-06-06"
)
const (
DefaultTimeout int64 = 35
DefaultMaxConnsPerHost int = 512
)
const (
AliyunAkEnvKey = "ALIBABA_CLOUD_ACCESS_KEY_ID"
AliyunSkEnvKey = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"
)
type Method string
var (
errMapping map[string]errors.ErrCodeTemplate
)
func init() {
initMNSErrors()
}
const (
GET Method = "GET"
PUT = "PUT"
POST = "POST"
DELETE = "DELETE"
)
type MNSClient interface {
Send(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)
SetProxy(url string)
SetTransport(transport fasthttp.RoundTripper)
getAccountId() (accountId string)
getRegion() (region string)
}
type aliMNSClient struct {
Timeout int64
MaxConnsPerHost int
url *neturl.URL
credential credentials.Credential
accessKeyId string
client *fasthttp.Client
proxyURL string
accountId string
region string
clientLocker sync.Mutex
}
type AliMNSClientConfig struct {
EndPoint string
AccessKeyId string
AccessKeySecret string
Token string
Credential credentials.Credential
TimeoutSecond int64
MaxConnsPerHost int
}
// NewClient Follow the Alibaba Cloud standards and set the AK (Access Key) and SK (Secret Key) in the environment variables.
// For more details, see: https://help.aliyun.com/zh/sdk/developer-reference/configure-the-alibaba-cloud-accesskey-environment-variable-on-linux-macos-and-windows-systems
func NewClient(endpoint string) MNSClient {
return NewClientWithToken(endpoint, "")
}
// NewClientWithToken Follow the Alibaba Cloud standards and set the AK (Access Key) and SK (Secret Key) in the environment variables.
// For more details, see: https://help.aliyun.com/zh/sdk/developer-reference/configure-the-alibaba-cloud-accesskey-environment-variable-on-linux-macos-and-windows-systems
func NewClientWithToken(endpoint, token string) MNSClient {
return NewAliMNSClientWithConfig(AliMNSClientConfig{
EndPoint: endpoint,
AccessKeyId: os.Getenv(AliyunAkEnvKey),
AccessKeySecret: os.Getenv(AliyunSkEnvKey),
Token: token,
TimeoutSecond: DefaultTimeout,
MaxConnsPerHost: DefaultMaxConnsPerHost,
})
}
// Deprecated: Use NewClient instead.
func NewAliMNSClient(inputUrl, accessKeyId, accessKeySecret string) MNSClient {
return NewAliMNSClientWithConfig(AliMNSClientConfig{
EndPoint: inputUrl,
AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
Token: "",
TimeoutSecond: DefaultTimeout,
MaxConnsPerHost: DefaultMaxConnsPerHost,
})
}
// Deprecated: Use NewClientWithToken instead.
func NewAliMNSClientWithToken(inputUrl, accessKeyId, accessKeySecret, token string) MNSClient {
return NewAliMNSClientWithConfig(AliMNSClientConfig{
EndPoint: inputUrl,
AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
Token: token,
TimeoutSecond: DefaultTimeout,
MaxConnsPerHost: DefaultMaxConnsPerHost,
})
}
func NewAliMNSClientWithConfig(clientConfig AliMNSClientConfig) MNSClient {
if clientConfig.EndPoint == "" {
panic("ali-mns: message queue url is empty")
}
cli := new(aliMNSClient)
cli.Timeout = clientConfig.TimeoutSecond
if clientConfig.Credential != nil {
cli.credential = clientConfig.Credential
} else if clientConfig.Token != "" {
config := new(credentials.Config).
SetType("sts").
SetAccessKeyId(clientConfig.AccessKeyId).
SetAccessKeySecret(clientConfig.AccessKeySecret).
SetSecurityToken(clientConfig.Token)
var err error
cli.credential, err = credentials.NewCredential(config)
if err != nil {
panic(err)
}
} else {
config := new(credentials.Config).
SetType("access_key").
SetAccessKeyId(clientConfig.AccessKeyId).
SetAccessKeySecret(clientConfig.AccessKeySecret)
var err error
cli.credential, err = credentials.NewCredential(config)
if err != nil {
panic(err)
}
}
if clientConfig.MaxConnsPerHost != 0 {
cli.MaxConnsPerHost = clientConfig.MaxConnsPerHost
} else {
cli.MaxConnsPerHost = DefaultMaxConnsPerHost
}
var err error
if cli.url, err = neturl.Parse(clientConfig.EndPoint); err != nil {
panic("err parse url")
}
// 1. parse region and accountId
pieces := strings.Split(clientConfig.EndPoint, ".")
if len(pieces) != 5 {
panic("ali-mns: message queue url is invalid")
}
accountIdSlice := strings.Split(pieces[0], "/")
cli.accountId = accountIdSlice[len(accountIdSlice)-1]
re := regexp.MustCompile("-(internal|control)")
regionSlice := re.Split(pieces[2], -1)
cli.region = regionSlice[0]
if globalUrl := os.Getenv(GlobalProxy); globalUrl != "" {
cli.proxyURL = globalUrl
}
// 2. now init http client
cli.initFastHttpClient()
//change to dial dual stack to support both ipv4 and ipv6
cli.client.DialDualStack = true
return cli
}
func (p aliMNSClient) getAccountId() (accountId string) {
return p.accountId
}
func (p aliMNSClient) getRegion() (region string) {
return p.region
}
func (p *aliMNSClient) SetProxy(url string) {
if url == p.proxyURL {
return
}
p.proxyURL = url
}
func (p *aliMNSClient) initFastHttpClient() {
p.clientLocker.Lock()
defer p.clientLocker.Unlock()
timeoutInt := DefaultTimeout
if p.Timeout > 0 {
timeoutInt = p.Timeout
}
timeout := time.Second * time.Duration(timeoutInt)
p.client = &fasthttp.Client{ReadTimeout: timeout, WriteTimeout: timeout, MaxConnsPerHost: p.MaxConnsPerHost, Name: getDefaultUserAgent()}
}
func (p *aliMNSClient) SetTransport(transport fasthttp.RoundTripper) {
p.client.ConfigureClient = func(hc *fasthttp.HostClient) error {
hc.Transport = transport
return nil
}
}
func (p *aliMNSClient) proxy() (*neturl.URL, error) {
if p.proxyURL != "" {
return neturl.Parse(p.proxyURL)
}
return nil, nil
}
func (p *aliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error) {
var xmlContent []byte
var err error
if message == nil {
xmlContent = []byte{}
} else {
switch m := message.(type) {
case []byte:
{
xmlContent = m
}
default:
if bXml, e := xml.Marshal(message); e != nil {
err = ERR_MARSHAL_MESSAGE_FAILED.New(errors.Params{"err": e})
return nil, err
} else {
xmlContent = bXml
}
}
}
xmlMD5 := md5.Sum(xmlContent)
strMd5 := fmt.Sprintf("%x", xmlMD5)
if headers == nil {
headers = make(map[string]string)
}
headers[MQ_VERSION] = version
headers[CONTENT_TYPE] = "application/xml"
headers[CONTENT_MD5] = base64.StdEncoding.EncodeToString([]byte(strMd5))
headers[DATE] = time.Now().UTC().Format(http.TimeFormat)
credential, err := p.credential.GetCredential()
if err != nil {
return nil, err
}
if credential.SecurityToken != nil && *credential.SecurityToken != "" {
headers[SECURITY_TOKEN] = *credential.SecurityToken
}
signature, err := getSignature(method, headers, fmt.Sprintf("/%s", resource), *credential.AccessKeySecret)
if err != nil {
return nil, ERR_GENERAL_AUTH_HEADER_FAILED.New(errors.Params{"err": err})
}
headers[AUTHORIZATION] = fmt.Sprintf("MNS %s:%s", *credential.AccessKeyId, signature)
var buffer bytes.Buffer
buffer.WriteString(p.url.String())
buffer.WriteString("/")
buffer.WriteString(resource)
url := buffer.String()
req := fasthttp.AcquireRequest()
req.SetRequestURI(url)
req.Header.SetMethod(string(method))
req.SetBody(xmlContent)
for header, value := range headers {
req.Header.Set(header, value)
}
resp := fasthttp.AcquireResponse()
if err = p.client.Do(req, resp); err != nil {
err = ERR_SEND_REQUEST_FAILED.New(errors.Params{"err": err})
return nil, err
}
return resp, nil
}
func initMNSErrors() {
errMapping = map[string]errors.ErrCodeTemplate{
"AccessDenied": ERR_MNS_ACCESS_DENIED,
"InvalidAccessKeyId": ERR_MNS_INVALID_ACCESS_KEY_ID,
"InternalError": ERR_MNS_INTERNAL_ERROR,
"InvalidAuthorizationHeader": ERR_MNS_INVALID_AUTHORIZATION_HEADER,
"InvalidDateHeader": ERR_MNS_INVALID_DATE_HEADER,
"InvalidArgument": ERR_MNS_INVALID_ARGUMENT,
"InvalidDegist": ERR_MNS_INVALID_DEGIST,
"InvalidRequestURL": ERR_MNS_INVALID_REQUEST_URL,
"InvalidQueryString": ERR_MNS_INVALID_QUERY_STRING,
"MalformedXML": ERR_MNS_MALFORMED_XML,
"MissingAuthorizationHeader": ERR_MNS_MISSING_AUTHORIZATION_HEADER,
"MissingDateHeader": ERR_MNS_MISSING_DATE_HEADER,
"MissingVersionHeader": ERR_MNS_MISSING_VERSION_HEADER,
"MissingReceiptHandle": ERR_MNS_MISSING_RECEIPT_HANDLE,
"MissingVisibilityTimeout": ERR_MNS_MISSING_VISIBILITY_TIMEOUT,
"MessageNotExist": ERR_MNS_MESSAGE_NOT_EXIST,
"QueueAlreadyExist": ERR_MNS_QUEUE_ALREADY_EXIST,
"QueueDeletedRecently": ERR_MNS_QUEUE_DELETED_RECENTLY,
"InvalidQueueName": ERR_MNS_INVALID_QUEUE_NAME,
"QueueNameLengthError": ERR_MNS_QUEUE_NAME_LENGTH_ERROR,
"QueueNotExist": ERR_MNS_QUEUE_NOT_EXIST,
"ReceiptHandleError": ERR_MNS_RECEIPT_HANDLE_ERROR,
"SignatureDoesNotMatch": ERR_MNS_SIGNATURE_DOES_NOT_MATCH,
"TimeExpired": ERR_MNS_TIME_EXPIRED,
"QpsLimitExceeded": ERR_MNS_QPS_LIMIT_EXCEEDED,
"TopicAlreadyExist": ERR_MNS_TOPIC_ALREADY_EXIST,
"TopicNameLengthError": ERR_MNS_TOPIC_NAME_LENGTH_ERROR,
"TopicNotExist": ERR_MNS_TOPIC_NOT_EXIST,
"SubscriptionNameLengthError": ERR_MNS_SUBSRIPTION_NAME_LENGTH_ERROR,
"TopicNameInvalid": ERR_MNS_INVALID_TOPIC_NAME,
"SubsriptionNameInvalid": ERR_MNS_INVALID_SUBSCRIPTION_NAME,
"SubscriptionAlreadyExist": ERR_MNS_SUBSCRIPTION_ALREADY_EXIST,
"EndpointInvalid": ERR_MNS_INVALID_ENDPOINT,
"SubscriberNotExist": ERR_MNS_SUBSCRIBER_NOT_EXIST,
}
}
func getDefaultUserAgent() string {
goVersion := strings.TrimPrefix(runtime.Version(), "go")
return fmt.Sprintf("%s/%s(%s/%s/%s;%s)", SdkName, Version, runtime.GOOS, "-", runtime.GOARCH, goVersion)
}
func ParseError(resp ErrorResponse, resource string) (err error) {
if errCodeTemplate, exist := errMapping[resp.Code]; exist {
err = errCodeTemplate.New(errors.Params{"resp": resp, "resource": resource})
} else {
err = ERR_MNS_UNKNOWN_CODE.New(errors.Params{"resp": resp, "resource": resource})
}
return
}