in api/v1/src/lib/appliers/pubsubApplier.js [29:70]
async function applyPolicies(projectId, policyIds, fullRefresh) {
const labelKey = cfg.cdsManagedLabelKey;
let options = {};
const bigqueryUtil = new BigQueryUtil(projectId);
const topicPermissionDiffProcedure = bigqueryUtil.getTableFqdn(projectId, cfg.cdsDatasetId, cfg.topicPermissionDiffProcedureId);
if (!fullRefresh && policyIds && policyIds.length > 0) {
options = {
query: `CALL \`${topicPermissionDiffProcedure}\`(@policyIds)`,
params: { policyIds: policyIds }
};
} else {
options = {
query: `CALL \`${topicPermissionDiffProcedure}\`(null)`
};
}
const [rows] = await bigqueryUtil.executeQuery(options);
console.log(`Pub/Sub Topic Permission Diff Result: ${JSON.stringify(rows, null, 3)}`);
const pubsubUtil = new PubSubUtil(projectId);
if (fullRefresh === true) {
// Update all managed buckets
const topics = await pubsubUtil.getTopics();
for (const topic of topics) {
if (underscore.has(topic.metadata.labels, labelKey)) {
const topicId = topic.name.substring(topic.name.lastIndexOf('/') + 1);
let topicPolicyRecord = underscore.findWhere(rows, { topicId: topicId });
let accounts = [];
if (topicPolicyRecord) {
accounts = topicPolicyRecord.accounts;
}
await performTopicUpdate(projectId, topicId, accounts);
}
}
} else {
// Differential update, iterate over result based on the policyId filter only
for (const row of rows) {
await performTopicUpdate(projectId, row.topicId, row.accounts);
}
}
}