in api/v1/src/admin/dataManager.js [318:427]
async function initializePubSubListener() {
const projectId = await runtimeConfig.getCurrentProjectId();
if (!projectId) {
console.log('Could not identify project, will not start up subscription');
return;
}
if (await runtimeConfig.commerceEnabled(projectId) === false) {
console.log('Marketplace integration is disabled, PubSub listener will not be started');
return;
} else {
console.log(`Initializing PubSub listener`);
}
const topicName = `projects/cloudcommerceproc-prod/topics/${projectId}`;
console.log(`Procurement topic name: ${topicName}`);
const subscriptionName = `projects/${projectId}/subscriptions/procurement-${projectId}`;
console.log(`Datashare procurement subscription name: ${subscriptionName}`);
const pubSubUtil = new PubSubUtil(projectId);
console.log('Checking if subscription exists');
const exists = await pubSubUtil.checkIfSubscriptionExists(topicName, projectId, `procurement-${projectId}`);
if (exists === true) {
console.log(`Subscription '${subscriptionName}' already exists`)
} else {
try {
const options = {
ackDeadlineSeconds: 600,
expirationPolicy: { seconds: null },
messageRetentionDuration: (60 * 60 * 24 * 7)
};
await pubSubUtil.createSubscription(topicName, subscriptionName, options);
console.log(`Subscription '${subscriptionName}' created.`);
} catch (err) {
console.error(`Unable to create subscription: '${subscriptionName}' to topic: '${topicName}'. Ensure that the topic exists and you have the proper permissions.`);
return;
}
}
// Subscribe
async function listenForMessages() {
try {
console.log(`Creating message handler for subscription: ${subscriptionName}`);
const messageHandler = async message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
if (message.data) {
const data = JSON.parse(message.data);
console.log(`Event type is: ${data.eventType}`);
const eventType = data.eventType;
if (eventType === 'ENTITLEMENT_CREATION_REQUESTED' || eventType === 'ENTITLEMENT_PLAN_CHANGE_REQUESTED') {
// Perform auto-approve here for policies that have auto-approve enabled
console.log(`Running auto approve for eventType: ${eventType}`);
await procurementManager.autoApproveEntitlement(projectId, data.entitlement.id)
} else if (eventType === 'ENTITLEMENT_ACTIVE') {
// Grant permissions for newly active entitlement
await procurementManager.activateNewEntitlement(projectId, data.entitlement.id)
} else if (eventType === 'ENTITLEMENT_CANCELLED' || eventType === 'ENTITLEMENT_DELETED' || eventType === 'ENTITLEMENT_SUSPENDED') {
// Remove user from the policy
console.log(`Running cancellation for eventType: ${eventType}`);
await procurementManager.cancelEntitlement(projectId, data.entitlement.id)
} else if (eventType === 'ENTITLEMENT_PLAN_CHANGED') {
// Grant permissions for the plan change
console.log(`Running auto approve for eventType: ${eventType}`);
await procurementManager.activateNewPlanChange(projectId, data.entitlement.id);
} else if (eventType === 'ACCOUNT_DELETED') {
// Delete the user account
console.log(`Running delete account for eventType: ${eventType}`);
await procurementManager.deleteAccount(projectId, data.account.id);
} else {
console.debug(`Event type not implemented: ${eventType}`);
}
}
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Create an event handler to handle errors
const errorHandler = function (error) {
console.error(`ERROR: ${error}`);
};
const subscriberOptions = {
flowControl: {
maxMessages: 1,
}
};
let subscription = pubSubUtil.getSubscription(subscriptionName, subscriberOptions);
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
subscription.on('close', () => { console.error('Subscription closed') });
subscription.detached((err, exists) => {
console.log(`Is subscription detached: ${exists}`);
if (err) {
console.error(err);
}
});
} catch (err) {
console.error(err);
}
}
// If a new subscription was created, delay to give it time to finish creating
// Even though the create returns a subscription object, you can't attached to .on immediately
let delay = exists === true ? 0 : 60000;
setTimeout(listenForMessages, delay);
}