in services/dispatcher-inspection-app/src/main/java/com/google/cloud/pso/bq_pii_classifier/dispatcher/InspectionDispatcherController.java [66:120]
public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {
String runId = TrackingHelper.generateInspectionRunId();
String state = "";
try {
if (requestBody == null || requestBody.getMessage() == null) {
String msg = "Bad Request: invalid message format";
logger.logSevereWithTracker(runId, msg);
throw new NonRetryableApplicationException("Request body or message is Null.");
}
String requestJsonString = requestBody.getMessage().dataToUtf8String();
// remove any escape characters (e.g. from Terraform
requestJsonString = requestJsonString.replace("\\", "");
logger.logInfoWithTracker(runId, String.format("Received payload: %s", requestJsonString));
BigQueryScope bqScope = gson.fromJson(requestJsonString, BigQueryScope.class);
logger.logInfoWithTracker(runId, String.format("Parsed JSON input %s ", bqScope.toString()));
Dispatcher dispatcher = new Dispatcher(
environment.toConfig(),
new BigQueryServiceImpl(),
new PubSubServiceImpl(),
new BigQueryScannerImpl(),
new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
"inspection-dispatcher-flags",
runId
);
PubSubPublishResults results = dispatcher.execute(bqScope, requestBody.getMessage().getMessageId());
state = String.format("Publishing results: %s SUCCESS MESSAGES and %s FAILED MESSAGES",
results.getSuccessMessages().size(),
results.getFailedMessages().size());
logger.logInfoWithTracker(runId, state);
} catch (Exception e) {
logger.logNonRetryableExceptions(runId, e);
state = String.format("ERROR '%s'", e.getMessage());
}
// Always ACK the pubsub message to avoid retries
// The dispatcher is the entry point and retrying it could cause
// unnecessary runs and costs
return new ResponseEntity(
String.format("Process completed with state = %s", state),
HttpStatus.OK);
}