shared/pubSubUtil.js (116 lines of code) (raw):

/** * Copyright 2019 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ 'use strict'; const { PubSub } = require('@google-cloud/pubsub'); const pubsub = require('@google-cloud/pubsub'); class PubSubUtil { constructor(projectId) { const options = { scopes: ['https://www.googleapis.com/auth/cloud-platform'] }; if (projectId) { options.projectId = projectId; } this.pubsub = new PubSub(options); this.client = new pubsub.v1.SubscriberClient(options); } get VERBOSE_MODE() { return process.env.VERBOSE_MODE; } /** */ async getTopics() { const [topics] = await this.pubsub.getTopics(); return topics; } /** * @param {string} topicName * create topic by name and return true */ async createTopic(topicName) { return this.pubsub.createTopic(topicName); } /** * @param {string} topicName * delete topic by name and return true */ async deleteTopic(topicName) { return this.pubsub.topic(topicName).delete(); } /** * @param {string} topicName * @param {string} subscriptionName * @param {object} options * create subscription by name and return true */ async createSubscription(topicName, subscriptionName, options) { const sub = await this.pubsub.topic(topicName).createSubscription(subscriptionName, options); if (this.VERBOSE_MODE) { console.log(`Subscription '${subscriptionName}' created for '${topicName}' with options: ${options}.`); } return sub[0]; } /** * @param {string} topicName * @param {string} subscriptionName * delete subscription by name and return true */ async deleteSubscription(topicName, subscriptionName) { await this.pubsub.subscription(subscriptionName).delete(); if (this.VERBOSE_MODE) { console.log(`Subscription '${subscriptionName}' deleted for '${topicName}'.`); } return true; } /** * @param {string} topicName * @param {string} subscriptionProjectId * @param {string} subscriptionName * check if the subscription exists by name and return true if exists * this function will only work if you have the necessary permissions on the topic */ async checkIfSubscriptionExists(topicName, subscriptionProjectId, subscriptionName) { const formattedSubscription = `projects/${subscriptionProjectId}/subscriptions/${subscriptionName}`; const subscription = this.getSubscription(formattedSubscription); const response = await subscription.exists(); const exists = response[0]; return exists; } /** * @param {} topicName */ async topicExists(topicName) { const topic = this.pubsub.topic(topicName); const exists = await topic.exists().catch((err) => { console.error(err.message); throw err; }); return exists[0]; } /** * @param {} subscriptionName * @param {} subscriberOptions */ getSubscription(subscriptionName, subscriberOptions) { return this.pubsub.subscription(subscriptionName, subscriberOptions); } /** * @param {string} subscriptionProjectId * @param {string} subscriptionName * get a message syncronously and ack it. throw an error if no messages in the subscription */ async getMessage(subscriptionProjectId, subscriptionName) { // The maximum number of messages returned for this request. const formattedSubscription = this.client.subscriptionPath(subscriptionProjectId, subscriptionName); const maxMessages = 1; const request = { subscription: formattedSubscription, maxMessages: maxMessages, returnImmediately: true }; // The subscriber pulls a specified number of messages. const [response] = await this.client.pull(request); if (response.receivedMessages.length === 0) { throw new Error('Pubsub message(s) not found.'); } // Obtain the first message. const message = response.receivedMessages[0]; const ackRequest = { subscription: formattedSubscription, ackIds: [message.ackId], }; //..acknowledges the message. const ack = await this.client.acknowledge(ackRequest); if (this.VERBOSE_MODE) { console.log(`Message ${message.message.messageId} acknowledged.`); } // Return the message contents. return message; } /** * @param {string} topicName * @param {Object} message * @param {Object} customAttributes * publish a message to a topicName with custom attributes and return the message id */ async publishMessage(topicName, message, customAttributes) { const jsonString = JSON.stringify(message); const dataBuffer = Buffer.from(jsonString); const messageId = await this.pubsub.topic(topicName).publish(dataBuffer, customAttributes); if (this.VERBOSE_MODE) { console.log(`Message '${messageId}' published to '${topicName}'.`); } return messageId; } /** * @param {} topicName * https://googleapis.dev/nodejs/pubsub/latest/IAM.html#getPolicy * https://github.com/googleapis/nodejs-pubsub/blob/master/samples/getTopicPolicy.js */ async getTopicIamPolicy(topicName) { const topic = this.pubsub.topic(topicName); return topic.iam.getPolicy() .then(data => { const policy = data[0]; const apiResponse = data[1]; return policy; }).catch((err) => { console.error(err); throw err; }); } /** * @param {} topicName * @param {} policy * https://googleapis.dev/nodejs/pubsub/latest/IAM.html#setPolicy * https://github.com/googleapis/nodejs-pubsub/blob/master/samples/setTopicPolicy.js */ async setTopicIamPolicy(topicName, policy) { const topic = this.pubsub.topic(topicName); return topic.iam.setPolicy(policy).then(data => { const policy = data[0]; const apiResponse = data[1]; return policy; }).catch((err) => { console.error(err); throw err; }); } } module.exports = PubSubUtil;