logsapi/client.go (88 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package logsapi import ( "context" "errors" "fmt" "net" "net/http" "time" "go.uber.org/zap" ) // SubscriptionType represents the log streams that the Lambda Logs API // provides for subscription type SubscriptionType string const ( // Platform logstream records events and errors related to // invocations and extensions Platform SubscriptionType = "platform" // Function logstream records logs written by lambda function // to stderr or stdout Function SubscriptionType = "function" // Extension logstream records logs generated by extension Extension SubscriptionType = "extension" ) // ClientOption is a config option for a Client. type ClientOption func(*Client) type invocationLifecycler interface { OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error OnPlatformStart(reqID string) OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error) // PlatformStartReqID is to identify the requestID for the function // logs under the assumption that function logs for a specific request // ID will be bounded by PlatformStart and PlatformEnd events. PlatformStartReqID() string // Size should return the number of invocations waiting on platform.report Size() int } // Client is the client used to subscribe to the Logs API. type Client struct { httpClient *http.Client logsAPIBaseURL string logsAPISubscriptionTypes []SubscriptionType logsChannel chan LogEvent listenerAddr string server *http.Server logger *zap.SugaredLogger invocationLifecycler invocationLifecycler } // NewClient returns a new Client with the given URL. func NewClient(opts ...ClientOption) (*Client, error) { c := Client{ server: &http.Server{ // Fixes "Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server" ReadHeaderTimeout: time.Second * 5, }, httpClient: &http.Client{}, } for _, opt := range opts { opt(&c) } mux := http.NewServeMux() mux.HandleFunc("/", handleLogEventsRequest(c.logger, c.logsChannel)) c.server.Handler = mux if c.logsAPIBaseURL == "" { return nil, errors.New("logs api base url cannot be empty") } if c.logger == nil { return nil, errors.New("logger cannot be nil") } return &c, nil } // StartService starts the HTTP server listening for log events and subscribes to the Logs API. func (lc *Client) StartService(extensionID string) error { addr, err := lc.startHTTPServer() if err != nil { return err } _, port, err := net.SplitHostPort(addr) if err != nil { if err2 := lc.Shutdown(); err2 != nil { lc.logger.Warnf("failed to shutdown the server: %v", err2) } return fmt.Errorf("failed to retrieve port from address %s: %w", addr, err) } host, _, err := net.SplitHostPort(lc.listenerAddr) if err != nil { if err2 := lc.Shutdown(); err2 != nil { lc.logger.Warnf("failed to shutdown the server: %v", err2) } return fmt.Errorf("failed to retrieve host from address %s: %w", lc.listenerAddr, err) } uri := "http://" + net.JoinHostPort(host, port) if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil { if err2 := lc.Shutdown(); err2 != nil { lc.logger.Warnf("failed to shutdown the server: %v", err2) } return err } return nil } // Shutdown shutdowns the log service gracefully. func (lc *Client) Shutdown() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return lc.server.Shutdown(ctx) }