in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f01_dispatcher/Dispatcher.java [75:161]
public PubSubPublishResults execute(DispatcherRequest dispatcherRequest, String pubSubMessageId)
        throws IOException, NonRetryableApplicationException, InterruptedException {
    /*
     * Fans out one ConfiguratorRequest per table in the request's BigQuery scope to the output topic.
     *
     * Flow:
     *  1. De-duplicate on the PubSub message ID. A flag entry keyed
     *     "<persistentSetObjectPrefix>/<pubSubMessageId>" is kept in persistentSet (GCS flag files,
     *     per the original design). If the flag already exists, a NonRetryableApplicationException is
     *     thrown; its message asks for the PubSub message to be ACKed so the costly dispatch is not re-run.
     *     Otherwise the flag is recorded and processing continues.
     *  2. List the tables in scope with a BigQueryScopeLister wrapping resourceScanner.
     *  3. Build one ConfiguratorRequest per table. All requests share one reference timestamp parsed
     *     from runId, so CRON checks across the whole run use the same point in time.
     *  4. Publish the requests, then log every failed publish (a warning plus a failed-dispatch entry)
     *     and every successful publish (its tracking ID).
     *
     * NOTE(review): the flag is written before listing/publishing. If either step throws and the message
     * is redelivered, the retry is rejected as a duplicate. Confirm this is the intended trade-off.
     * logFunctionEnd is only reached on the success path.
     *
     * Returns the publish results reported by pubSubService.
     */
    final String processedFlagKey = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);

    // Guard clause: an already-seen message ID means this is a redelivery, so refuse to redo the work.
    if (persistentSet.contains(processedFlagKey)) {
        throw new NonRetryableApplicationException(String.format(
                "PubSub message ID '%s' has been processed before by the dispatcher. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
                pubSubMessageId));
    }
    logger.logInfoWithTracker(
            runId,
            null,
            String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId));
    persistentSet.add(processedFlagKey);

    // Resolve the scan scope into concrete tables.
    final LoggingHelper listerLogger = new LoggingHelper(
            BigQueryScopeLister.class.getSimpleName(),
            functionNumber,
            config.getProjectId(),
            config.getApplicationName());
    final BigQueryScopeLister scopeLister = new BigQueryScopeLister(resourceScanner, listerLogger, runId);
    final List<TableSpec> scopedTables = scopeLister.listTablesInScope(dispatcherRequest.getBigQueryScope());

    // One shared reference time (the run's start) for every request in this run.
    final List<JsonMessage> outgoingRequests = new ArrayList<>();
    final Timestamp runReferenceTs = TrackingHelper.parseRunIdAsTimestamp(runId);
    for (TableSpec table : scopedTables) {
        final ConfiguratorRequest request = new ConfiguratorRequest(
                table,
                runId,
                TrackingHelper.generateTrackingId(runId),
                dispatcherRequest.isDryRun(),
                dispatcherRequest.isForceRun(),
                runReferenceTs);
        outgoingRequests.add(request);
    }

    final PubSubPublishResults results = pubSubService.publishTableOperationRequests(
            config.getProjectId(),
            config.getOutputTopic(),
            outgoingRequests);

    for (FailedPubSubMessage failure : results.getFailedMessages()) {
        final ConfiguratorRequest failedRequest = (ConfiguratorRequest) failure.getMsg();
        final String warning = String.format("Failed to publish this PubSub messages %s", failure.toString());
        logger.logWarnWithTracker(runId, failedRequest.getTargetTable(), warning);
        logger.logFailedDispatcherEntityId(
                failedRequest.getTrackingId(),
                failedRequest.getTargetTable(),
                failedRequest.getTargetTable().toSqlString(),
                failure.getExceptionMessage(),
                failure.getExceptionClass());
    }

    // Recording each dispatched tracking ID lets later stages (e.g. the Tagger) be matched back to this run.
    for (SuccessPubSubMessage success : results.getSuccessMessages()) {
        final ConfiguratorRequest dispatchedRequest = (ConfiguratorRequest) success.getMsg();
        logger.logSuccessDispatcherTrackingId(
                runId,
                dispatchedRequest.getTrackingId(),
                dispatchedRequest.getTargetTable());
    }

    logger.logFunctionEnd(runId, null);
    return results;
}