src/jobs/util/grouping_pipeline.py (267 lines of code) (raw):

from functools import partial import math import numpy as np import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.base import BaseEstimator, TransformerMixin from sklearn.preprocessing import FunctionTransformer, OneHotEncoder from sklearn.preprocessing import MultiLabelBinarizer from sklearn.feature_extraction import text from sklearn.preprocessing import Normalizer from nltk.stem import PorterStemmer from nltk.tokenize import word_tokenize import nltk from sklearn.metrics.cluster import rand_score, adjusted_rand_score from util.silhouette import silh_find_optimal_k import umap import hdbscan from kneed import KneeLocator from sklearn.cluster import KMeans import warnings from sentence_transformers import SentenceTransformer from util.labeled_data_utils import get_labeled_dataset, user_test_list EMBEDDING_MODEL_MINILM = "all-MiniLM-L6-v2" EMBEDDING_MODEL_MPNET = "all-mpnet-base-v2" EMBEDDING_NOMIC = "nomic-ai/nomic-embed-text-v1.5" EMBEDDING_MODEL_LIST = [EMBEDDING_MODEL_MINILM, EMBEDDING_MODEL_MPNET] T5_BASE_LOCAL_LABEL = "T5 Fine Tuned (Local)" OPENAI_CLOUD_LABEL = "OpenAI (Cloud)" EMBEDDING_TEXT_COLUMN = "emb_text" CUSTOM_STOP_WORDS = ["google", "slides", "docs", "sheets", "search"] CLUSTER_METHODS = ["kmeans", "dbscan"] EMBEDDING_TEXT_COMBINATIONS = ["title", "title+description", "name+description", "title+domain"] DIM_REDUCE_OPTIONS = [0, 5, 15] NUM_CLUSTER_METHODS = ["knee", "silhouette"] TOPIC_GENERATOR_OPTIONS = [T5_BASE_LOCAL_LABEL, OPENAI_CLOUD_LABEL] class ModelProvider: def __init__(self): self.models = {} def get_model(self, name): if name not in self.models: self.models[name] = SentenceTransformer(name, trust_remote_code=True) return self.models[name] def get_prefix(self, name): if name == EMBEDDING_NOMIC: return "clustering: " return None def generate_embedding_features(data, model_name, model_provider: ModelProvider): data_list = data[EMBEDDING_TEXT_COLUMN].values.tolist() prefix = model_provider.get_prefix(model_name) if prefix: data_list = list(map(lambda a: f"{prefix}{a}", data_list)) return model_provider.get_model(model_name).encode(data_list, show_progress_bar=False) def get_title_embedding_transformer(model_name: str, model_provider: ModelProvider): return FunctionTransformer(partial(generate_embedding_features, model_name=model_name, model_provider=model_provider)) class ItemSelector(BaseEstimator, TransformerMixin): def __init__(self, column): self.column = column def fit(self, X, y=None): return self def transform(self, X): return X[self.column] class EmbeddingScaler(BaseEstimator, TransformerMixin): def __init__(self, scale_factor=1.0): self.scale_factor = scale_factor def fit(self, X, y=None): return self def transform(self, X): return X * self.scale_factor class MultiLabelBinarizerWrapper: def __init__(self): self.mlb = MultiLabelBinarizer() def fit(self, X, y=None): self.mlb.fit(X["domain_category_info"].to_list()) return self def transform(self, X): aa = self.mlb.transform(X["domain_category_info"].to_list()) return aa def generate_pipeline(config, model_provider: ModelProvider): history_scale = config["history_scale"] domain_scale = config["domain_scale"] title_embedding_scale = config["title_embedding_scale"] tf_idf_scale = config["tf_idf_scale"] pipeline_domain = Pipeline( [ ("selector", ItemSelector(column=["domain"])), ("domain_features", OneHotEncoder(handle_unknown="ignore")), ('scaler', EmbeddingScaler(scale_factor=domain_scale)) ] ) pipeline_history = Pipeline( [ ("selector", ItemSelector(column=["browse_group"])), ("domain_features", OneHotEncoder(handle_unknown="ignore")), ('scaler', EmbeddingScaler(scale_factor=history_scale)) ] ) # pipeline_domain_category = Pipeline( # [ # ("selector", ItemSelector(column=["domain_category_info"])), # ("domain_cat_features", MultiLabelBinarizerWrapper()), # ('scaler', EmbeddingScaler(scale_factor=domain_category_scale)) # ] # ) title_embedding_transformer = get_title_embedding_transformer(config["embedding_model"], model_provider=model_provider) pipeline_title_embeddings = Pipeline([("title_embedding_features", title_embedding_transformer), ('scaler', EmbeddingScaler(scale_factor=title_embedding_scale))]) stemmer = PorterStemmer() def stem_preprocess(text): tokens = word_tokenize(text) return ' '.join([stemmer.stem(token) for token in tokens]) stop_words = list(text.ENGLISH_STOP_WORDS) # stop_words.extend(CUSTOM_STOP_WORDS) pipeline_tfidf = Pipeline( [ ("selector", ItemSelector(column=EMBEDDING_TEXT_COLUMN)), ( "tfidf_title", TfidfVectorizer( # preprocessor=stemming_tokenizer, ngram_range=(1, 2), stop_words= list(stop_words) + ["google", "search", "sheets", "docs"], max_df=0.95, min_df=3, max_features=1000, ) ), ('scaler', EmbeddingScaler(scale_factor=tf_idf_scale)) ] ) combined_features = FeatureUnion([ ("pipeline_title_embeddings", pipeline_title_embeddings), ("pipeline_tfidf", pipeline_tfidf), ("pipeline_domain", pipeline_domain), ("pipeline_history", pipeline_history) ]) final_pipeline = Pipeline( [ ("features", combined_features), ('normalizer', Normalizer()), ] ) return final_pipeline def add_text_for_embedding(df: pd.DataFrame, fields): has_set = False for col in fields.split("+"): if not has_set: df[EMBEDDING_TEXT_COLUMN] = df[col] has_set = True else: df[EMBEDDING_TEXT_COLUMN] = df[EMBEDDING_TEXT_COLUMN] + ". " + df[col].fillna("") return df def generate_best_cluster_model(embeddings, cluster_space, verbose=False, use_dbscan=True, eps=0.3, num_cluster_method="knee"): """ takes embeddings and returns the best model using the elbow method and kmeans cluster_space is the range to search for the best cluster - e.g. range(1, 50) This can take a while to run for large datasets """ from sklearn.cluster import DBSCAN # HDBSCAN or another clustering algorithm that has .fit and .predict functions and # the .labels_ variable to extract the labels # self.hdbscan_model = hdbscan_model or hdbscan.HDBSCAN( # min_cluster_size=self.min_topic_size, # metric="euclidean", # cluster_selection_method="eom", # prediction_data=True, # ) if use_dbscan: db = hdbscan.HDBSCAN( min_cluster_size=2, metric="euclidean", cluster_selection_method="eom", prediction_data=True ) # db = DBSCAN(eps=eps, min_samples=2, metric='cosine').fit_predict(embeddings) # db = DBSCAN(eps=eps, min_samples=2, metric='euclidian').fit_predict(embeddings) return db if num_cluster_method == "knee": with warnings.catch_warnings(): warnings.simplefilter("ignore") sum_of_squared_distances = [] k_to_model = {} for k in cluster_space: if k > len(embeddings): break model_k = KMeans(n_clusters=k).fit(embeddings) sum_of_squared_distances.append(model_k.inertia_) k_to_model[k] = model_k if verbose: print(k, model_k.inertia_) kn = KneeLocator( cluster_space, sum_of_squared_distances, curve='convex', direction='decreasing', interp_method='interp1d', ) if verbose: print('Best number of clusters: {}'.format(kn.knee)) # kn.knee returns optimal cluster value if kn.knee is None: print("Warning -- knee not found -- defaulting to 4") return k_to_model[4] return k_to_model[kn.knee] else: k = silh_find_optimal_k(embeddings, cluster_space) return KMeans(n_clusters=k).fit(embeddings) def run_pipeline(config, df, saved_set_name=None, model_provider = None): """ Runs a tab grouping pipeline, grouping tabs into clusters and labeling each cluster Args: config: Configuration options df: Dataset saved_set_name: Name to save embeddings as a tsv for other use model_provider: Language model class Returns: dataset, rand score (based on labels), adjusted rand score """ df = df.sample(frac=1) dbscan = config["clustering_method"] == "dbscan" pipeline = generate_pipeline(config, model_provider=model_provider) df = add_text_for_embedding(df, config["text_for_embedding"]) model = pipeline.fit(df) pipeline_result = model.transform(df).toarray() if saved_set_name is not None: np.savetxt(f"output/{saved_set_name}.tsv", pipeline_result, delimiter="\t") df[["title", "smart_group_label"]].to_csv(f"output/{saved_set_name}_labels.tsv", sep="\t") embeddings_as_list = [pipeline_result.tolist() for _row in pipeline_result] if config["remap"] > 0: umap_model = umap.UMAP( n_neighbors=config["remap"], n_components=5, min_dist=0.0, metric="cosine" ) pipeline_result = umap_model.fit_transform(pipeline_result) # embeddings_as_list = [pipeline_result.tolist() for _row in pipeline_result] max_clusters = min(math.floor(math.log(len(embeddings_as_list)) * 2.0 + 1), len(embeddings_as_list)) best_cluster = generate_best_cluster_model(pipeline_result, range(2, max_clusters), verbose=False, use_dbscan=dbscan, eps=config["dbscan_eps"], num_cluster_method=config["num_cluster_method"]) if dbscan: clusters = best_cluster.fit_predict(pipeline_result) else: clusters = best_cluster.predict(pipeline_result) df["predicted_cluster"] = clusters df["embeddings"] = embeddings_as_list rscore = rand_score(df["smart_group_label"], df["predicted_cluster"]) adj_rscore = adjusted_rand_score(df["smart_group_label"], df["predicted_cluster"]) return df, rscore, adj_rscore def get_default_config(): config = {} config["history_scale"] = 0.0 config["domain_scale"] = 0.0 config["title_embedding_scale"] = 1.0 config["tf_idf_scale"] = 0.0 config["clustering_method"] = "kmeans" config["dbscan_eps"] = 0.4 config["remap"] = 5 config["num_cluster_method"] = "knee" config["text_for_embedding"] = "title" config["embedding_model"] = EMBEDDING_MODEL_LIST[0] return config def sweep_params(): all_results = [] dataset_names = user_test_list for dataset_id in dataset_names: datasets, labeled_topics = get_labeled_dataset(dataset_id) model_provider = ModelProvider() for embedding_model in EMBEDDING_MODEL_LIST: for clustering_method in CLUSTER_METHODS: dbscan_eps_params = [0.4] if clustering_method == "kmeans": num_cluster_methods = NUM_CLUSTER_METHODS else: num_cluster_methods = ["knee"] if clustering_method == "dbscan": dbscan_eps_params = [0.4] # add others here for num_cluster_method in num_cluster_methods: for dbscan_eps in dbscan_eps_params: for remap in DIM_REDUCE_OPTIONS: for tf_idf_scale in [0.0]: config = get_default_config() config["embedding_model"] = embedding_model config["remap"] = remap config["dbscan_eps"] = dbscan_eps config["tf_idf_scale"] = tf_idf_scale config["clustering_method"] = clustering_method config["num_cluster_method"] = num_cluster_method res, score, adj_rscore = run_pipeline(config, datasets[0], model_provider=model_provider) result_dict = {**config, "dataset": dataset_id, "rand": score, "adj_rand": adj_rscore} all_results.append(result_dict) # wandb.log(result_dict) print("got result") return all_results if __name__ == "__main__": nltk.download('punkt') # wandb.init( # set the wandb project where this run will be logged # project="smart-tab-cluster-eval") all_results = [] for _k in range(4): all_results.extend(sweep_params()) print("***next sweep") df = pd.DataFrame(all_results) df.to_csv("./output/all_pipeline_embedding_test.csv")