tools/datagen-bq-to-bq/main.py [45:114]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    local_snowfakery_output_base = config_vars.LOCAL_OUTPUT_BASE_DIR

    try:
        # 1. Export BigQuery Data to GCS (if source is BigQuery)
        if config_vars.SOURCE_TYPE == "BigQuery":
            print("\\n--- Exporting BigQuery tables to GCS ---")
            # The export function in notebook populates a global 'input_gcs_path', here it returns it
            input_gcs_path = bq_ops.export_bigquery_table_to_bq_gcs_staging(
                bq_client_main,
                config_vars.input_bq_table_names,  # Pass the string of table names
                staging_path_bigquery,  # Pass the batch-specific path
                input_gcs_path,
            )
            if not input_gcs_path:  # Check if the dictionary is empty
                raise Exception(
                    "Failed to export BigQuery tables or input_gcs_path map is empty."
                )
        # Add an elif branch here for SOURCE_TYPE == "GCS" if needed

        # 2. Pre-process Source Files
        print("\\n--- Pre-processing source files ---")
        # This function in notebook uses/updates global 'table_attributes', here it returns it
        table_attributes = file_processing_utils.file_pre_processing(
            gemini_model,
            input_gcs_path,  # Pass the map generated above
            staging_gcs_path,  # Pass the general staging path for intermediate files
            header_gcs_path,  # Pass the (potentially empty) header_gcs_path map
            table_attributes,
        )
        if not table_attributes or not any(
            attrs.get("staging_gcs_path") for attrs in table_attributes.values()
        ):
            raise Exception(
                "File preprocessing failed or no staging paths were generated."
            )

        # 3. Start Audit Log
        print("\\n--- Logging start entries to audit table ---")
        audit_utils.start_audit_log(
            bq_client_main, batch_id, input_gcs_path, table_attributes, header_gcs_path
        )

        # 4. Generate Synthetic Data using Snowfakery (combines recipe gen and run)
        print("\\n--- Generating synthetic data (recipes and execution) ---")
        # Create a batch-specific local output directory for Snowfakery
        local_snowfakery_output_batch = os.path.join(
            local_snowfakery_output_base, str(batch_id)
        )
        if os.path.exists(local_snowfakery_output_batch):
            shutil.rmtree(local_snowfakery_output_batch)
        os.makedirs(local_snowfakery_output_batch, exist_ok=True)

        # 'generate_output_data' handles both recipe creation and Snowfakery execution (as in the notebook)
        generation_successful = snowfakery_gen.generate_output_data(
            gemini_model,
            table_attributes,  # Pass current table_attributes
            local_snowfakery_output_batch,  # Pass local output dir for this batch
        )
        if not generation_successful:
            raise Exception("Snowfakery data generation process failed.")

        # 5. Post-process Generated Files
        print("\\n--- Post-processing generated files ---")
        # This function updates and returns table_attributes with num_records_generated and output_gcs_path
        table_attributes = file_processing_utils.file_post_processing(
            input_gcs_path,  # Pass original input_gcs_path for iteration reference
            table_attributes,  # Pass current table_attributes
            output_gcs_path,  # Base GCS path for final outputs (batch specific)
            local_snowfakery_output_batch,  # Local directory where generated files are
        )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
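For step 1 above, a minimal sketch of a BigQuery-to-GCS staging export using the google-cloud-bigquery client is shown below. The function name, the comma-separated table_names input, and the per-table CSV layout under staging_path_bigquery are assumptions inferred from the call site; this is not the repo's actual bq_ops.export_bigquery_table_to_bq_gcs_staging implementation.

from google.cloud import bigquery


def export_tables_to_gcs_sketch(
    bq_client: bigquery.Client, table_names: str, staging_path_bigquery: str
) -> dict:
    """Hypothetical sketch: export each named table to CSV under the batch staging path."""
    input_gcs_path = {}
    for table_id in (t.strip() for t in table_names.split(",") if t.strip()):
        # A wildcard URI lets BigQuery shard large exports across multiple files.
        destination_uri = f"{staging_path_bigquery.rstrip('/')}/{table_id}/*.csv"
        job_config = bigquery.ExtractJobConfig(destination_format="CSV", print_header=True)
        extract_job = bq_client.extract_table(table_id, destination_uri, job_config=job_config)
        extract_job.result()  # block until the extract job completes
        input_gcs_path[table_id] = destination_uri
    return input_gcs_path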


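Step 4 above ultimately runs Snowfakery recipes to produce the synthetic CSVs. A minimal sketch of driving Snowfakery from Python is shown below; the recipe path, object name, and record count are placeholder assumptions, and the real snowfakery_gen.generate_output_data wraps this per table based on table_attributes.

from snowfakery import generate_data

output_dir = "/tmp/snowfakery_output/batch_123"  # stand-in for the batch-specific local dir
generate_data(
    "recipes/customers_recipe.yml",     # hypothetical recipe emitted by the Gemini step
    output_format="csv",
    output_folder=output_dir,
    target_number=(1000, "customers"),  # (record count, recipe object name) - assumed values
)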

tools/datagen-gcs-to-gcs/main.py [45:114]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    local_snowfakery_output_base = config_vars.LOCAL_OUTPUT_BASE_DIR

    try:
        # 1. Export BigQuery Data to GCS (if source is BigQuery)
        if config_vars.SOURCE_TYPE == "BigQuery":
            print("\\n--- Exporting BigQuery tables to GCS ---")
            # The export function in notebook populates a global 'input_gcs_path', here it returns it
            input_gcs_path = bq_ops.export_bigquery_table_to_bq_gcs_staging(
                bq_client_main,
                config_vars.input_bq_table_names,  # Pass the string of table names
                staging_path_bigquery,  # Pass the batch-specific path
                input_gcs_path,
            )
            if not input_gcs_path:  # Check if the dictionary is empty
                raise Exception(
                    "Failed to export BigQuery tables or input_gcs_path map is empty."
                )
        # Add an elif branch here for SOURCE_TYPE == "GCS" if needed

        # 2. Pre-process Source Files
        print("\\n--- Pre-processing source files ---")
        # This function in notebook uses/updates global 'table_attributes', here it returns it
        table_attributes = file_processing_utils.file_pre_processing(
            gemini_model,
            input_gcs_path,  # Pass the map generated above
            staging_gcs_path,  # Pass the general staging path for intermediate files
            header_gcs_path,  # Pass the (potentially empty) header_gcs_path map
            table_attributes,
        )
        if not table_attributes or not any(
            attrs.get("staging_gcs_path") for attrs in table_attributes.values()
        ):
            raise Exception(
                "File preprocessing failed or no staging paths were generated."
            )

        # 3. Start Audit Log
        print("\\n--- Logging start entries to audit table ---")
        audit_utils.start_audit_log(
            bq_client_main, batch_id, input_gcs_path, table_attributes, header_gcs_path
        )

        # 4. Generate Synthetic Data using Snowfakery (combines recipe gen and run)
        print("\\n--- Generating synthetic data (recipes and execution) ---")
        # Create a batch-specific local output directory for Snowfakery
        local_snowfakery_output_batch = os.path.join(
            local_snowfakery_output_base, str(batch_id)
        )
        if os.path.exists(local_snowfakery_output_batch):
            shutil.rmtree(local_snowfakery_output_batch)
        os.makedirs(local_snowfakery_output_batch, exist_ok=True)

        # 'generate_output_data' handles both recipe creation and Snowfakery execution (as in the notebook)
        generation_successful = snowfakery_gen.generate_output_data(
            gemini_model,
            table_attributes,  # Pass current table_attributes
            local_snowfakery_output_batch,  # Pass local output dir for this batch
        )
        if not generation_successful:
            raise Exception("Snowfakery data generation process failed.")

        # 5. Post-process Generated Files
        print("\\n--- Post-processing generated files ---")
        # This function updates and returns table_attributes with num_records_generated and output_gcs_path
        table_attributes = file_processing_utils.file_post_processing(
            input_gcs_path,  # Pass original input_gcs_path for iteration reference
            table_attributes,  # Pass current table_attributes
            output_gcs_path,  # Base GCS path for final outputs (batch specific)
            local_snowfakery_output_batch,  # Local directory where generated files are
        )
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
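Step 5 in both entry points copies the locally generated files to the batch's output location in GCS. A minimal sketch of that upload with google-cloud-storage follows; the bucket/prefix split and the CSV-only filter are assumptions, not the actual file_processing_utils.file_post_processing logic.

import os

from google.cloud import storage


def upload_generated_files_sketch(local_dir: str, bucket_name: str, output_prefix: str) -> int:
    """Hypothetical sketch: upload every generated CSV to gs://<bucket_name>/<output_prefix>/."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    uploaded = 0
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            if not name.endswith(".csv"):
                continue
            blob = bucket.blob(f"{output_prefix.rstrip('/')}/{name}")
            blob.upload_from_filename(os.path.join(root, name))
            uploaded += 1
    return uploaded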



