in services/dispatcher-app/src/main/java/com/google/cloud/pso/bq_snapshot_manager/dispatcher/DispatcherController.java [67:132]
/**
 * Handles an incoming Pub/Sub event that triggers a dispatcher run.
 *
 * <p>The message data is decoded to a UTF-8 JSON string, parsed into a
 * {@code DispatcherRequest}, and passed to a newly constructed {@code Dispatcher}.
 * The run id is chosen from the request flags: dry run, forced run, or heartbeat run.
 *
 * <p>Every exception raised while processing is caught and logged as non-retryable;
 * the method always answers with HTTP 200 so that the message is acknowledged and
 * not redelivered.
 *
 * @param requestBody the Pub/Sub event received as the HTTP request body
 * @return a response with status {@code 200 OK} whose body describes the final state
 *     (publishing result counts, or the error message if processing failed)
 */
public ResponseEntity receiveMessage(@RequestBody PubSubEvent requestBody) {
    // These values will be updated based on the execution flow and logged at the end.
    String runId = TrackingHelper.MIN_RUN_ID;
    String state = "";

    try {
        if (requestBody == null || requestBody.getMessage() == null) {
            String msg = "Bad Request: invalid message format";
            logger.logSevereWithTracker(runId, null, msg);
            throw new NonRetryableApplicationException("Request body or message is Null.");
        }

        String requestJsonString = requestBody.getMessage().dataToUtf8String();
        // Remove any escape characters (e.g. from Terraform).
        // NOTE(review): this strips every backslash, including legitimate JSON escapes
        // inside string values - confirm that no payload field relies on them.
        requestJsonString = requestJsonString.replace("\\", "");

        logger.logInfoWithTracker(runId, null, String.format("Received payload: %s", requestJsonString));

        DispatcherRequest dispatcherRequest = gson.fromJson(requestJsonString, DispatcherRequest.class);
        // Gson returns null for an empty or literal "null" document. Fail with an explicit
        // message instead of an opaque NullPointerException on the first dereference.
        if (dispatcherRequest == null) {
            throw new NonRetryableApplicationException(
                    "Dispatcher request is null after parsing: the message payload is empty or 'null'.");
        }

        // Pick the run id flavor from the request flags; dry run takes precedence over force run.
        if (dispatcherRequest.isDryRun()) {
            runId = TrackingHelper.generateDryRunId();
        } else if (dispatcherRequest.isForceRun()) {
            runId = TrackingHelper.generateForcedRunId();
        } else {
            runId = TrackingHelper.generateHeartBeatRunId();
        }

        logger.logInfoWithTracker(
                dispatcherRequest.isDryRun(),
                runId,
                null,
                String.format("Parsed dispatcher request %s ", dispatcherRequest));

        Dispatcher dispatcher = new Dispatcher(
                environment.toConfig(),
                new PubSubServiceImpl(),
                new ResourceScannerImpl(),
                new GCSPersistentSetImpl(environment.getGcsFlagsBucket()),
                "dispatcher-flags",
                functionNumber,
                runId
        );

        PubSubPublishResults results =
                dispatcher.execute(dispatcherRequest, requestBody.getMessage().getMessageId());

        state = String.format("Publishing results: %s SUCCESS MESSAGES and %s FAILED MESSAGES",
                results.getSuccessMessages().size(),
                results.getFailedMessages().size());

        logger.logInfoWithTracker(dispatcherRequest.isDryRun(), runId, null, state);

    } catch (Exception e) {
        // Boundary catch: nothing may escape, otherwise the message would not be ACKed (see below).
        logger.logNonRetryableExceptions(runId, null, e);
        state = String.format("ERROR '%s'", e.getMessage());
    }

    // Always ACK the pubsub message to avoid retries.
    // The dispatcher is the entry point and retrying it could cause
    // unnecessary runs and costs.
    return new ResponseEntity<>(
            String.format("Process completed with state = %s", state),
            HttpStatus.OK);
}