in services/library/src/main/java/com/google/cloud/pso/bq_pii_classifier/functions/tagger/Tagger.java [85:186]
/**
 * Applies policy tags and labels to the BigQuery table referenced by the DLP findings stored
 * under {@code lookUpKey}, while guarding against re-processing the same PubSub message.
 *
 * <p>Flow:
 * <ol>
 *   <li>Reject the request if a flag for {@code pubSubMessageId} already exists in the
 *       persistent set.</li>
 *   <li>Read the field-to-policy-tag findings for {@code lookUpKey}.</li>
 *   <li>If findings exist and the target table still exists, log the computed labels and apply
 *       policy tags and labels (honouring the dry-run settings from the config).</li>
 *   <li>Record the flag for {@code pubSubMessageId} so that later redeliveries are rejected.</li>
 * </ol>
 *
 * @param request         operation being processed; its tracking id is attached to every log entry
 * @param lookUpKey       key used to read the DLP findings: the DLP job id in Standard mode, or
 *                        "project.dataset.table" in Auto DLP mode
 * @param pubSubMessageId id of the PubSub message that triggered this call; used as the
 *                        de-duplication key
 * @return the mapping produced by {@code mapFieldsToPolicyTags} from the updated table fields, or
 *         an empty map when there are no findings or the target table no longer exists
 * @throws NonRetryableApplicationException if {@code pubSubMessageId} was already processed
 * @throws IOException          declared by this method; presumably propagated from the
 *                              BigQuery/storage calls (confirm against the callees)
 * @throws InterruptedException declared by this method; presumably propagated from the
 *                              BigQuery calls (confirm against the callees)
 */
public Map<String, String> execute(
    Operation request,
    String lookUpKey,
    String pubSubMessageId
) throws IOException, InterruptedException, NonRetryableApplicationException {

    final String trackingId = request.getTrackingId();

    logger.logFunctionStart(trackingId);
    logger.logInfoWithTracker(trackingId, String.format("Request : %s", request.toString()));

    // De-duplication guard: a flag file named after the PubSub message id is kept in the
    // persistent set. If it is already there, this message was handled before and running the
    // BigQuery queries again would only add cost, so the request is rejected as non-retryable.
    final String processedFlag =
        String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
    if (persistentSet.contains(processedFlag)) {
        throw new NonRetryableApplicationException(
            String.format(
                "PubSub message ID '%s' has been processed before by the Tagger. "
                    + "The message should be ACK to PubSub to stop retries. "
                    + "Please investigate further why the message was retried in the first place.",
                pubSubMessageId));
    }

    // Read the DLP results from BigQuery as a bq_column -> policy_tag mapping.
    // The lookup key is the DLP "jobId" in Standard mode (to use the clustered column) and
    // "project.dataset.table" in Auto DLP mode (there are no job ids in that case).
    TablePolicyTags findings = findingsReader.getFieldsToPolicyTagsMap(lookUpKey);

    // Stays empty unless tags are actually applied below.
    // TODO: do we really need this map or can we just return findings.getFieldsPolicyTags()
    Map<String, String> taggedFields = new HashMap<>();

    if (findings == null) {
        // No DLP findings for this table or DLP job: nothing to tag.
        logger.logInfoWithTracker(
            trackingId,
            String.format(
                "No DLP InfoTypes or mapped policy tags are found for lookUpKey '%s'",
                lookUpKey));
    } else {
        Map<String, PolicyTagInfo> fieldsToTags = findings.getFieldsPolicyTags();
        TableSpec tableSpec = findings.getTableSpec();

        // Auto DLP mode and re-tagging runs in Standard mode can reference tables that were
        // deleted in the meantime, so existence is checked before any tagging is attempted.
        if (!bqService.tableExists(tableSpec)) {
            logger.logWarnWithTracker(
                trackingId,
                String.format(
                    "Table %s doesn't exist anymore in BigQuery and no tagging could be applied",
                    tableSpec.toSqlString()));
        } else {
            logger.logInfoWithTracker(
                trackingId,
                String.format(
                    "Computed Fields to Policy Tags mapping : %s",
                    fieldsToTags.toString()));

            // Table labels are derived from the findings and the configured info type map.
            Map<String, String> labels =
                generateTableLabelsFromDlpFindings(findings, config.getInfoTypeMap());

            // Record every label computed for this table in the labels history log.
            for (Map.Entry<String, String> label : labels.entrySet()) {
                logger.logLabelsHistory(
                    tableSpec,
                    label.getKey(),
                    label.getValue(),
                    config.isDryRunLabels(),
                    trackingId);
            }

            // Apply policy tags to the columns (and labels to the table) in BigQuery.
            // In dry-run mode nothing is changed on BigQuery; dry-run log entries are
            // written instead.
            List<TableFieldSchema> taggedSchema = applyPolicyTagsAndLabels(
                tableSpec,
                fieldsToTags,
                labels,
                config.getAppOwnedTaxonomies(),
                config.isDryRunTags(),
                config.isDryRunLabels(),
                trackingId);

            taggedFields = mapFieldsToPolicyTags(taggedSchema);
        }
    }

    // Mark this message as done so that a PubSub retry loop (e.g. ACK timeout after the work
    // has already finished) does not trigger further BigQuery work and its cost.
    logger.logInfoWithTracker(
        trackingId,
        String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId));
    persistentSet.add(processedFlag);

    logger.logFunctionEnd(trackingId);
    return taggedFields;
}