in services/tagger-app/src/main/java/com/google/cloud/pso/bq_pii_classifier/tagger/TaggerController.java [134:237]
private Operation parseEvent(PubSubEvent event) throws NonRetryableApplicationException {
String defaultTrackingId = "0000000000000-z";
// check if the message is sent from a Standard DLP inspection job, if so map it to TaggerDlpJobRequest
logger.logInfoWithTracker(defaultTrackingId,"Attempt: Will try to parse request as TaggerDlpJobRequest from Standard DLP Mode..");
if (event.getMessage().getAttributes() != null) {
String dlpJobName = event.getMessage().getAttributes().getOrDefault("DlpJobName", "");
if (!dlpJobName.isBlank()) {
logger.logInfoWithTracker(defaultTrackingId, String.format(
"Parsed DlpJobName '%s'",
dlpJobName
));
String trackingId = TrackingHelper.extractTrackingIdFromJobName(dlpJobName);
String runId = TrackingHelper.parseRunIdAsPrefix(trackingId);
TaggerDlpJobRequest taggerDlpJobRequest = new TaggerDlpJobRequest(
runId,
trackingId,
dlpJobName
);
logger.logInfoWithTracker(taggerDlpJobRequest.getTrackingId(),
String.format("Final: Parsed Request from Standard DLP: %s", taggerDlpJobRequest.toString()));
// CASE 1: TaggerDlpJobRequest in Standard Mode from a DLP PubSub notification
return taggerDlpJobRequest;
}
}
try {
String requestJsonString = event.getMessage().dataToUtf8String();
// remove any escape characters (e.g. from Terraform
requestJsonString = requestJsonString.replace("\\", "");
logger.logInfoWithTracker(defaultTrackingId, String.format("Received payload: %s", requestJsonString));
TaggerDlpJobRequest taggerDlpJobRequest = gson.fromJson(requestJsonString, TaggerDlpJobRequest.class);
if (taggerDlpJobRequest.getDlpJobName() != null && !taggerDlpJobRequest.getDlpJobName().isEmpty()) {
logger.logInfoWithTracker(taggerDlpJobRequest.getTrackingId(),
String.format("Final: parsed Request from Tagging Dispatcher in Standard Mode: %s", taggerDlpJobRequest));
// CASE 2: TaggerDlpJobRequest from the Tagging Dispatcher Service in Standard Mode
return taggerDlpJobRequest;
} else {
TaggerTableSpecRequest taggerTableRequest = gson.fromJson(requestJsonString, TaggerTableSpecRequest.class);
if (taggerTableRequest.getTargetTable() == null) {
throw new NonRetryableApplicationException("Failed to parse Tagger request as a valid TaggerDlpJobRequest or TaggerTableSpecRequest");
}
logger.logInfoWithTracker(taggerTableRequest.getTrackingId(),
String.format("Final: Parsed Request from Tagging Dispatcher in Auto DLP Mode: %s", taggerTableRequest));
// CASE 3: TaggerTableSpecRequest from the Tagging Dispatcher Service in Auto-DLP Mode
return taggerTableRequest;
}
} catch (Exception ex) {
// if not, try to parse it as a proto if it comes from Auto DLP and map it to TaggerTableSpecRequest
try {
byte[] data = event.getMessage().getData();
DataProfilePubSubMessage dataProfilePubSubMessage = DataProfilePubSubMessage.parseFrom(data);
logger.logInfoWithTracker(defaultTrackingId, String.format(
"Parsed DataProfilePubSubMessage= '%s'",
dataProfilePubSubMessage
));
TableSpec targetTable = TableSpec.fromFullResource(dataProfilePubSubMessage.getProfile().getFullResource());
String runId = TrackingHelper.generateOneTimeTaggingSuffix();
String trackingId = TrackingHelper.generateTrackingId(runId, targetTable.toSqlString());
TaggerTableSpecRequest taggerTableSpecRequestFromAutoDlp = new TaggerTableSpecRequest(
runId,
trackingId,
targetTable
);
logger.logInfoWithTracker(taggerTableSpecRequestFromAutoDlp.getTrackingId(),
String.format("Parsed Request from Auto DLP notifications : %s", taggerTableSpecRequestFromAutoDlp.toString()));
// CASE 4: TaggerTableSpecRequest from Auto DLP Notification
return taggerTableSpecRequestFromAutoDlp;
} catch (Exception ex2) {
throw new NonRetryableApplicationException(
String.format("Couldn't parse PubSub event as Proto: %s : %s",
ex2.getClass().getSimpleName(),
ex2.getMessage()
));
}
}
}