async function initializePubSubListener()

in api/v1/src/admin/dataManager.js [318:427]


async function initializePubSubListener() {
    const projectId = await runtimeConfig.getCurrentProjectId();
    if (!projectId) {
        console.log('Could not identify project, will not start up subscription');
        return;
    }

    if (await runtimeConfig.commerceEnabled(projectId) === false) {
        console.log('Marketplace integration is disabled, PubSub listener will not be started');
        return;
    } else {
        console.log(`Initializing PubSub listener`);
    }

    const topicName = `projects/cloudcommerceproc-prod/topics/${projectId}`;
    console.log(`Procurement topic name: ${topicName}`);
    const subscriptionName = `projects/${projectId}/subscriptions/procurement-${projectId}`;
    console.log(`Datashare procurement subscription name: ${subscriptionName}`);
    const pubSubUtil = new PubSubUtil(projectId);
    console.log('Checking if subscription exists');
    const exists = await pubSubUtil.checkIfSubscriptionExists(topicName, projectId, `procurement-${projectId}`);

    if (exists === true) {
        console.log(`Subscription '${subscriptionName}' already exists`)
    } else {
        try {
            const options = {
                ackDeadlineSeconds: 600,
                expirationPolicy: { seconds: null },
                messageRetentionDuration: (60 * 60 * 24 * 7)
            };
            await pubSubUtil.createSubscription(topicName, subscriptionName, options);
            console.log(`Subscription '${subscriptionName}' created.`);
        } catch (err) {
            console.error(`Unable to create subscription: '${subscriptionName}' to topic: '${topicName}'. Ensure that the topic exists and you have the proper permissions.`);
            return;
        }
    }

    // Subscribe
    async function listenForMessages() {
        try {
            console.log(`Creating message handler for subscription: ${subscriptionName}`);
            const messageHandler = async message => {
                console.log(`Received message ${message.id}:`);
                console.log(`\tData: ${message.data}`);
                console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);

                if (message.data) {
                    const data = JSON.parse(message.data);
                    console.log(`Event type is: ${data.eventType}`);
                    const eventType = data.eventType;
                    if (eventType === 'ENTITLEMENT_CREATION_REQUESTED' || eventType === 'ENTITLEMENT_PLAN_CHANGE_REQUESTED') {
                        // Perform auto-approve here for policies that have auto-approve enabled
                        console.log(`Running auto approve for eventType: ${eventType}`);
                        await procurementManager.autoApproveEntitlement(projectId, data.entitlement.id)
                    } else if (eventType === 'ENTITLEMENT_ACTIVE') {
                        // Grant permissions for newly active entitlement
                        await procurementManager.activateNewEntitlement(projectId, data.entitlement.id)
                    } else if (eventType === 'ENTITLEMENT_CANCELLED' || eventType === 'ENTITLEMENT_DELETED' || eventType === 'ENTITLEMENT_SUSPENDED') {
                        // Remove user from the policy
                        console.log(`Running cancellation for eventType: ${eventType}`);
                        await procurementManager.cancelEntitlement(projectId, data.entitlement.id)
                    } else if (eventType === 'ENTITLEMENT_PLAN_CHANGED') {
                        // Grant permissions for the plan change
                        console.log(`Running auto approve for eventType: ${eventType}`);
                        await procurementManager.activateNewPlanChange(projectId, data.entitlement.id);
                    } else if (eventType === 'ACCOUNT_DELETED') {
                        // Delete the user account
                        console.log(`Running delete account for eventType: ${eventType}`);
                        await procurementManager.deleteAccount(projectId, data.account.id);
                    } else {
                        console.debug(`Event type not implemented: ${eventType}`);
                    }
                }

                // "Ack" (acknowledge receipt of) the message
                message.ack();
            };

            // Create an event handler to handle errors
            const errorHandler = function (error) {
                console.error(`ERROR: ${error}`);
            };

            const subscriberOptions = {
                flowControl: {
                    maxMessages: 1,
                }
            };
            let subscription = pubSubUtil.getSubscription(subscriptionName, subscriberOptions);
            subscription.on('message', messageHandler);
            subscription.on('error', errorHandler);
            subscription.on('close', () => { console.error('Subscription closed') });
            subscription.detached((err, exists) => {
                console.log(`Is subscription detached: ${exists}`);
                if (err) {
                    console.error(err);
                }
            });
        } catch (err) {
            console.error(err);
        }
    }

    // If a new subscription was created, delay to give it time to finish creating
    // Even though the create returns a subscription object, you can't attached to .on immediately
    let delay = exists === true ? 0 : 60000;
    setTimeout(listenForMessages, delay);
}