async function performTopicUpdate()

in api/v1/src/lib/appliers/pubsubApplier.js [77:147]


/**
 * Reconciles the IAM binding for the subscriber role on a Pub/Sub topic with
 * the supplied list of accounts.
 *
 * Members of the role binding whose type prefix (the text before the first
 * `:`) is listed in `cfg.managedIamAccessTypes` are removed unless a matching
 * account is present in `accounts`; members of any other type are left
 * untouched. Every account that has both `email` and `emailType` is added as
 * `<emailType>:<email>` if it is not already a member. The policy is written
 * back only when at least one member was added or removed.
 *
 * @param {string} projectId - Project passed to `PubSubUtil` and used to look up the subscriber role.
 * @param {string} topicName - Name of the topic whose IAM policy is updated.
 * @param {Array<{email: string, emailType: string}>} accounts - Accounts that
 *   should hold the subscriber role. Entries missing `email` or `emailType`
 *   are ignored.
 * @returns {Promise<boolean>} `true` when the policy was changed and written,
 *   `false` when the topic does not exist or the policy was already up to date.
 * @throws Rethrows any error raised by `setTopicIamPolicy` after logging it.
 */
async function performTopicUpdate(projectId, topicName, accounts) {
    console.log(`Begin IAM update for topic: ${topicName}`);
    const pubsubUtil = new PubSubUtil(projectId);
    const exists = await pubsubUtil.topicExists(topicName);
    if (!exists) {
        console.warn(`Skipping IAM update for non-existant topic: ${topicName}`);
        return false;
    }

    const viewerRole = await runtimeConfig.pubsubSubscriberRole(projectId);
    const topicPolicy = await pubsubUtil.getTopicIamPolicy(topicName);
    if (!topicPolicy.bindings) {
        topicPolicy.bindings = [];
    }

    // Locate the binding for the subscriber role, creating it when absent.
    let readBinding = topicPolicy.bindings.find(binding => binding && binding.role === viewerRole);
    if (!readBinding) {
        readBinding = { role: viewerRole, members: [] };
        topicPolicy.bindings.push(readBinding);
    } else if (!Array.isArray(readBinding.members)) {
        // A binding without a members list would otherwise crash the reconciliation below.
        readBinding.members = [];
    }

    // Builds the IAM member identifier for an account, or null when the account is incomplete.
    const toMemberId = account => (
        account.email && account.emailType ? `${account.emailType}:${account.email}` : null
    );

    // Identifiers that must hold the role; a Set gives O(1) lookups while filtering members.
    const desiredMembers = new Set();
    accounts.forEach(account => {
        const identifier = toMemberId(account);
        if (identifier !== null) {
            desiredMembers.add(identifier);
        }
    });

    let isDirty = false;

    // Drop managed members that are no longer wanted. The whole member string is
    // compared, so identifiers containing additional colons are never truncated.
    readBinding.members = readBinding.members.filter(member => {
        const separatorIndex = member.indexOf(':');
        const type = separatorIndex === -1 ? member : member.slice(0, separatorIndex);
        if (!cfg.managedIamAccessTypes.includes(type) || desiredMembers.has(member)) {
            return true;
        }
        console.log(`Deleting user: ${member} from topic: ${topicName}`);
        isDirty = true;
        return false;
    });

    // Append accounts that are not yet members, preserving the order of `accounts`.
    const currentMembers = new Set(readBinding.members);
    accounts.forEach(account => {
        const identifier = toMemberId(account);
        if (identifier !== null && !currentMembers.has(identifier)) {
            currentMembers.add(identifier);
            readBinding.members.push(identifier);
            isDirty = true;
            console.log(`Adding access record to topic: ${topicName}: ${JSON.stringify(account)}`);
        }
    });

    if (isDirty) {
        try {
            await pubsubUtil.setTopicIamPolicy(topicName, topicPolicy);
            console.info(`Policy set successfully for topic '${topicName}'`);
        } catch (err) {
            console.error(`Failed to set policy for topic '${topicName}' with error '${err}' and payload: ${JSON.stringify(topicPolicy)}`);
            throw err;
        }
    } else {
        console.info(`Metadata is already up to date for topic: '${topicName}'`);
    }

    console.log(`End IAM update for topic: ${topicName}`);
    return isDirty;
}