logsapi/subscribe.go (98 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package logsapi
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"github.com/elastic/apm-aws-lambda/version"
)
// SubscribeRequest is the request body that is sent to Logs API on subscribe
type SubscribeRequest struct {
SchemaVersion SchemaVersion `json:"schemaVersion"`
LogTypes []SubscriptionType `json:"types"`
BufferingCfg BufferingCfg `json:"buffering"`
Destination Destination `json:"destination"`
}
// SchemaVersion is the Lambda runtime API schema version
type SchemaVersion string
const (
SchemaVersion20210318 = "2021-03-18"
SchemaVersionLatest = SchemaVersion20210318
)
// BufferingCfg is the configuration set for receiving logs from Logs API. Whichever of the conditions below is met first, the logs will be sent
type BufferingCfg struct {
// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
MaxItems uint32 `json:"maxItems"`
// MaxBytes is the maximum size in bytes of the logs to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
MaxBytes uint32 `json:"maxBytes"`
// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
TimeoutMS uint32 `json:"timeoutMs"`
}
// Destination is the configuration for listeners who would like to receive logs with HTTP
type Destination struct {
Protocol string `json:"protocol"`
URI string `json:"URI"`
HTTPMethod string `json:"method"`
Encoding string `json:"encoding"`
}
func (lc *Client) startHTTPServer() (string, error) {
listener, err := net.Listen("tcp", lc.listenerAddr)
if err != nil {
return "", fmt.Errorf("failed to listen on %s: %w", lc.listenerAddr, err)
}
addr := listener.Addr().String()
go func() {
lc.logger.Infof("Extension listening for Lambda Logs API events on %s", addr)
if err := lc.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
lc.logger.Errorf("Error upon Logs API server start : %v", err)
}
}()
return addr, nil
}
func (lc *Client) subscribe(types []SubscriptionType, extensionID, uri string) error {
data, err := json.Marshal(&SubscribeRequest{
SchemaVersion: SchemaVersionLatest,
LogTypes: types,
BufferingCfg: BufferingCfg{
MaxItems: 10000,
MaxBytes: 1024 * 1024,
TimeoutMS: 100,
},
Destination: Destination{
Protocol: "HTTP",
URI: uri,
HTTPMethod: http.MethodPost,
Encoding: "JSON",
},
})
if err != nil {
return fmt.Errorf("failed to marshal SubscribeRequest: %w", err)
}
url := lc.logsAPIBaseURL + "/2020-08-15/logs"
resp, err := lc.sendRequest(url, data, extensionID)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusAccepted {
return errors.New("logs API is not supported in this environment")
}
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("%s failed: %d[%s]", url, resp.StatusCode, resp.Status)
}
return fmt.Errorf("%s failed: %d[%s] %s", url, resp.StatusCode, resp.Status, string(body))
}
return nil
}
func (lc *Client) sendRequest(url string, data []byte, extensionID string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Lambda-Extension-Identifier", extensionID)
req.Header.Set("User-Agent", version.UserAgent)
resp, err := lc.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
return resp, nil
}