request.go (217 lines of code) (raw):
package sls
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"
"time"
"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log/level"
"golang.org/x/net/context"
)
// timeout configs
var (
defaultRequestTimeout = 60 * time.Second
defaultRetryTimeout = 90 * time.Second
defaultHttpClient = newDefaultHTTPClient(defaultRequestTimeout)
defaultHTTPIdleTimeout = time.Second * 55
)
func newDefaultTransport() *http.Transport {
t := http.DefaultTransport.(*http.Transport).Clone()
t.IdleConnTimeout = defaultHTTPIdleTimeout
return t
}
// returns a new http client instance with default config
func newDefaultHTTPClient(requestTimeout time.Duration) *http.Client {
return &http.Client{
Transport: newDefaultTransport(),
Timeout: requestTimeout,
}
}
func retryReadErrorCheck(ctx context.Context, err error) (bool, error) {
if err == nil {
return false, nil
}
switch e := err.(type) {
case *url.Error:
return true, e
case *Error:
if RetryOnServerErrorEnabled {
if e.HTTPCode >= 500 && e.HTTPCode <= 599 {
return true, e
}
}
case *BadResponseError:
if RetryOnServerErrorEnabled {
if e.HTTPCode >= 500 && e.HTTPCode <= 599 {
return true, e
}
}
default:
return false, e
}
return false, err
}
func retryWriteErrorCheck(ctx context.Context, err error) (bool, error) {
if err == nil {
return false, nil
}
switch e := err.(type) {
case *Error:
if RetryOnServerErrorEnabled {
if e.HTTPCode == 500 || e.HTTPCode == 502 || e.HTTPCode == 503 {
return true, e
}
}
case *BadResponseError:
if RetryOnServerErrorEnabled {
if e.HTTPCode == 500 || e.HTTPCode == 502 || e.HTTPCode == 503 {
return true, e
}
}
default:
return false, e
}
return false, err
}
// request sends a request to SLS.
// mock param only for test, default is []
func request(project *LogProject, method, uri string, headers map[string]string,
body []byte, mock ...interface{}) (*http.Response, error) {
var r *http.Response
var slsErr error
var err error
var mockErr *mockErrorRetry
project.init()
ctx, cancel := context.WithTimeout(context.Background(), project.retryTimeout)
defer cancel()
//fmt.Println("request ", project, method, uri, headers, body)
// all GET method is read function
if method == http.MethodGet {
err = RetryWithCondition(ctx, backoff.NewExponentialBackOff(), func() (bool, error) {
if len(mock) == 0 {
//fmt.Println("real request", project, method, uri, headers, body)
r, slsErr = realRequest(ctx, project, method, uri, headers, body)
//fmt.Println("real request done")
} else {
r, mockErr = nil, mock[0].(*mockErrorRetry)
mockErr.RetryCnt--
if mockErr.RetryCnt <= 0 {
r = &http.Response{}
slsErr = nil
return false, nil
}
slsErr = &mockErr.Err
}
return retryReadErrorCheck(ctx, slsErr)
})
} else {
err = RetryWithCondition(ctx, backoff.NewExponentialBackOff(), func() (bool, error) {
if len(mock) == 0 {
r, slsErr = realRequest(ctx, project, method, uri, headers, body)
} else {
r, mockErr = nil, mock[0].(*mockErrorRetry)
mockErr.RetryCnt--
if mockErr.RetryCnt <= 0 {
r = &http.Response{}
slsErr = nil
return false, nil
}
slsErr = &mockErr.Err
}
return retryWriteErrorCheck(ctx, slsErr)
})
}
if err != nil {
return r, err
}
return r, slsErr
}
// request sends a request to alibaba cloud Log Service.
// @note if error is nil, you must call http.Response.Body.Close() to finalize reader
func realRequest(ctx context.Context, project *LogProject, method, uri string, headers map[string]string,
body []byte) (*http.Response, error) {
// The caller should provide 'x-log-bodyrawsize' header
if _, ok := headers[HTTPHeaderBodyRawSize]; !ok {
return nil, NewClientError(fmt.Errorf("Can't find 'x-log-bodyrawsize' header"))
}
// SLS public request headers
baseURL := project.getBaseURL()
headers[HTTPHeaderHost] = baseURL
headers[HTTPHeaderAPIVersion] = version
if len(project.UserAgent) > 0 {
headers[HTTPHeaderUserAgent] = project.UserAgent
} else {
headers[HTTPHeaderUserAgent] = DefaultLogUserAgent
}
stsToken := project.SecurityToken
accessKeyID := project.AccessKeyID
accessKeySecret := project.AccessKeySecret
if project.credentialProvider != nil {
c, err := project.credentialProvider.GetCredentials()
if err != nil {
return nil, NewClientError(fmt.Errorf("fail to get credentials: %w", err))
}
stsToken = c.SecurityToken
accessKeyID = c.AccessKeyID
accessKeySecret = c.AccessKeySecret
}
// Access with token
if stsToken != "" {
headers[HTTPHeaderAcsSecurityToken] = stsToken
}
if body != nil {
if _, ok := headers[HTTPHeaderContentType]; !ok {
return nil, NewClientError(fmt.Errorf("Can't find 'Content-Type' header"))
}
}
for k, v := range project.innerHeaders {
headers[k] = v
}
var signer Signer
if project.AuthVersion == AuthV4 {
headers[HTTPHeaderLogDate] = dateTimeISO8601()
signer = NewSignerV4(accessKeyID, accessKeySecret, project.Region)
} else if project.AuthVersion == AuthV0 {
signer = NewSignerV0()
} else {
headers[HTTPHeaderDate] = nowRFC1123()
signer = NewSignerV1(accessKeyID, accessKeySecret)
}
if err := signer.Sign(method, uri, headers, body); err != nil {
return nil, err
}
addHeadersAfterSign(project.commonHeaders, headers)
// Initialize http request
reader := bytes.NewReader(body)
// Handle the endpoint
urlStr := fmt.Sprintf("%s%s", baseURL, uri)
req, err := http.NewRequest(method, urlStr, reader)
if err != nil {
return nil, NewClientError(err)
}
for k, v := range headers {
req.Header.Add(k, v)
}
if IsDebugLevelMatched(5) {
dump, e := httputil.DumpRequest(req, true)
if e != nil {
level.Info(Logger).Log("msg", e)
}
level.Info(Logger).Log("msg", "HTTP Request:\n%v", string(dump))
}
// Get ready to do request
resp, err := project.httpClient.Do(req)
if err != nil {
return nil, err
}
// Parse the sls error from body.
if resp.StatusCode != http.StatusOK {
err := &Error{}
err.HTTPCode = (int32)(resp.StatusCode)
defer resp.Body.Close()
buf, ioErr := ioutil.ReadAll(resp.Body)
if ioErr != nil {
return nil, NewBadResponseError(ioErr.Error(), resp.Header, resp.StatusCode)
}
if jErr := json.Unmarshal(buf, err); jErr != nil {
return nil, NewBadResponseError(string(buf), resp.Header, resp.StatusCode)
}
err.RequestID = resp.Header.Get(RequestIDHeader)
return nil, err
}
if IsDebugLevelMatched(5) {
dump, e := httputil.DumpResponse(resp, true)
if e != nil {
level.Info(Logger).Log("msg", e)
}
level.Info(Logger).Log("msg", "HTTP Response:\n%v", string(dump))
}
return resp, nil
}