lib/functions_framework/server.rb (349 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "json"
require "monitor"
require "puma"
require "puma/server"
require "rack"
module FunctionsFramework
##
# A web server that wraps a function.
#
class Server
include ::MonitorMixin
##
# Create a new web server given a function definition, a set of application
# globals, and server configuration.
#
# To configure the server, pass a block that takes a
# {FunctionsFramework::Server::Config} object as the parameter. This block
# is the only opportunity to modify the configuration; once the server is
# initialized, configuration is frozen.
#
# @param function [FunctionsFramework::Function] The function to execute.
# @param globals [Hash] Globals to pass to invocations. This hash should
# normally be frozen so separate function invocations cannot interfere
# with one another's globals.
# @yield [FunctionsFramework::Server::Config] A config object that can be
# manipulated to configure this server.
#
def initialize function, globals
super()
@config = Config.new
yield @config if block_given?
@config.freeze
@function = function
@app =
case function.type
when :http
HttpApp.new function, globals, @config
when :cloud_event
EventApp.new function, globals, @config
when :typed
TypedApp.new function, globals, @config
else
raise "Unrecognized function type: #{function.type}"
end
@server = nil
@signals_installed = false
end
##
# The function to execute.
# @return [FunctionsFramework::Function]
#
attr_reader :function
##
# The final configuration. This is a frozen object that cannot be modified.
# @return [FunctionsFramework::Server::Config]
#
attr_reader :config
##
# Start the web server in the background. Does nothing if the web server
# is already running.
#
# @return [self]
#
def start
synchronize do
unless running?
# Puma >= 6.0 interprets these settings from options
options = {
min_threads: @config.min_threads,
max_threads: @config.max_threads,
environment: @config.show_error_details? ? "development" : "production"
}
# Puma::Events.stdio for Puma < 6.0; otherwise nil for Puma >= 6.0
events = ::Puma::Events.stdio if ::Puma::Events.respond_to? :stdio
@server = ::Puma::Server.new @app, events, options
if @server.respond_to? :min_threads=
# Puma < 6.0 sets server attributes for these settings
@server.min_threads = @config.min_threads
@server.max_threads = @config.max_threads
@server.leak_stack_on_error = @config.show_error_details?
end
@server.binder.add_tcp_listener @config.bind_addr, @config.port
@config.logger.info "FunctionsFramework: Serving function #{@function.name.inspect} " \
"on port #{@config.port}..."
@server.run true
end
end
self
end
##
# Stop the web server in the background. Does nothing if the web server
# is not running.
#
# @param force [Boolean] Use a forced halt instead of a graceful shutdown
# @param wait [Boolean] Block until shutdown is complete
# @return [self]
#
def stop force: false, wait: false
synchronize do
if running?
@config.logger.info "FunctionsFramework: Shutting down server..."
if force
@server.halt wait
else
@server.stop wait
end
end
end
self
end
##
# Wait for the server to stop. Returns immediately if the server is not
# running.
#
# @param timeout [nil,Numeric] The timeout. If `nil` (the default), waits
# indefinitely, otherwise times out after the given number of seconds.
# @return [self]
#
def wait_until_stopped timeout: nil
@server&.thread&.join timeout
self
end
##
# Determine if the web server is currently running
#
# @return [Boolean]
#
def running?
@server&.thread&.alive?
end
##
# Returns pidfile if server is currently running
#
# @return [String, nil]
#
def pidfile
@config.pidfile if running?
end
##
# Returns whether pidfile is present.
#
# @return [Boolean]
#
def pidfile?
!!@config.pidfile && running?
end
##
# Cause this server to respond to SIGTERM, SIGINT, and SIGHUP by shutting
# down gracefully.
#
# @return [self]
#
def respond_to_signals
synchronize do
return self if @signals_installed
::Signal.trap "SIGTERM" do
Server.signal_enqueue "SIGTERM", @config.logger, @server
end
::Signal.trap "SIGINT" do
Server.signal_enqueue "SIGINT", @config.logger, @server
end
begin
::Signal.trap "SIGHUP" do
Server.signal_enqueue "SIGHUP", @config.logger, @server
end
rescue ::ArgumentError
# Not available on all systems
end
@signals_installed = true
end
self
end
class << self
## @private
def start_signal_queue
@signal_queue = ::Queue.new
::Thread.start do
loop do
signal, logger, server = @signal_queue.pop
logger.info "FunctionsFramework: Caught #{signal}; shutting down server..."
server&.stop
end
end
end
## @private
def signal_enqueue signal, logger, server
@signal_queue << [signal, logger, server]
end
end
start_signal_queue
##
# The web server configuration. This object is yielded from the
# {FunctionsFramework::Server} constructor and can be modified at that
# point. Afterward, it is available from {FunctionsFramework::Server#config}
# but it is frozen.
#
class Config
##
# Create a new config object with the default settings
#
def initialize
self.rack_env = nil
self.bind_addr = nil
self.port = nil
self.pidfile = nil
self.min_threads = nil
self.max_threads = nil
self.show_error_details = nil
self.logger = nil
end
##
# Set the Rack environment, or `nil` to use the default.
# @param rack_env [String,nil]
#
def rack_env= rack_env
@rack_env = rack_env || ::ENV["RACK_ENV"] ||
(::ENV["K_REVISION"] ? "production" : "development")
end
##
# Set the bind address, or `nil` to use the default.
# @param bind_addr [String,nil]
#
def bind_addr= bind_addr
@bind_addr = bind_addr || ::ENV["FUNCTION_BIND_ADDR"] || "0.0.0.0"
end
##
# Set the port number, or `nil` to use the default.
# @param port [Integer,nil]
#
def port= port
@port = (port || ::ENV["PORT"] || 8080).to_i
end
##
# Set the pidfile string, or `nil` to use the default.
# @param path [String,nil]
#
def pidfile= path
@pidfile = (path || ::ENV["PIDFILE"] || "puma.pid").to_s
end
##
# Set the minimum number of worker threads, or `nil` to use the default.
# @param min_threads [Integer,nil]
#
def min_threads= min_threads
@min_threads = (min_threads || ::ENV["FUNCTION_MIN_THREADS"])&.to_i
end
##
# Set the maximum number of worker threads, or `nil` to use the default.
# @param max_threads [Integer,nil]
#
def max_threads= max_threads
@max_threads = (max_threads || ::ENV["FUNCTION_MAX_THREADS"])&.to_i
end
##
# Set whether to show detailed error messages, or `nil` to use the default.
# @param show_error_details [Boolean,nil]
#
def show_error_details= show_error_details
@show_error_details =
if show_error_details.nil?
!::ENV["FUNCTION_DETAILED_ERRORS"].to_s.empty?
else
!!show_error_details
end
end
##
# Set the logger for server messages, or `nil` to use the global default.
# @param logger [Logger]
#
def logger= logger
@logger = logger || ::FunctionsFramework.logger
end
##
# Returns the current Rack environment.
# @return [String]
#
def rack_env
@rack_env
end
##
# Returns the current bind address.
# @return [String]
#
def bind_addr
@bind_addr
end
##
# Returns the current port number.
# @return [Integer]
#
def port
@port
end
##
# Returns the current pidfile string.
# @return [String]
#
def pidfile
@pidfile
end
##
# Returns the minimum number of worker threads in the thread pool.
# @return [Integer]
#
def min_threads
@min_threads || 1
end
##
# Returns the maximum number of worker threads in the thread pool.
# @return [Integer]
#
def max_threads
@max_threads || 16
end
##
# Returns whether to show detailed error messages.
# @return [Boolean]
#
def show_error_details?
@show_error_details.nil? ? (@rack_env == "development") : @show_error_details
end
##
# Returns the logger.
# @return [Logger]
#
def logger
@logger
end
end
## @private
class AppBase
EXCLUDED_PATHS = ["/favicon.ico", "/robots.txt"].freeze
def initialize config
@config = config
end
def excluded_path? env
path = env[::Rack::SCRIPT_NAME].to_s + env[::Rack::PATH_INFO].to_s
EXCLUDED_PATHS.include? path
end
def interpret_response response
case response
when ::Array
response
when ::Rack::Response
response.finish
when ::String
string_response response, 200
when ::Hash
string_response ::JSON.dump(response), 200, content_type: "application/json"
when ::CloudEvents::CloudEventsError
cloud_events_error_response response
when ::StandardError
message = "#{response.class}: #{response.message}"
message = [message, *response.backtrace].join "\n\t"
error_response message
else
error_response "Unexpected response type: #{response.class}"
end
end
def notfound_response
string_response "Not found", 404
end
def no_content_response
[204, [], []]
end
def string_response string, status, content_type: nil
string.force_encoding ::Encoding::ASCII_8BIT unless string.valid_encoding?
if string.encoding == ::Encoding::ASCII_8BIT
content_type ||= "application/octet-stream"
else
content_type ||= "text/plain"
content_type = "#{content_type}; charset=#{string.encoding.name.downcase}"
end
headers = {
"content-type" => content_type,
"content-length" => string.bytesize
}
[status, headers, [string]]
end
def cloud_events_error_response error
@config.logger.warn error
string_response "#{error.class}: #{error.message}", 400
end
def error_response message
@config.logger.error message
message = "Unexpected internal error" unless @config.show_error_details?
string_response message, 500
end
def bad_request message
message = "Bad Request" unless @config.show_error_details?
string_response message, 400
end
def flush_streams
$stdout.flush
$stderr.flush
end
end
## @private
class HttpApp < AppBase
def initialize function, globals, config
super config
@function = function
@globals = globals
end
def call env
return notfound_response if excluded_path? env
response =
begin
logger = env[::Rack::RACK_LOGGER] ||= @config.logger
request = ::Rack::Request.new env
logger.info "FunctionsFramework: Handling HTTP #{request.request_method} request"
@function.call request, globals: @globals, logger: logger
rescue ::StandardError => e
e
end
interpret_response response
ensure
flush_streams
end
end
## @private
class TypedApp < AppBase
def initialize function, globals, config
super config
@function = function
@globals = globals
end
def call env
return notfound_response if excluded_path? env
begin
logger = env[::Rack::RACK_LOGGER] ||= @config.logger
request = ::Rack::Request.new env
logger.info "FunctionsFramework: Handling Typed #{request.request_method} request"
begin
req = if @function.request_class
request_class.decode_json request.body.read.to_s
else
JSON.parse request.body.read.to_s
end
rescue JSON::ParserError => e
return bad_request e.message
end
res = @function.call req, globals: @globals, logger: logger
return string_response res.to_json, 200, content_type: "application/json" if res
string_response "", 204
rescue ::StandardError => e
interpret_response e
end
ensure
flush_streams
end
end
## @private
class EventApp < AppBase
def initialize function, globals, config
super config
@function = function
@globals = globals
@cloud_events = ::CloudEvents::HttpBinding.default
@legacy_events = LegacyEventConverter.new
end
def call env
return notfound_response if excluded_path? env
return no_content_response if env[::Rack::REQUEST_METHOD] == "GET"
logger = env[::Rack::RACK_LOGGER] ||= @config.logger
event = decode_event env
response =
case event
when ::CloudEvents::Event
handle_cloud_event event, logger
when ::Array
::CloudEvents::CloudEventsError.new "Batched CloudEvents are not supported"
when ::CloudEvents::CloudEventsError
event
else
raise "Unexpected event type: #{event.class}"
end
interpret_response response
ensure
flush_streams
end
private
def decode_event env
begin
@cloud_events.decode_event env
rescue ::CloudEvents::NotCloudEventError
env[::Rack::RACK_INPUT].rewind rescue nil
@legacy_events.decode_rack_env(env) || ::CloudEvents::CloudEventsError.new("Unrecognized event format")
end
rescue ::CloudEvents::CloudEventsError => e
e
end
def handle_cloud_event event, logger
logger.info "FunctionsFramework: Handling CloudEvent"
@function.call event, globals: @globals, logger: logger
"ok"
rescue ::StandardError => e
e
end
end
end
end