public GCSSnapshoterResponse execute()

in services/library/src/main/java/com/google/cloud/pso/bq_snapshot_manager/functions/f03_snapshoter/GCSSnapshoter.java [95:216]


    /**
     * Submits a BigQuery export of the requested table to Cloud Storage (a "GCS snapshot").
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Runs the common service start routines and validates the request.</li>
     *   <li>Resolves the source table with its time-travel point, computed relative to
     *       {@code operationTs}.</li>
     *   <li>Builds the destination folder
     *       {@code project/dataset/table/trackingId/timeTravelMillis/exportFormat} and derives the
     *       GCS destination URI from it.</li>
     *   <li>Unless the request is a dry run: stores a {@link TaggerRequest} in the persistent map
     *       under {@code persistentMapObjectPrefix/jobId} and only then submits the export job.</li>
     *   <li>Logs the submission, runs the common service end routines and returns the response.</li>
     * </ol>
     *
     * @param request         snapshot request carrying the target table, tracking/run ids, dry-run
     *                        flag and the backup policy (time travel offset, GCS location, export options)
     * @param operationTs     reference point from which the time-travel timestamp is calculated; also
     *                        forwarded to the tagger request and the response
     * @param pubSubMessageId not referenced by this method's body; kept for interface compatibility
     * @return a response holding the target table, run id, tracking id, dry-run flag,
     *         {@code operationTs} and the time-travel-qualified source table
     * @throws IOException                      propagated from the invoked helpers/services
     * @throws NonRetryableApplicationException propagated from the invoked helpers/services
     * @throws InterruptedException             propagated from the invoked helpers/services
     */
    public GCSSnapshoterResponse execute(SnapshoterRequest request, Timestamp operationTs, String pubSubMessageId) throws IOException, NonRetryableApplicationException, InterruptedException {

        // run common service start logging and checks
        Utils.runServiceStartRoutines(
                logger,
                request,
                persistentSet,
                persistentSetObjectPrefix,
                request.getTrackingId()
        );


        // validate required input
        validateRequest(request);

        // Perform the Snapshot operation using the BigQuery service

        // time travel is calculated relative to the operation time
        // x() is the source table spec, y() is the time travel point in epoch milliseconds
        Tuple<TableSpec, Long> sourceTableWithTimeTravelTuple = Utils.getTableSpecWithTimeTravel(
                request.getTargetTable(),
                request.getBackupPolicyAndState().getTimeTravelOffsetDays(),
                operationTs
        );

        // construct the backup folder for this run in the format
        // project/dataset/table/trackingid/timetravelstamp/exportformat
        String backupFolder = String.format("%s/%s/%s/%s/%s/%s",
                request.getTargetTable().getProject(),
                request.getTargetTable().getDataset(),
                request.getTargetTable().getTable(),
                request.getTrackingId(),
                sourceTableWithTimeTravelTuple.y(), // include the time travel millisecond for transparency
                request.getBackupPolicyAndState().getGcsExportFormat()
                );

        String gcsDestinationUri = prepareGcsUriForMultiFileExport(
                request.getBackupPolicyAndState().getGcsSnapshotStorageLocation(),
                backupFolder
                );

        // Convert epoch milliseconds to a Timestamp without discarding the millisecond part, so the
        // logged instant matches the millisecond value used in the backup folder name.
        // floorDiv/floorMod keep the nanos component non-negative for any input.
        long timeTravelMs = sourceTableWithTimeTravelTuple.y();
        Timestamp timeTravelTs = Timestamp.ofTimeSecondsAndNanos(
                Math.floorDiv(timeTravelMs, 1000L),
                (int) (Math.floorMod(timeTravelMs, 1000L) * 1_000_000L)
        );
        logger.logInfoWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format("Will take a GCS Snapshot for '%s' to '%s' with time travel timestamp '%s' (%s days)",
                        request.getTargetTable().toSqlString(),
                        gcsDestinationUri,
                        timeTravelTs,
                        request.getBackupPolicyAndState().getTimeTravelOffsetDays().getText()
                )
        );

        if(!request.isDryRun()){
            // create an async bq export job

            String jobId = TrackingHelper.generateBQExportJobId(request.getTrackingId(), config.getApplicationName());

            // We create the tagging request and add it to a persistent storage.
            // The Tagger service will receive notifications of export job completion via log sinks and pick up the tagger request from the persistent storage.
            // Make sure the file is stored first before running the export job. In case of non-fatal error of file creation and retry, we don't re-run the export job.
            TaggerRequest taggerRequest = new TaggerRequest(
                    request.getTargetTable(),
                    request.getRunId(),
                    request.getTrackingId(),
                    request.isDryRun(),
                    request.getBackupPolicyAndState(),
                    BackupMethod.GCS_SNAPSHOT,
                    null,
                    gcsDestinationUri,
                    operationTs
            );

            // the tagger request is keyed by the export job id
            String taggerRequestFile = String.format("%s/%s", persistentMapObjectPrefix, jobId);
            persistentMap.put(taggerRequestFile, taggerRequest.toJsonString());

            Map<String, String> jobLabels = new HashMap<>();
            // labels have to be max 63 chars, contain only lowercase letters, numeric characters, underscores, and dashes. All characters must use UTF-8 encoding, and international characters are allowed.
            jobLabels.put("app", config.getApplicationName());

            // API Call
            bqService.exportToGCS(
                    jobId,
                    sourceTableWithTimeTravelTuple.x(),
                    gcsDestinationUri,
                    request.getBackupPolicyAndState().getGcsExportFormat(),
                    request.getBackupPolicyAndState().getGcsCsvDelimiter(),
                    request.getBackupPolicyAndState().getGcsCsvExportHeader(),
                    request.getBackupPolicyAndState().getGcsUseAvroLogicalTypes(),
                    request.getTrackingId(),
                    jobLabels
            );
        }

        // this message is emitted for dry runs as well; the dry-run flag is passed to the logger
        logger.logInfoWithTracker(
                request.isDryRun(),
                request.getTrackingId(),
                request.getTargetTable(),
                String.format("BigQuery GCS export submitted for table %s to %s",
                        request.getTargetTable().toSqlString(),
                        gcsDestinationUri
                )
        );

        // run common service end logging and adding pubsub message to processed list
        Utils.runServiceEndRoutines(
                logger,
                request,
                persistentSet,
                persistentSetObjectPrefix,
                request.getTrackingId()
        );

        return new GCSSnapshoterResponse(
                request.getTargetTable(),
                request.getRunId(),
                request.getTrackingId(),
                request.isDryRun(),
                operationTs,
                sourceTableWithTimeTravelTuple.x()
        );
    }