in src/v2/providers/pubsub.ts [107:185]
export function onMessagePublished<T = any>(
options: PubSubOptions,
handler: (event: CloudEvent<MessagePublishedData<T>>) => any | Promise<any>
): CloudFunction<MessagePublishedData<T>>;
export function onMessagePublished<T = any>(
topicOrOptions: string | PubSubOptions,
handler: (event: CloudEvent<MessagePublishedData<T>>) => any | Promise<any>
): CloudFunction<MessagePublishedData<T>> {
let topic: string;
let opts: options.EventHandlerOptions;
if (typeof topicOrOptions === 'string') {
topic = topicOrOptions;
opts = {};
} else {
topic = topicOrOptions.topic;
opts = { ...topicOrOptions };
delete (opts as any).topic;
}
const func = (raw: CloudEvent<unknown>) => {
const messagePublishedData = raw.data as {
message: unknown;
subscription: string;
};
messagePublishedData.message = new Message(messagePublishedData.message);
return handler(raw as CloudEvent<MessagePublishedData<T>>);
};
func.run = handler;
Object.defineProperty(func, '__trigger', {
get: () => {
const baseOpts = options.optionsToTriggerAnnotations(
options.getGlobalOptions()
);
const specificOpts = options.optionsToTriggerAnnotations(opts);
return {
// TODO(inlined): Remove "apiVersion" once the CLI has migrated to
// "platform"
apiVersion: 2,
platform: 'gcfv2',
...baseOpts,
...specificOpts,
labels: {
...baseOpts?.labels,
...specificOpts?.labels,
},
eventTrigger: {
eventType: 'google.cloud.pubsub.topic.v1.messagePublished',
resource: `projects/${process.env.GCLOUD_PROJECT}/topics/${topic}`,
},
};
},
});
const baseOpts = options.optionsToEndpoint(options.getGlobalOptions());
const specificOpts = options.optionsToEndpoint(opts);
const endpoint: ManifestEndpoint = {
platform: 'gcfv2',
...baseOpts,
...specificOpts,
labels: {
...baseOpts?.labels,
...specificOpts?.labels,
},
eventTrigger: {
eventType: 'google.cloud.pubsub.topic.v1.messagePublished',
eventFilters: { topic },
retry: false,
},
};
copyIfPresent(endpoint.eventTrigger, opts, 'retry', 'retry');
func.__endpoint = endpoint;
return func;
}