in services/dispatcher-tagging-app/src/main/java/com/google/cloud/pso/bq_pii_classifier/dispatcher/TaggingDispatcherController.java [65:137]
public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {
String runId = TrackingHelper.generateTaggingRunId();
String state = "";
try {
if (requestBody == null || requestBody.getMessage() == null) {
String msg = "Bad Request: invalid message format";
logger.logSevereWithTracker(runId, msg);
throw new NonRetryableApplicationException("Request body or message is Null.");
}
String requestJsonString = requestBody.getMessage().dataToUtf8String();
// remove any escape characters (e.g. from Terraform
requestJsonString = requestJsonString.replace("\\", "");
logger.logInfoWithTracker(runId, String.format("Received payload: %s", requestJsonString));
BigQueryScope bqScope = gson.fromJson(requestJsonString, BigQueryScope.class);
logger.logInfoWithTracker(runId, String.format("Parsed JSON input %s ", bqScope.toString()));
Scanner dlpResultsScanner;
if (environment.getIsAutoDlpMode()){
dlpResultsScanner = new AutoDlpResultsScannerImpl(
environment.getProjectId(),
environment.getSolutionDataset(),
environment.getDlpTableAuto(),
new BigQueryServiceImpl()
);
}else{
dlpResultsScanner = new StandardDlpResultsScannerImpl(
environment.getProjectId(),
environment.getSolutionDataset(),
environment.getDlpTableStandard(),
environment.getLoggingTable(),
new BigQueryServiceImpl()
);
}
Dispatcher dispatcher = new Dispatcher(
environment.toConfig(),
new BigQueryServiceImpl(),
new PubSubServiceImpl(),
dlpResultsScanner,
new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
"tagging-dispatcher-flags",
runId
);
PubSubPublishResults results = dispatcher.execute(bqScope, requestBody.getMessage().getMessageId());
state = String.format("Publishing results: %s SUCCESS MESSAGES and %s FAILED MESSAGES",
results.getSuccessMessages().size(),
results.getFailedMessages().size());
logger.logInfoWithTracker(runId, state);
} catch (Exception e) {
logger.logNonRetryableExceptions(runId, e);
state = String.format("ERROR '%s'", e.getMessage());
}
// Always ACK the pubsub message to avoid retries
// The dispatcher is the entry point and retrying it could cause
// unnecessary runs and costs
return new ResponseEntity(
String.format("Process completed with state = %s", state),
HttpStatus.OK);
}