public PubSubPublishResults execute()

in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f01_dispatcher/Dispatcher.java [75:161]


    /**
     * Runs one dispatcher pass: guards against re-processing the same PubSub message, lists the
     * tables covered by the request's BigQuery scope, and publishes one {@code ConfiguratorRequest}
     * per table to the configured output topic.
     *
     * <p>De-duplication relies on {@code persistentSet}: a key built from
     * {@code persistentSetObjectPrefix} and the PubSub message ID is looked up first and, when
     * absent, recorded <em>before</em> any listing or publishing takes place. (Per the original
     * design note, the set is kept as simple flag files in GCS named after the message ID.)
     *
     * @param dispatcherRequest supplies the BigQuery scope to scan plus the dry-run and force-run flags
     *                          that are copied onto every generated request
     * @param pubSubMessageId   ID of the PubSub message that triggered this call; used as the
     *                          de-duplication key
     * @return the publish results, holding both the failed and the successfully published messages
     * @throws NonRetryableApplicationException if the message ID is already present in the persistent
     *                                          set; the method does not return normally in that case
     * @throws IOException          declared on the signature; propagated from the underlying calls
     * @throws InterruptedException declared on the signature; propagated from the underlying calls
     */
    public PubSubPublishResults execute(DispatcherRequest dispatcherRequest, String pubSubMessageId)
            throws IOException, NonRetryableApplicationException, InterruptedException {

        // Guard against PubSub re-delivering a message we have already handled. Re-running the
        // dispatcher would restart the whole downstream process, so a repeat is rejected up front.
        String processedMarker = String.format("%s/%s", persistentSetObjectPrefix, pubSubMessageId);
        if (persistentSet.contains(processedMarker)) {
            // Signal a non-retryable failure; this method neither logs nor ACKs on this path itself.
            throw new NonRetryableApplicationException(
                    String.format("PubSub message ID '%s' has been processed before by the dispatcher. The message should be ACK to PubSub to stop retries. Please investigate further why the message was retried in the first place.",
                            pubSubMessageId));
        }

        // First time we see this message: record it before doing any work.
        logger.logInfoWithTracker(
                runId,
                null,
                String.format("Persisting processing key for PubSub message ID %s", pubSubMessageId));
        persistentSet.add(processedMarker);

        // The scope lister wraps the injected resourceScanner and gets its own logger.
        LoggingHelper scopeListerLogger = new LoggingHelper(
                BigQueryScopeLister.class.getSimpleName(),
                functionNumber,
                config.getProjectId(),
                config.getApplicationName());
        BigQueryScopeLister scopeLister = new BigQueryScopeLister(resourceScanner, scopeListerLogger, runId);

        // Resolve the requested scan scope into the concrete tables to dispatch.
        List<TableSpec> scopedTables = scopeLister.listTablesInScope(dispatcherRequest.getBigQueryScope());

        // A single reference timestamp, derived from the run ID, is shared by every request of this
        // run so that CRON checks are evaluated against the same point in time.
        List<JsonMessage> outgoingRequests = new ArrayList<>();
        Timestamp runReferenceTime = TrackingHelper.parseRunIdAsTimestamp(runId);
        for (TableSpec table : scopedTables) {
            // Each table gets a freshly generated tracking ID under the same run ID.
            ConfiguratorRequest configuratorRequest = new ConfiguratorRequest(
                    table,
                    runId,
                    TrackingHelper.generateTrackingId(runId),
                    dispatcherRequest.isDryRun(),
                    dispatcherRequest.isForceRun(),
                    runReferenceTime);
            outgoingRequests.add(configuratorRequest);
        }

        // Send all requests to the output topic in one call.
        PubSubPublishResults results = pubSubService.publishTableOperationRequests(
                config.getProjectId(),
                config.getOutputTopic(),
                outgoingRequests);

        // Report every message that could not be published: a warning plus a dedicated failure entry.
        for (FailedPubSubMessage failed : results.getFailedMessages()) {
            ConfiguratorRequest failedRequest = (ConfiguratorRequest) failed.getMsg();

            String warning = String.format("Failed to publish this PubSub messages %s", failed.toString());
            logger.logWarnWithTracker(runId, failedRequest.getTargetTable(), warning);

            logger.logFailedDispatcherEntityId(
                    failedRequest.getTrackingId(),
                    failedRequest.getTargetTable(),
                    failedRequest.getTargetTable().toSqlString(),
                    failed.getExceptionMessage(),
                    failed.getExceptionClass());
        }

        // Record each dispatched tracking ID so that requests of this run which fail in later
        // stages (i.e. Tagger) can be detected.
        for (SuccessPubSubMessage published : results.getSuccessMessages()) {
            ConfiguratorRequest publishedRequest = (ConfiguratorRequest) published.getMsg();
            logger.logSuccessDispatcherTrackingId(
                    runId,
                    publishedRequest.getTrackingId(),
                    publishedRequest.getTargetTable());
        }

        logger.logFunctionEnd(runId, null);

        return results;
    }