public ConfiguratorResponse execute()

in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f02_configurator/Configurator.java [88:247]


    /**
     * Decides whether the target table of {@code request} is due for a backup in this run and,
     * if it is, publishes the prepared snapshot request(s) to PubSub.
     *
     * <p>Order of operations: service start routines, backup policy lookup, CRON check,
     * table-creation-time vs. time-travel check, optional publishing of the BigQuery and GCS
     * snapshot requests, service end routines.
     *
     * <p>Messages that fail to publish are only logged as warnings here; the service end
     * routines still run and the method returns normally.
     *
     * @param request         the configurator request describing the target table and run settings
     * @param pubSubMessageId id of the PubSub message; not referenced inside this method body
     * @return a response carrying the policy, the computed decision flags, and (only when a backup
     *         was due) the snapshot requests and their publish results; those four fields are
     *         {@code null} when no backup was due
     * @throws IOException                      propagated from the helper and service calls made here
     * @throws NonRetryableApplicationException propagated from the helper and service calls made here
     * @throws InterruptedException             propagated from the helper and service calls made here
     */
    public ConfiguratorResponse execute(ConfiguratorRequest request, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException {

        // Common service start: logging and checks.
        Utils.runServiceStartRoutines(logger, request, persistentSet, persistentSetObjectPrefix, request.getTrackingId());

        // Step 1: look up the backup policy (and its state) for this table.
        // The second tuple element is the source of the policy and is only passed on to the response.
        Tuple<BackupPolicyAndState, String> policyWithSource = getBackupPolicyAndState(request);
        BackupPolicyAndState policy = policyWithSource.x();

        // Step 2a: does the policy CRON expression select this run?
        boolean cronMatchesThisRun = isBackupCronTime(
                request.getTargetTable(),
                policy.getCron(),
                request.getRefTimestamp(),
                policy.getConfigSource(),
                policy.getLastBackupAt(),
                logger,
                request.getTrackingId());

        // Step 2b: has the table existed since before the requested time-travel point?
        Tuple<TableSpec, Long> timeTravelSpec = Utils.getTableSpecWithTimeTravel(
                request.getTargetTable(),
                policy.getTimeTravelOffsetDays(),
                request.getRefTimestamp());

        // Remote BigQuery call.
        Long createdAt = bqService.getTableCreationTime(request.getTargetTable());

        // True when the creation time is strictly earlier than the time-travel timestamp.
        boolean createdBeforeTimeTravel = createdAt < timeTravelSpec.y();

        // A backup needs a matching CRON run and a table old enough for the desired time travel
        // (the force-run flag is part of the decision too); otherwise this run is skipped.
        boolean backupDue = isBackupTime(request.isForceRun(), cronMatchesThisRun, createdBeforeTimeTravel);

        logger.logInfoWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format("isBackupTime for this run is '%s'. Calculated based on isForceRun=%s, isBackupCronTime=%s, isTableCreatedBeforeTimeTravel=%s ",
                        backupDue, request.isForceRun(), cronMatchesThisRun, createdBeforeTimeTravel));

        // Step 3: build and publish the snapshot request(s) when a backup is due.
        // All four stay null when no backup is due.
        SnapshoterRequest bqRequest = null;
        SnapshoterRequest gcsRequest = null;
        PubSubPublishResults bqPublishOutcome = null;
        PubSubPublishResults gcsPublishOutcome = null;

        if (backupDue) {
            Tuple<SnapshoterRequest, SnapshoterRequest> prepared = prepareSnapshotRequests(policy, request);

            // Either side of the tuple may be null; a null side yields an empty batch.
            bqRequest = prepared.x();
            List<JsonMessage> bqBatch = new ArrayList<>(1);
            if (bqRequest != null) {
                bqBatch.add(bqRequest);
            }

            gcsRequest = prepared.y();
            List<JsonMessage> gcsBatch = new ArrayList<>(1);
            if (gcsRequest != null) {
                gcsBatch.add(gcsRequest);
            }

            // BigQuery snapshot requests go out first, then the GCS ones.
            bqPublishOutcome = pubSubService.publishTableOperationRequests(
                    config.getProjectId(),
                    config.getBigQuerySnapshoterTopic(),
                    bqBatch);

            gcsPublishOutcome = pubSubService.publishTableOperationRequests(
                    config.getProjectId(),
                    config.getGcsSnapshoterTopic(),
                    gcsBatch);

            // Successes are reported before failures, BigQuery before GCS in each group.
            logPublishedSnapshotRequests(request, "Published %s BigQuery Snapshot requests %s", bqPublishOutcome);
            logPublishedSnapshotRequests(request, "Published %s GCS Snapshot requests %s", gcsPublishOutcome);
            logUnpublishedSnapshotRequests(request, "Failed to publish BigQuery Snapshot request %s", bqPublishOutcome);
            logUnpublishedSnapshotRequests(request, "Failed to publish GCS Snapshot request %s", gcsPublishOutcome);
        }

        // Common service end: logging and adding the pubsub message to the processed list.
        Utils.runServiceEndRoutines(logger, request, persistentSet, persistentSetObjectPrefix, request.getTrackingId());

        return new ConfiguratorResponse(
                request.getTargetTable(),
                request.getRunId(),
                request.getTrackingId(),
                request.isDryRun(),
                policy,
                policyWithSource.y(), // source of the backup policy
                request.getRefTimestamp(),
                cronMatchesThisRun,
                createdBeforeTimeTravel,
                backupDue,
                bqRequest,
                gcsRequest,
                bqPublishOutcome,
                gcsPublishOutcome);
    }

    /**
     * Logs, at info level, the messages that were published successfully; does nothing when
     * there are none.
     *
     * @param messageFormat format string taking the message count and the message list, in that order
     */
    private void logPublishedSnapshotRequests(ConfiguratorRequest request, String messageFormat, PubSubPublishResults results) {
        if (results.getSuccessMessages().isEmpty()) {
            return;
        }
        logger.logInfoWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format(messageFormat,
                        results.getSuccessMessages().size(),
                        results.getSuccessMessages()));
    }

    /**
     * Logs, at warning level, the messages that failed to publish; does nothing when there are none.
     *
     * @param messageFormat format string taking the textual form of the failed message list
     */
    private void logUnpublishedSnapshotRequests(ConfiguratorRequest request, String messageFormat, PubSubPublishResults results) {
        if (results.getFailedMessages().isEmpty()) {
            return;
        }
        logger.logWarnWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format(messageFormat, results.getFailedMessages().toString()));
    }