src/pubsub_middleware.ts (77 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import {Request, Response, NextFunction} from 'express'; import {isBinaryCloudEvent} from './cloud_events'; const PUBSUB_EVENT_TYPE = 'google.pubsub.topic.publish'; const PUBSUB_MESSAGE_TYPE = 'type.googleapis.com/google.pubsub.v1.PubsubMessage'; const PUBSUB_SERVICE = 'pubsub.googleapis.com'; /** * The request body of an HTTP request received directly from a Pub/Sub subscription. * * {@link https://cloud.google.com/pubsub/docs/push?hl=en#receiving_messages} */ export interface RawPubSubBody { /** * The name of the subscription for which this request was made. Format is: * projects/{project}/subscriptions/{sub}. */ subscription: string; /** * A message that is published by publishers and consumed by subscribers. The message must * contain either a non-empty data field or at least one attribute. * * {@link https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage} */ message: { /** * Attributes for this message. If this field is empty, the message must contain non-empty * data. */ attributes?: {[key: string]: string}; /** * Base64 encoded message data. If this field is empty, the message must contain at least one * attribute. */ data: string; /** * ID of this message, assigned by the server when the message is published. Guaranteed to be * unique within the topic. */ messageId: string; /** * If non-empty, identifies related messages for which publish order should be respected. This * field is not set by the Pub/Sub emulator. */ orderingKey?: string; /** * The time at which the message was published, formatted as timestamp in RFC3339 UTC "Zulu" * format. This field is not set by the Pub/Sub emulator. */ publishTime?: string; }; } /** * The request body schema that is expected by the downstream by the function loader for Pub/Sub * event functions. */ export interface MarshalledPubSubBody { context: { eventId: string; timestamp: string; eventType: typeof PUBSUB_EVENT_TYPE; resource: { service: typeof PUBSUB_SERVICE; type: typeof PUBSUB_MESSAGE_TYPE; name: string | null; }; }; data: { '@type': typeof PUBSUB_MESSAGE_TYPE; data: string; attributes: {[key: string]: string}; }; } /** * Type predicate that checks if a given Request is a RawPubSubRequest * @param request - A Request object to typecheck * @returns true if this Request is a RawPubSubRequest */ // eslint-disable-next-line @typescript-eslint/no-explicit-any const isRawPubSubRequestBody = (body: any): body is RawPubSubBody => { return !!( body && !body.context && body.subscription && body.message && body.message.data && body.message.messageId ); }; /** * Extract the Pub/Sub topic name from the HTTP request path. * @param path - The URL path of the http request * @returns the Pub/Sub topic name if the path matches the expected format, * null otherwise */ const extractPubSubTopic = (path: string): string | null => { const parsedTopic = path.match(/projects\/[^/?]+\/topics\/[^/?]+/); if (parsedTopic) { return parsedTopic[0]; } console.warn('Failed to extract the topic name from the URL path.'); console.warn( "Configure your subscription's push endpoint to use the following path: ", 'projects/PROJECT_NAME/topics/TOPIC_NAME', ); return null; }; /** * Marshal the body of an HTTP request from a Pub/Sub subscription * @param body - An unmarshalled http request body from a Pub/Sub push subscription * @param path - The HTTP request path * @returns the marshalled request body expected by wrapEventFunction */ const marshalPubSubRequestBody = ( body: RawPubSubBody, path: string, ): MarshalledPubSubBody => ({ context: { eventId: body.message.messageId, timestamp: body.message.publishTime || new Date().toISOString(), eventType: PUBSUB_EVENT_TYPE, resource: { service: PUBSUB_SERVICE, type: PUBSUB_MESSAGE_TYPE, name: extractPubSubTopic(path), }, }, data: { '@type': PUBSUB_MESSAGE_TYPE, data: body.message.data, attributes: body.message.attributes || {}, }, }); /** * Express middleware used to marshal the HTTP request body received directly from a * Pub/Sub subscription into the format that is expected downstream by wrapEventFunction * @param req - Express request object * @param res - Express response object * @param next - Function used to pass control to the next middle middleware function in the stack */ export const legacyPubSubEventMiddleware = ( req: Request, res: Response, next: NextFunction, ) => { const {body, path} = req; if (isRawPubSubRequestBody(body) && !isBinaryCloudEvent(req)) { req.body = marshalPubSubRequestBody(body, path); } next(); };