def preprocess_finetuning()

in use-cases/model-fine-tuning-pipeline/data-processing/ray/src/preprocessing_finetuning.py


def preprocess_finetuning():
    """Preprocesses a raw dataset for fine-tuning a model.

    This function performs several steps to prepare data for fine-tuning, including:

    1. **Data Loading:** Loads raw data from a CSV file stored in Google Cloud Storage (GCS).
    2. **Data Cleaning:** Cleans and filters the data, selecting required columns and handling null values.
    3. **Data Chunking:** Splits the data into smaller chunks for parallel processing using Ray.
    4. **Image Download:** Uses Ray to distribute the preprocessing task that downloads product images.
    5. **Data Storage:** Stores the preprocessed data as a CSV file back to GCS.

    The function relies on several module-level globals (e.g., `IMAGE_BUCKET`, `RAY_CLUSTER_HOST`, `GCS_IMAGE_FOLDER`) and on the custom classes `DataLoader`, `DataPrep`, and `RayUtils` for the individual steps. It also installs signal handlers for graceful shutdown and configures a Ray runtime environment with the required Python modules and pip packages.

    Returns:
        None. The function saves the preprocessed data to a GCS location.
    """
    logger.info("Configure signal handlers")
    signal.signal(signal.SIGINT, graceful_shutdown)
    signal.signal(signal.SIGTERM, graceful_shutdown)
    input_processing_file = "flipkart_raw_dataset/flipkart_com-ecommerce_sample.csv"
    output_processing_file = "/flipkart_preprocessed_dataset/flipkart.csv"
    rag_output_file = "/RAG/master_product_catalog.csv"
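    # Note: rag_output_file is defined here but not referenced later in this
    # function.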
    required_cols = [
        "uniq_id",
        "product_name",
        "description",
        "brand",
        "image",
        "product_specifications",
        "product_category_tree",
    ]
    filter_null_cols = [
        "description",
        "image",
        "product_specifications",
        "product_category_tree",
    ]
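    # DataPrep is expected to drop rows with nulls in any of these columns.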
    ray_resources = {"cpu": 1}
    ray_runtime_env = {
        "py_modules": ["./datapreprocessing"],  # Path to your module's directory
        "pip": [
            "google-cloud-storage==2.19.0",
            "spacy==3.7.6",
            "jsonpickle==4.0.1",
            "pandas==2.2.3",
            "pydantic==2.10.5",
        ],
        "env_vars": {"PIP_NO_CACHE_DIR": "1", "PIP_DISABLE_PIP_VERSION_CHECK": "1"},
    }
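    # Ray ships the local datapreprocessing package to every worker via
    # py_modules and installs the pinned pip dependencies in each worker's
    # environment before tasks run.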
    chunk_size = 199
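    # Each chunk of up to 199 rows becomes one unit of work for a remote task.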
    # The following four parameters identify the method to run as a Ray remote task
    package_name = "datapreprocessing"
    module_name = "datacleaner"
    class_name = "DataPreprocessor"
    method_name = "process_data"
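    # RayUtils presumably resolves datapreprocessing.datacleaner.DataPreprocessor
    # at runtime and invokes process_data on each chunk (see the sketch after
    # the function).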

    logger.info("Started")
    data_loader = DataLoader(IMAGE_BUCKET, input_processing_file)
    df = data_loader.load_raw_data()

    data_prep = DataPrep(df, required_cols, filter_null_cols, chunk_size)
    df = data_prep.update_dataframe()

    # Chunk the dataset
    res = data_prep.split_dataframe()
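    # res is the list of DataFrame chunks, one per remote task.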

    # create a RayUtils object with the info required to run a task
    ray_obj = RayUtils(
        RAY_CLUSTER_HOST,
        ray_resources,
        ray_runtime_env,
        package_name,
        module_name,
        class_name,
        method_name,
        res,
        IMAGE_BUCKET,
        GCS_IMAGE_FOLDER,
    )
    result_df = ray_obj.run_remote()
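    # run_remote dispatches one Ray task per chunk, waits for completion, and
    # returns the combined results as a single DataFrame.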
    # Replace NaN with None
    result_df = result_df.replace({np.nan: None})

    # Store the preprocessed data into GCS
    result_df.to_csv(
        "gs://" + IMAGE_BUCKET + output_processing_file,
        index=False,
    )
    logger.info("Finished")