logsapi/client.go (88 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package logsapi
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"
"go.uber.org/zap"
)
// SubscriptionType represents the log streams that the Lambda Logs API
// provides for subscription
type SubscriptionType string
const (
// Platform logstream records events and errors related to
// invocations and extensions
Platform SubscriptionType = "platform"
// Function logstream records logs written by lambda function
// to stderr or stdout
Function SubscriptionType = "function"
// Extension logstream records logs generated by extension
Extension SubscriptionType = "extension"
)
// ClientOption is a config option for a Client.
type ClientOption func(*Client)
type invocationLifecycler interface {
OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error
OnPlatformStart(reqID string)
OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error)
// PlatformStartReqID is to identify the requestID for the function
// logs under the assumption that function logs for a specific request
// ID will be bounded by PlatformStart and PlatformEnd events.
PlatformStartReqID() string
// Size should return the number of invocations waiting on platform.report
Size() int
}
// Client is the client used to subscribe to the Logs API.
type Client struct {
httpClient *http.Client
logsAPIBaseURL string
logsAPISubscriptionTypes []SubscriptionType
logsChannel chan LogEvent
listenerAddr string
server *http.Server
logger *zap.SugaredLogger
invocationLifecycler invocationLifecycler
}
// NewClient returns a new Client with the given URL.
func NewClient(opts ...ClientOption) (*Client, error) {
c := Client{
server: &http.Server{
// Fixes "Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server"
ReadHeaderTimeout: time.Second * 5,
},
httpClient: &http.Client{},
}
for _, opt := range opts {
opt(&c)
}
mux := http.NewServeMux()
mux.HandleFunc("/", handleLogEventsRequest(c.logger, c.logsChannel))
c.server.Handler = mux
if c.logsAPIBaseURL == "" {
return nil, errors.New("logs api base url cannot be empty")
}
if c.logger == nil {
return nil, errors.New("logger cannot be nil")
}
return &c, nil
}
// StartService starts the HTTP server listening for log events and subscribes to the Logs API.
func (lc *Client) StartService(extensionID string) error {
addr, err := lc.startHTTPServer()
if err != nil {
return err
}
_, port, err := net.SplitHostPort(addr)
if err != nil {
if err2 := lc.Shutdown(); err2 != nil {
lc.logger.Warnf("failed to shutdown the server: %v", err2)
}
return fmt.Errorf("failed to retrieve port from address %s: %w", addr, err)
}
host, _, err := net.SplitHostPort(lc.listenerAddr)
if err != nil {
if err2 := lc.Shutdown(); err2 != nil {
lc.logger.Warnf("failed to shutdown the server: %v", err2)
}
return fmt.Errorf("failed to retrieve host from address %s: %w", lc.listenerAddr, err)
}
uri := "http://" + net.JoinHostPort(host, port)
if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil {
if err2 := lc.Shutdown(); err2 != nil {
lc.logger.Warnf("failed to shutdown the server: %v", err2)
}
return err
}
return nil
}
// Shutdown shutdowns the log service gracefully.
func (lc *Client) Shutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return lc.server.Shutdown(ctx)
}