lib/functions_framework/server.rb (349 lines of code) (raw):

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "json"
require "monitor"

require "puma"
require "puma/server"
require "rack"

module FunctionsFramework
  ##
  # A web server that wraps a function.
  #
  class Server
    include ::MonitorMixin

    ##
    # Create a new web server given a function definition, a set of application
    # globals, and server configuration.
    #
    # To configure the server, pass a block that takes a
    # {FunctionsFramework::Server::Config} object as the parameter. This block
    # is the only opportunity to modify the configuration; once the server is
    # initialized, configuration is frozen.
    #
    # @param function [FunctionsFramework::Function] The function to execute.
    # @param globals [Hash] Globals to pass to invocations. This hash should
    #     normally be frozen so separate function invocations cannot interfere
    #     with one another's globals.
    # @yield [FunctionsFramework::Server::Config] A config object that can be
    #     manipulated to configure this server.
    # @raise [RuntimeError] if the function type is not `:http`,
    #     `:cloud_event`, or `:typed`.
    #
    def initialize function, globals
      # MonitorMixin requires its initializer to run (with no arguments).
      super()
      @config = Config.new
      yield @config if block_given?
      @config.freeze
      @function = function
      @app = case function.type
             when :http
               HttpApp.new function, globals, @config
             when :cloud_event
               EventApp.new function, globals, @config
             when :typed
               TypedApp.new function, globals, @config
             else
               raise "Unrecognized function type: #{function.type}"
             end
      @server = nil
      @signals_installed = false
    end

    ##
    # The function to execute.
    # @return [FunctionsFramework::Function]
    #
    attr_reader :function

    ##
    # The final configuration. This is a frozen object that cannot be modified.
    # @return [FunctionsFramework::Server::Config]
    #
    attr_reader :config

    ##
    # Start the web server in the background. Does nothing if the web server
    # is already running.
    #
    # @return [self]
    #
    def start
      synchronize do
        unless running?
          # Puma >= 6.0 interprets these settings from options
          options = {
            min_threads: @config.min_threads,
            max_threads: @config.max_threads,
            environment: @config.show_error_details? ? "development" : "production"
          }
          # Puma::Events.stdio for Puma < 6.0; otherwise nil for Puma >= 6.0
          events = ::Puma::Events.stdio if ::Puma::Events.respond_to? :stdio
          @server = ::Puma::Server.new @app, events, options
          if @server.respond_to? :min_threads=
            # Puma < 6.0 sets server attributes for these settings
            @server.min_threads = @config.min_threads
            @server.max_threads = @config.max_threads
            @server.leak_stack_on_error = @config.show_error_details?
          end
          @server.binder.add_tcp_listener @config.bind_addr, @config.port
          @config.logger.info "FunctionsFramework: Serving function #{@function.name.inspect} " \
                              "on port #{@config.port}..."
          @server.run true
        end
      end
      self
    end

    ##
    # Stop the web server in the background. Does nothing if the web server
    # is not running.
    #
    # @param force [Boolean] Use a forced halt instead of a graceful shutdown
    # @param wait [Boolean] Block until shutdown is complete
    # @return [self]
    #
    def stop force: false, wait: false
      synchronize do
        if running?
          @config.logger.info "FunctionsFramework: Shutting down server..."
          if force
            @server.halt wait
          else
            @server.stop wait
          end
        end
      end
      self
    end

    ##
    # Wait for the server to stop. Returns immediately if the server is not
    # running.
    #
    # @param timeout [nil,Numeric] The timeout. If `nil` (the default), waits
    #     indefinitely, otherwise times out after the given number of seconds.
    # @return [self]
    #
    def wait_until_stopped timeout: nil
      @server&.thread&.join timeout
      self
    end

    ##
    # Determine if the web server is currently running
    #
    # @return [Boolean]
    #
    def running?
      @server&.thread&.alive?
    end

    ##
    # Returns pidfile if server is currently running
    #
    # @return [String, nil]
    #
    def pidfile
      @config.pidfile if running?
    end

    ##
    # Returns whether pidfile is present.
    #
    # @return [Boolean]
    #
    def pidfile?
      !!@config.pidfile && running?
    end

    ##
    # Cause this server to respond to SIGTERM, SIGINT, and SIGHUP by shutting
    # down gracefully.
    #
    # @return [self]
    #
    def respond_to_signals
      synchronize do
        return self if @signals_installed
        # The trap handlers only enqueue; the actual shutdown happens on the
        # signal queue thread started by Server.start_signal_queue.
        ::Signal.trap "SIGTERM" do
          Server.signal_enqueue "SIGTERM", @config.logger, @server
        end
        ::Signal.trap "SIGINT" do
          Server.signal_enqueue "SIGINT", @config.logger, @server
        end
        begin
          ::Signal.trap "SIGHUP" do
            Server.signal_enqueue "SIGHUP", @config.logger, @server
          end
        rescue ::ArgumentError
          # Not available on all systems
        end
        @signals_installed = true
      end
      self
    end

    class << self
      ## @private
      # Creates the shared signal queue and starts a thread that pops
      # `[signal, logger, server]` entries, logs them, and stops the server.
      def start_signal_queue
        @signal_queue = ::Queue.new
        ::Thread.start do
          loop do
            signal, logger, server = @signal_queue.pop
            logger.info "FunctionsFramework: Caught #{signal}; shutting down server..."
            server&.stop
          end
        end
      end

      ## @private
      # Pushes a `[signal, logger, server]` entry onto the shared signal queue.
      def signal_enqueue signal, logger, server
        @signal_queue << [signal, logger, server]
      end
    end

    start_signal_queue

    ##
    # The web server configuration. This object is yielded from the
    # {FunctionsFramework::Server} constructor and can be modified at that
    # point. Afterward, it is available from {FunctionsFramework::Server#config}
    # but it is frozen.
    #
    class Config
      ##
      # Create a new config object with the default settings
      #
      def initialize
        self.rack_env = nil
        self.bind_addr = nil
        self.port = nil
        self.pidfile = nil
        self.min_threads = nil
        self.max_threads = nil
        self.show_error_details = nil
        self.logger = nil
      end

      ##
      # Set the Rack environment, or `nil` to use the default.
      # @param rack_env [String,nil]
      #
      def rack_env= rack_env
        @rack_env = rack_env || ::ENV["RACK_ENV"] ||
                    (::ENV["K_REVISION"] ? "production" : "development")
      end

      ##
      # Set the bind address, or `nil` to use the default.
      # @param bind_addr [String,nil]
      #
      def bind_addr= bind_addr
        @bind_addr = bind_addr || ::ENV["FUNCTION_BIND_ADDR"] || "0.0.0.0"
      end

      ##
      # Set the port number, or `nil` to use the default.
      # @param port [Integer,nil]
      #
      def port= port
        @port = (port || ::ENV["PORT"] || 8080).to_i
      end

      ##
      # Set the pidfile string, or `nil` to use the default.
      # @param path [String,nil]
      #
      def pidfile= path
        @pidfile = (path || ::ENV["PIDFILE"] || "puma.pid").to_s
      end

      ##
      # Set the minimum number of worker threads, or `nil` to use the default.
      # @param min_threads [Integer,nil]
      #
      def min_threads= min_threads
        @min_threads = (min_threads || ::ENV["FUNCTION_MIN_THREADS"])&.to_i
      end

      ##
      # Set the maximum number of worker threads, or `nil` to use the default.
      # @param max_threads [Integer,nil]
      #
      def max_threads= max_threads
        @max_threads = (max_threads || ::ENV["FUNCTION_MAX_THREADS"])&.to_i
      end

      ##
      # Set whether to show detailed error messages, or `nil` to use the default.
      # @param show_error_details [Boolean,nil]
      #
      def show_error_details= show_error_details
        @show_error_details =
          if show_error_details.nil?
            !::ENV["FUNCTION_DETAILED_ERRORS"].to_s.empty?
          else
            !!show_error_details
          end
      end

      ##
      # Set the logger for server messages, or `nil` to use the global default.
      # @param logger [Logger]
      #
      def logger= logger
        @logger = logger || ::FunctionsFramework.logger
      end

      ##
      # Returns the current Rack environment.
      # @return [String]
      #
      attr_reader :rack_env

      ##
      # Returns the current bind address.
      # @return [String]
      #
      attr_reader :bind_addr

      ##
      # Returns the current port number.
      # @return [Integer]
      #
      attr_reader :port

      ##
      # Returns the current pidfile string.
      # @return [String]
      #
      attr_reader :pidfile

      ##
      # Returns the minimum number of worker threads in the thread pool.
      # @return [Integer]
      #
      def min_threads
        @min_threads || 1
      end

      ##
      # Returns the maximum number of worker threads in the thread pool.
      # @return [Integer]
      #
      def max_threads
        @max_threads || 16
      end

      ##
      # Returns whether to show detailed error messages.
      # @return [Boolean]
      #
      def show_error_details?
        # NOTE(review): the setter above always stores true or false, so the
        # rack_env fallback here looks unreachable after initialize -- confirm
        # whether defaulting to detailed errors in development was intended.
        @show_error_details.nil? ? (@rack_env == "development") : @show_error_details
      end

      ##
      # Returns the logger.
      # @return [Logger]
      #
      attr_reader :logger
    end

    ## @private
    # Shared helpers for the Rack apps below: path exclusion and conversion of
    # function results into Rack response triples.
    class AppBase
      EXCLUDED_PATHS = ["/favicon.ico", "/robots.txt"].freeze

      def initialize config
        @config = config
      end

      # Whether the request path (script name + path info) is one of
      # EXCLUDED_PATHS.
      def excluded_path? env
        path = env[::Rack::SCRIPT_NAME].to_s + env[::Rack::PATH_INFO].to_s
        EXCLUDED_PATHS.include? path
      end

      # Convert a function result (or a captured exception) into a Rack
      # response array.
      def interpret_response response
        case response
        when ::Array
          response
        when ::Rack::Response
          response.finish
        when ::String
          string_response response, 200
        when ::Hash
          string_response ::JSON.dump(response), 200, content_type: "application/json"
        when ::CloudEvents::CloudEventsError
          cloud_events_error_response response
        when ::StandardError
          message = "#{response.class}: #{response.message}"
          message = [message, *response.backtrace].join "\n\t"
          error_response message
        else
          error_response "Unexpected response type: #{response.class}"
        end
      end

      def notfound_response
        string_response "Not found", 404
      end

      def no_content_response
        [204, [], []]
      end

      # Build a Rack response from a string body. Strings whose bytes are not
      # valid in their declared encoding are sent as binary.
      def string_response string, status, content_type: nil
        # String#b returns a binary-encoded copy, so the caller's string is
        # never mutated and a frozen string does not raise FrozenError.
        string = string.b unless string.valid_encoding?
        if string.encoding == ::Encoding::ASCII_8BIT
          content_type ||= "application/octet-stream"
        else
          content_type ||= "text/plain"
          content_type = "#{content_type}; charset=#{string.encoding.name.downcase}"
        end
        headers = {
          "content-type"   => content_type,
          "content-length" => string.bytesize
        }
        [status, headers, [string]]
      end

      # Log a CloudEvents error as a warning and report it as a 400.
      def cloud_events_error_response error
        @config.logger.warn error
        string_response "#{error.class}: #{error.message}", 400
      end

      # Log the message and return a 500; the detail is sent to the client
      # only when the config enables error details.
      def error_response message
        @config.logger.error message
        message = "Unexpected internal error" unless @config.show_error_details?
        string_response message, 500
      end

      # Return a 400; the detail is sent to the client only when the config
      # enables error details.
      def bad_request message
        message = "Bad Request" unless @config.show_error_details?
        string_response message, 400
      end

      def flush_streams
        $stdout.flush
        $stderr.flush
      end
    end

    ## @private
    # Rack app that invokes the function with a Rack::Request.
    class HttpApp < AppBase
      def initialize function, globals, config
        super config
        @function = function
        @globals = globals
      end

      def call env
        return notfound_response if excluded_path? env
        response =
          begin
            logger = env[::Rack::RACK_LOGGER] ||= @config.logger
            request = ::Rack::Request.new env
            logger.info "FunctionsFramework: Handling HTTP #{request.request_method} request"
            @function.call request, globals: @globals, logger: logger
          rescue ::StandardError => e
            # Exceptions are turned into a response by interpret_response.
            e
          end
        interpret_response response
      ensure
        flush_streams
      end
    end

    ## @private
    # Rack app that decodes the request body (via the function's request class
    # when one is set, otherwise as plain JSON), invokes the function, and
    # serializes a non-nil result with `to_json`.
    class TypedApp < AppBase
      def initialize function, globals, config
        super config
        @function = function
        @globals = globals
      end

      def call env
        return notfound_response if excluded_path? env
        begin
          logger = env[::Rack::RACK_LOGGER] ||= @config.logger
          request = ::Rack::Request.new env
          logger.info "FunctionsFramework: Handling Typed #{request.request_method} request"

          body = request.body.read.to_s
          begin
            # The request class lives on the function object; it is not a
            # method or local of this app.
            request_class = @function.request_class
            req = request_class ? request_class.decode_json(body) : ::JSON.parse(body)
          rescue ::JSON::ParserError => e
            return bad_request e.message
          end

          res = @function.call req, globals: @globals, logger: logger
          return string_response res.to_json, 200, content_type: "application/json" if res

          string_response "", 204
        rescue ::StandardError => e
          interpret_response e
        end
      ensure
        flush_streams
      end
    end

    ## @private
    # Rack app that decodes an incoming CloudEvent (or a legacy event) and
    # invokes the function with it.
    class EventApp < AppBase
      def initialize function, globals, config
        super config
        @function = function
        @globals = globals
        @cloud_events = ::CloudEvents::HttpBinding.default
        @legacy_events = LegacyEventConverter.new
      end

      def call env
        return notfound_response if excluded_path? env
        return no_content_response if env[::Rack::REQUEST_METHOD] == "GET"
        logger = env[::Rack::RACK_LOGGER] ||= @config.logger
        event = decode_event env
        response =
          case event
          when ::CloudEvents::Event
            handle_cloud_event event, logger
          when ::Array
            ::CloudEvents::CloudEventsError.new "Batched CloudEvents are not supported"
          when ::CloudEvents::CloudEventsError
            event
          else
            raise "Unexpected event type: #{event.class}"
          end
        interpret_response response
      ensure
        flush_streams
      end

      private

      # Decode the Rack env as a CloudEvent, falling back to the legacy event
      # converter. CloudEvents errors are returned as values, not raised.
      def decode_event env
        begin
          @cloud_events.decode_event env
        rescue ::CloudEvents::NotCloudEventError
          # Best-effort rewind so the legacy converter can re-read the body.
          env[::Rack::RACK_INPUT].rewind rescue nil
          @legacy_events.decode_rack_env(env) ||
            ::CloudEvents::CloudEventsError.new("Unrecognized event format")
        end
      rescue ::CloudEvents::CloudEventsError => e
        e
      end

      # Invoke the function with the event. Returns "ok" on success or the
      # raised exception on failure.
      def handle_cloud_event event, logger
        logger.info "FunctionsFramework: Handling CloudEvent"
        @function.call event, globals: @globals, logger: logger
        "ok"
      rescue ::StandardError => e
        e
      end
    end
  end
end