fc/runtime_api_client.go (128 lines of code) (raw):
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved
// Copyright 2021 Alibaba Group Holding Limited. All Rights Reserved.
package fc
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"runtime"
"strings"
)
// HTTP header names and fixed values used when talking to the FC runtime API.
// The groups below mirror the categories the headers belong to.

// Per-invocation headers. headerFCRequestID is read from the response to the
// "next" call and identifies the invocation.
const (
	headerFCRequestID = "x-fc-request-id"
	headerDeadlineMS  = "x-fc-function-deadline"
)

// Credential headers.
const (
	headerAccessKeyId     = "x-fc-access-key-id"
	headerAccessKeySecret = "x-fc-access-key-secret"
	headerSecurityToken   = "x-fc-security-token"
)

// Function information headers.
const (
	headerFunctionType    = "x-fc-function-type"
	headerFunctionName    = "x-fc-function-name"
	headerFunctionHandler = "x-fc-function-handler"
	headerFunctionMemory  = "x-fc-function-memory"
	headerFunctionTimeout = "x-fc-function-timeout"
)

// Service information headers.
const (
	headerServiceName       = "x-fc-service-name"
	headerServiceLogproject = "x-fc-service-logproject"
	headerServiceLogstore   = "x-fc-service-logstore"
)

// Tracing headers.
const (
	headerOpenTracingSpanContext  = "x-fc-tracing-opentracing-span-context"
	headerOpenTracingSpanBaggages = "x-fc-tracing-opentracing-span-baggages"
	headerJaegerEndpoint          = "x-fc-tracing-jaeger-endpoint"
)

// Remaining headers. headerHttpParams is additionally set on POSTs sent back
// to the runtime API when the caller supplies non-blank HTTP params.
const (
	headerRegion     = "x-fc-region"
	headerAccountId  = "x-fc-account-id"
	headerHttpParams = "x-fc-http-params"
	headerQualifier  = "x-fc-qualifier"
	headerVersionId  = "x-fc-version-id"
	headerRetryCount = "x-fc-retry-count"
)

