async function applyPolicies()

in api/v1/src/lib/appliers/pubsubApplier.js [29:70]


/**
 * Runs the topic-permission-diff stored procedure and pushes the resulting
 * account lists to Pub/Sub topics through `performTopicUpdate`.
 *
 * Query selection:
 *  - When `fullRefresh` is falsy and `policyIds` is non-empty, the procedure is
 *    called with `policyIds` bound as the `@policyIds` query parameter.
 *  - Otherwise the procedure is called with `null`.
 *
 * Update selection:
 *  - When `fullRefresh === true`, every topic returned by
 *    `PubSubUtil#getTopics()` whose `metadata.labels` carries the
 *    `cfg.cdsManagedLabelKey` label is updated. Topics with no matching diff
 *    row are updated with an empty account list.
 *  - Otherwise only the topics present in the diff rows are updated.
 *
 * NOTE(review): the query selection treats any truthy `fullRefresh` as a full
 * refresh, while the topic sweep requires strictly `true`. A truthy non-boolean
 * value therefore runs the unfiltered query but applies it row by row. Confirm
 * with callers whether that is intended before normalising.
 *
 * Updates are awaited one at a time, in iteration order.
 *
 * @param {string} projectId - Passed to the BigQuery/PubSub helpers and to
 *     `performTopicUpdate` (presumably a GCP project ID - confirm with caller).
 * @param {Array} [policyIds] - Policy identifiers used to filter the diff.
 * @param {boolean} [fullRefresh] - Whether to sweep all managed topics.
 * @returns {Promise<void>} Resolves once every update has completed.
 */
async function applyPolicies(projectId, policyIds, fullRefresh) {
    const bigqueryUtil = new BigQueryUtil(projectId);
    const topicPermissionDiffProcedure = bigqueryUtil.getTableFqdn(projectId, cfg.cdsDatasetId, cfg.topicPermissionDiffProcedureId);

    const filterByPolicy = !fullRefresh && policyIds && policyIds.length > 0;
    const options = filterByPolicy
        ? {
            query: `CALL \`${topicPermissionDiffProcedure}\`(@policyIds)`,
            params: { policyIds: policyIds }
        }
        : {
            query: `CALL \`${topicPermissionDiffProcedure}\`(null)`
        };

    const [rows] = await bigqueryUtil.executeQuery(options);
    console.log(`Pub/Sub Topic Permission Diff Result: ${JSON.stringify(rows, null, 3)}`);

    if (fullRefresh === true) {
        // Update all managed topics, including those absent from the diff result.
        const labelKey = cfg.cdsManagedLabelKey;

        // Index the diff rows once instead of scanning them for every topic.
        // The first row per topicId wins, matching a first-match linear search.
        const rowsByTopicId = new Map();
        for (const row of rows || []) {
            if (!rowsByTopicId.has(row.topicId)) {
                rowsByTopicId.set(row.topicId, row);
            }
        }

        // The Pub/Sub helper is only needed for the full sweep.
        const pubsubUtil = new PubSubUtil(projectId);
        const topics = await pubsubUtil.getTopics();
        for (const topic of topics) {
            if (!underscore.has(topic.metadata.labels, labelKey)) {
                continue;
            }
            // topic.name is a path; the topic ID is the segment after the last '/'.
            const topicId = topic.name.substring(topic.name.lastIndexOf('/') + 1);
            const topicPolicyRecord = rowsByTopicId.get(topicId);
            const accounts = topicPolicyRecord ? topicPolicyRecord.accounts : [];
            await performTopicUpdate(projectId, topicId, accounts);
        }
        return;
    }

    // Differential update: iterate over the diff result only.
    for (const row of rows) {
        await performTopicUpdate(projectId, row.topicId, row.accounts);
    }
}