lib/functions_framework/legacy_event_converter.rb (180 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "json"
module FunctionsFramework
##
# Converter from legacy GCF event formats to CloudEvents.
#
class LegacyEventConverter
##
# Decode an event from the given Rack environment hash.
#
# @param env [Hash] The Rack environment
# @return [::CloudEvents::Event] if the request could be converted
# @return [nil] if the event format was not recognized.
#
def decode_rack_env env
content_type = ::CloudEvents::ContentType.new env["CONTENT_TYPE"], default_charset: "utf-8"
return nil unless content_type.media_type == "application" && content_type.subtype_base == "json"
input = read_input_json env["rack.input"], content_type.charset
return nil unless input
input = convert_raw_pubsub_event input, env if raw_pubsub_payload? input
context = normalized_context input
return nil unless context
construct_cloud_event context, input["data"]
end
private
def read_input_json input, charset
input = input.read if input.respond_to? :read
input.force_encoding charset if charset
content = ::JSON.parse input
content = nil unless content.is_a? ::Hash
content
rescue ::JSON::ParserError
nil
end
def raw_pubsub_payload? input
return false if input.include?("context") || !input.include?("subscription")
message = input["message"]
message.is_a?(::Hash) && message.include?("data") && message.include?("messageId")
end
def convert_raw_pubsub_event input, env
message = input["message"]
path = "#{env['SCRIPT_NAME']}#{env['PATH_INFO']}"
path_match = %r{projects/[^/?]+/topics/[^/?]+}.match path
topic = path_match ? path_match[0] : "UNKNOWN_PUBSUB_TOPIC"
timestamp = message["publishTime"] || ::Time.now.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ")
{
"context" => {
"eventId" => message["messageId"],
"timestamp" => timestamp,
"eventType" => "google.pubsub.topic.publish",
"resource" => {
"service" => "pubsub.googleapis.com",
"type" => "type.googleapis.com/google.pubsub.v1.PubsubMessage",
"name" => topic
}
},
"data" => {
"@type" => "type.googleapis.com/google.pubsub.v1.PubsubMessage",
"data" => message["data"],
"attributes" => message["attributes"]
}
}
end
def normalized_context input
id = normalized_context_field input, "eventId"
timestamp = normalized_context_field input, "timestamp"
type = normalized_context_field input, "eventType"
domain = normalized_context_field input, "domain"
service, resource = analyze_resource normalized_context_field input, "resource"
service ||= service_from_type type
return nil unless id && timestamp && type && service && resource
{ id: id, timestamp: timestamp, type: type, service: service, resource: resource, domain: domain }
end
def normalized_context_field input, field
input["context"]&.[](field) || input[field]
end
def analyze_resource raw_resource
service = resource = nil
case raw_resource
when ::Hash
service = raw_resource["service"]
resource = raw_resource["name"]
when ::String
resource = raw_resource
end
[service, resource]
end
def service_from_type type
LEGACY_TYPE_TO_SERVICE.each do |pattern, service|
return service if pattern =~ type
end
nil
end
def construct_cloud_event context, data
source, subject = convert_source context[:service], context[:resource], context[:domain]
type = LEGACY_TYPE_TO_CE_TYPE[context[:type]]
return nil unless type && source
ce_data, data_subject = convert_data context, data
content_type = "application/json"
::CloudEvents::Event.new id: context[:id],
source: source,
type: type,
spec_version: "1.0",
data_content_type: content_type,
data: ce_data,
subject: subject || data_subject,
time: context[:timestamp]
end
def convert_source service, resource, domain
return ["//#{service}/#{resource}", nil] unless CE_SERVICE_TO_RESOURCE_RE.key? service
match = CE_SERVICE_TO_RESOURCE_RE[service].match resource
return [nil, nil] unless match
resource_fragment = match[1]
subject = match[2]
if service == "firebasedatabase.googleapis.com"
location =
case domain
when "firebaseio.com"
"us-central1"
when /^([\w-]+)\./
Regexp.last_match[1]
else
return [nil, nil]
end
["//#{service}/projects/_/locations/#{location}/#{resource_fragment}", subject]
else
["//#{service}/#{resource_fragment}", subject]
end
end
def convert_data context, data
service = context[:service]
case service
when "pubsub.googleapis.com"
data["messageId"] = context[:id]
data["publishTime"] = context[:timestamp]
[{ "message" => data }, nil]
when "firebaseauth.googleapis.com"
if data.key? "metadata"
FIREBASE_AUTH_METADATA_LEGACY_TO_CE.each do |old_key, new_key|
if data["metadata"].key? old_key
data["metadata"][new_key] = data["metadata"][old_key]
data["metadata"].delete old_key
end
end
end
subject = "users/#{data['uid']}" if data.key? "uid"
[data, subject]
else
[data, nil]
end
end
LEGACY_TYPE_TO_SERVICE = {
%r{^providers/cloud\.firestore/} => "firestore.googleapis.com",
%r{^providers/cloud\.pubsub/} => "pubsub.googleapis.com",
%r{^providers/cloud\.storage/} => "storage.googleapis.com",
%r{^providers/firebase\.auth/} => "firebaseauth.googleapis.com",
%r{^providers/google\.firebase\.analytics/} => "firebase.googleapis.com",
%r{^providers/google\.firebase\.database/} => "firebasedatabase.googleapis.com"
}.freeze
LEGACY_TYPE_TO_CE_TYPE = {
"google.pubsub.topic.publish" => "google.cloud.pubsub.topic.v1.messagePublished",
"providers/cloud.pubsub/eventTypes/topic.publish" => "google.cloud.pubsub.topic.v1.messagePublished",
"google.storage.object.finalize" => "google.cloud.storage.object.v1.finalized",
"google.storage.object.delete" => "google.cloud.storage.object.v1.deleted",
"google.storage.object.archive" => "google.cloud.storage.object.v1.archived",
"google.storage.object.metadataUpdate" => "google.cloud.storage.object.v1.metadataUpdated",
"providers/cloud.firestore/eventTypes/document.write" => "google.cloud.firestore.document.v1.written",
"providers/cloud.firestore/eventTypes/document.create" => "google.cloud.firestore.document.v1.created",
"providers/cloud.firestore/eventTypes/document.update" => "google.cloud.firestore.document.v1.updated",
"providers/cloud.firestore/eventTypes/document.delete" => "google.cloud.firestore.document.v1.deleted",
"providers/firebase.auth/eventTypes/user.create" => "google.firebase.auth.user.v1.created",
"providers/firebase.auth/eventTypes/user.delete" => "google.firebase.auth.user.v1.deleted",
"providers/google.firebase.analytics/eventTypes/event.log" => "google.firebase.analytics.log.v1.written",
"providers/google.firebase.database/eventTypes/ref.create" => "google.firebase.database.ref.v1.created",
"providers/google.firebase.database/eventTypes/ref.write" => "google.firebase.database.ref.v1.written",
"providers/google.firebase.database/eventTypes/ref.update" => "google.firebase.database.ref.v1.updated",
"providers/google.firebase.database/eventTypes/ref.delete" => "google.firebase.database.ref.v1.deleted",
"providers/cloud.storage/eventTypes/object.change" => "google.cloud.storage.object.v1.finalized"
}.freeze
CE_SERVICE_TO_RESOURCE_RE = {
"firebase.googleapis.com" => %r{^(projects/[^/]+)/(events/[^/]+)$},
"firebasedatabase.googleapis.com" => %r{^projects/_/(instances/[^/]+)/(refs/.+)$},
"firestore.googleapis.com" => %r{^(projects/[^/]+/databases/\(default\))/(documents/.+)$},
"storage.googleapis.com" => %r{^(projects/[^/]+/buckets/[^/]+)/([^#]+)(?:#.*)?$}
}.freeze
# Map Firebase Auth legacy event metadata field names to their equivalent CloudEvent field names.
FIREBASE_AUTH_METADATA_LEGACY_TO_CE = {
"createdAt" => "createTime",
"lastSignedInAt" => "lastSignInTime"
}.freeze
end
end