// Non-header constants.
const (
	// contentTypeJSON is the MIME type for JSON payloads.
	contentTypeJSON = "application/json"
	// apiVersion is the version segment placed in the runtime API URL path.
	apiVersion = "2020-11-11"
)
// runtimeAPIClient is a small HTTP client for the function runtime API. It
// fetches invocations with next() and reports their results with post().
type runtimeAPIClient struct {
// baseURL is the invocation endpoint prefix; it ends in "/" and request
// paths ("next", "<id>/response", "<id>/error") are appended to it directly.
baseURL string
// userAgent is sent as the User-Agent header on every request.
userAgent string
// httpClient performs the underlying HTTP requests.
httpClient *http.Client
}
// newRuntimeAPIClient returns a client for the runtime API served at address.
// address is used verbatim as the authority part of an "http://" URL, so it
// must not include a scheme. The resulting base URL has the form
// "http://<address>/<apiVersion>/runtime/invocation/", and the User-Agent
// advertises the Go runtime version.
func newRuntimeAPIClient(address string) *runtimeAPIClient {
	// A zero Timeout disables the client-side deadline: connections to the
	// runtime API are never expected to time out.
	hc := &http.Client{Timeout: 0}

	return &runtimeAPIClient{
		baseURL:    "http://" + address + "/" + apiVersion + "/runtime/invocation/",
		userAgent:  "aliyun-fc-go/" + runtime.Version(),
		httpClient: hc,
	}
}
// invoke is a single invocation fetched from the runtime API by next(),
// together with the client used to report its outcome.
type invoke struct {
// id is the request ID, taken from the x-fc-request-id response header.
id string
// payload is the raw response body that delivered the invocation.
payload []byte
// headers holds all response headers that accompanied the invocation.
headers http.Header
// client is the runtime API client that produced this invoke; success()
// and failure() post their results through it.
client *runtimeAPIClient
}
// invokeTypeInfo groups a function type with initializer settings.
// NOTE(review): nothing in this part of the file reads or writes these
// fields; the field notes below are inferred from the names only — confirm
// against the code that populates this struct.
type invokeTypeInfo struct {
// funcType is the kind of function; functionType is declared elsewhere in
// the package.
funcType functionType
// initializer is presumably the initializer handler name — TODO confirm.
initializer string
// initializerTimeout is presumably the initializer's timeout; the unit is
// not visible here — TODO confirm.
initializerTimeout int
}
// success reports payload as the result of the in-progress invocation by
// POSTing it to the invocation's "/response" endpoint.
//
// contentType is sent as the Content-Type header. httpParams, when not blank,
// is forwarded in the x-fc-http-params header.
//
// Note: the invoke is not complete until next() is called again.
func (i *invoke) success(payload []byte, contentType string, httpParams string) error {
	c := i.client
	return c.post(c.baseURL+i.id+"/response", payload, contentType, httpParams)
}
// failure reports payload to the runtime API as the error result of the
// in-progress invocation by POSTing it to the invocation's "/error" endpoint.
// This marks the invocation as failed. No HTTP params header is sent.
//
// Notes:
//   - The function process keeps executing, and is billed, until next() is
//     called again.
//   - The function instance continues to be re-used for later invocations even
//     after a failure. If the error is fatal (panic, unrecoverable state), exit
//     the process immediately after calling failure().
func (i *invoke) failure(payload []byte, contentType string) error {
	c := i.client
	return c.post(c.baseURL+i.id+"/error", payload, contentType, "")
}
// next connects to the runtime API and waits for a new invocation to become
// available, by issuing a GET to "<baseURL>next".
//
// Note: after success() or failure() has been called for an invocation,
// calling next() again completes that in-flight invocation.
//
// On success it returns an invoke whose id comes from the x-fc-request-id
// response header, whose payload is the full response body, and whose headers
// are the response headers.
//
// It returns an error when the request cannot be built or sent, when the
// response status is not 200 OK, or when the body cannot be read. Underlying
// errors are wrapped so callers can inspect them with errors.Is / errors.As.
func (c *runtimeAPIClient) next() (*invoke, error) {
	// Upper bound on how much of an unexpected response is drained before
	// giving up on connection reuse.
	const maxDrainBytes = 64 << 10

	url := c.baseURL + "next"
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to construct GET request to %s: %w", url, err)
	}
	req.Header.Set("User-Agent", c.userAgent)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to get the next invoke: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			log.Printf("runtime API client failed to close %s response body: %v", url, err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		// Best-effort drain: the transport can only reuse the keep-alive
		// connection if the body has been read to EOF before it is closed.
		// A drain failure is deliberately ignored; the status error below is
		// the one that matters to the caller.
		_, _ = io.CopyN(ioutil.Discard, resp.Body, maxDrainBytes)
		return nil, fmt.Errorf("failed to GET %s: got unexpected status code: %d", url, resp.StatusCode)
	}

	payload := new(bytes.Buffer)
	if _, err := payload.ReadFrom(resp.Body); err != nil {
		return nil, fmt.Errorf("failed to read the invoke payload: %w", err)
	}

	return &invoke{
		id:      resp.Header.Get(headerFCRequestID),
		payload: payload.Bytes(),
		headers: resp.Header,
		client:  c,
	}, nil
}
// post sends payload to url with an HTTP POST and verifies that the runtime
// API answered with 200 OK.
//
// contentType is sent as the Content-Type header. httpParams is sent in the
// x-fc-http-params header only when it contains something other than
// whitespace.
//
// It returns an error when the request cannot be built or sent, when the
// response status is not 200 OK, or when the response body cannot be read.
// Underlying errors are wrapped so callers can inspect them with errors.Is /
// errors.As.
func (c *runtimeAPIClient) post(url string, payload []byte, contentType, httpParams string) error {
	// Upper bound on how much of an unexpected response is drained before
	// giving up on connection reuse.
	const maxDrainBytes = 64 << 10

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to construct POST request to %s: %w", url, err)
	}
	req.Header.Set("User-Agent", c.userAgent)
	req.Header.Set("Content-Type", contentType)
	if strings.TrimSpace(httpParams) != "" {
		req.Header.Set(headerHttpParams, httpParams)
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to POST to %s: %w", url, err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			log.Printf("runtime API client failed to close %s response body: %v", url, err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		// Best-effort drain: the transport can only reuse the keep-alive
		// connection if the body has been read to EOF before it is closed.
		// A drain failure is deliberately ignored; the status error below is
		// the one that matters to the caller.
		_, _ = io.CopyN(ioutil.Discard, resp.Body, maxDrainBytes)
		return fmt.Errorf("failed to POST to %s: got unexpected status code: %d", url, resp.StatusCode)
	}

	// Read the body to EOF so the connection can be returned to the pool.
	if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
		return fmt.Errorf("something went wrong reading the POST response from %s: %w", url, err)
	}
	return nil
}