extension/client.go (156 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package extension import ( "bytes" "context" "encoding/json" "fmt" "net/http" "time" "github.com/elastic/apm-aws-lambda/version" "go.uber.org/zap" ) // RegisterResponse is the body of the response for /register type RegisterResponse struct { FunctionName string `json:"functionName"` FunctionVersion string `json:"functionVersion"` Handler string `json:"handler"` } // NextEventResponse is the response for /event/next type NextEventResponse struct { Timestamp time.Time `json:"timestamp,omitempty"` EventType EventType `json:"eventType"` ShutdownReason string `json:"shutdownReason,omitempty"` DeadlineMs int64 `json:"deadlineMs"` RequestID string `json:"requestId"` InvokedFunctionArn string `json:"invokedFunctionArn"` Tracing Tracing `json:"tracing"` } // Tracing is part of the response for /event/next type Tracing struct { Type string `json:"type"` Value string `json:"value"` } // StatusResponse is the body of the response for /init/error and /exit/error type StatusResponse struct { Status string `json:"status"` } // EventType represents the type of events received from /event/next type EventType string const ( // Invoke is a lambda invoke Invoke EventType = "INVOKE" // Shutdown is a shutdown event for the environment Shutdown EventType = "SHUTDOWN" extensionNameHeader = "Lambda-Extension-Name" extensionIdentiferHeader = "Lambda-Extension-Identifier" extensionErrorType = "Lambda-Extension-Function-Error-Type" ) // Client is a simple Client for the Lambda Extensions API type Client struct { baseURL string httpClient *http.Client ExtensionID string logger *zap.SugaredLogger } // NewClient returns a Lambda Extensions API Client func NewClient(awsLambdaRuntimeAPI string, logger *zap.SugaredLogger) *Client { baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI) return &Client{ baseURL: baseURL, httpClient: &http.Client{}, logger: logger, } } // Register will register the extension with the Extensions API func (e *Client) Register(ctx context.Context, filename string) (*RegisterResponse, error) { const action = "/register" url := e.baseURL + action reqBody, err := json.Marshal(map[string]interface{}{ "events": []EventType{Invoke, Shutdown}, }) if err != nil { return nil, fmt.Errorf("failed to marshal register request body: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(reqBody)) if err != nil { return nil, fmt.Errorf("failed to create register request: %w", err) } httpReq.Header.Set(extensionNameHeader, filename) httpReq.Header.Set("User-Agent", version.UserAgent) httpRes, err := e.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("extension register request failed: %w", err) } defer httpRes.Body.Close() if httpRes.StatusCode != http.StatusOK { return nil, fmt.Errorf("extension register request failed with status %s", httpRes.Status) } res := RegisterResponse{} if err := json.NewDecoder(httpRes.Body).Decode(&res); err != nil { return nil, fmt.Errorf("failed to decode register response body: %w", err) } e.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader) e.logger.Debugf("ExtensionID : %s", e.ExtensionID) return &res, nil } // NextEvent blocks while long polling for the next lambda invoke or shutdown func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) { const action = "/event/next" url := e.baseURL + action httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create next event request: %w", err) } httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) httpReq.Header.Set("User-Agent", version.UserAgent) httpRes, err := e.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("next event request failed: %w", err) } defer httpRes.Body.Close() if httpRes.StatusCode != http.StatusOK { return nil, fmt.Errorf("next event request failed with status %s", httpRes.Status) } res := NextEventResponse{} if err := json.NewDecoder(httpRes.Body).Decode(&res); err != nil { return nil, fmt.Errorf("failed to decode next event response body: %w", err) } return &res, nil } // InitError reports an initialization error to the platform. Call it when you registered but failed to initialize // //nolint:dupl func (e *Client) InitError(ctx context.Context, errorType string) (*StatusResponse, error) { const action = "/init/error" url := e.baseURL + action httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create init error request: %w", err) } httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) httpReq.Header.Set(extensionErrorType, errorType) httpReq.Header.Set("User-Agent", version.UserAgent) httpRes, err := e.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("initialization error request failed: %w", err) } defer httpRes.Body.Close() if httpRes.StatusCode > 299 { return nil, fmt.Errorf("initialization error request failed with status %s", httpRes.Status) } res := StatusResponse{} if err := json.NewDecoder(httpRes.Body).Decode(&res); err != nil { return nil, fmt.Errorf("failed to decode init error response body: %w", err) } return &res, nil } // ExitError reports an error to the platform before exiting. Call it when you encounter an unexpected failure // //nolint:dupl func (e *Client) ExitError(ctx context.Context, errorType string) (*StatusResponse, error) { const action = "/exit/error" url := e.baseURL + action httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create exit error request: %w", err) } httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) httpReq.Header.Set(extensionErrorType, errorType) httpReq.Header.Set("User-Agent", version.UserAgent) httpRes, err := e.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("exit error request failed: %w", err) } defer httpRes.Body.Close() if httpRes.StatusCode > 299 { return nil, fmt.Errorf("exit error request failed with status %s", httpRes.Status) } res := StatusResponse{} if err := json.NewDecoder(httpRes.Body).Decode(&res); err != nil { return nil, fmt.Errorf("failed to decode exit error response body: %w", err) } return &res, nil }