projects/Aligned-Platform-EnergizeAI/aggregation/energize-aggregator.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Developed by Energize AI, Inc. (energize.ai)\n", "# For OpenAI, Democratic AI Grant\n", "# Aggregation Module\n", "# October 13, 2023" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Current Date and Time: 2023-09-25 17:30:16.939333\n" ] } ], "source": [ "from datetime import datetime\n", "current_datetime = datetime.now()\n", "print(\"Current Date and Time:\", current_datetime)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Collecting jax\n", " Using cached jax-0.3.25-py3-none-any.whl\n", "Requirement already satisfied: numpy>=1.20 in /opt/conda/lib/python3.7/site-packages (from jax) (1.21.6)\n", "Collecting opt-einsum (from jax)\n", " Using cached opt_einsum-3.3.0-py3-none-any.whl (65 kB)\n", "Requirement already satisfied: scipy>=1.5 in /opt/conda/lib/python3.7/site-packages (from jax) (1.7.3)\n", "Requirement already satisfied: typing-extensions in /opt/conda/lib/python3.7/site-packages (from jax) (4.7.1)\n", "Installing collected packages: opt-einsum, jax\n", "Successfully installed jax-0.3.25 opt-einsum-3.3.0\n", "Collecting jaxlib\n", " Using cached jaxlib-0.3.25-cp37-cp37m-manylinux2014_x86_64.whl (71.2 MB)\n", "Requirement already satisfied: scipy>=1.5 in /opt/conda/lib/python3.7/site-packages (from jaxlib) (1.7.3)\n", "Requirement already satisfied: numpy>=1.20 in /opt/conda/lib/python3.7/site-packages (from jaxlib) (1.21.6)\n", "Installing collected packages: jaxlib\n", "Successfully installed jaxlib-0.3.25\n", "Collecting python-decouple\n", " Using cached python_decouple-3.8-py3-none-any.whl (9.9 kB)\n", "Installing collected packages: python-decouple\n", "Successfully installed python-decouple-3.8\n" ] } ], "source": [ "!pip install jax\n", "!pip install jaxlib\n", "!pip install python-decouple" ] }, { "cell_type": "markdown", "metadata": { "id": "3JkgXmsXfnvs" }, "source": [ "Load the latest user and guideline data from Energize." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "id": "gQqP5Z0-Z41D" }, "outputs": [], "source": [ "import requests\n", "import pandas as pd\n", "import numpy as np\n", "\n", "ENV = \"PROD\"\n", "BASE = \"https://dev.api.energize.ai/api/openapi\" if ENV != \"PROD\" else \"https://api.energize.ai/api/openapi\"\n", "\n", "API_KEY = \"add energize api key\"\n", "\n", "headers = {\n", " \"authorization\": \"Bearer \" + API_KEY,\n", " \"Content-Type\": \"application/json\"\n", "}\n", "\n", "params = {'topics':True, 'guidelines': True, 'ratings': True, 'users': True, 'mu': True }\n", "\n", "# Make the request\n", "data = requests.get(\n", " f\"{BASE}/exports\",\n", " headers=headers,\n", " params=params\n", ").json()\n", "\n", "mu = data.pop(\"mu\")\n", "full_df = pd.DataFrame.from_dict(data, orient = \"index\")" ] }, { "cell_type": "markdown", "metadata": { "id": "w_f_XIsLedBd" }, "source": [ "\n", "The loss function is based on the Twitter Community Notes rating algorithm. 
You can read their open-sourced documentation [here](https://communitynotes.twitter.com/guide/en/under-the-hood/ranking-notes).\n", "\n", "For each user/guideline pair, the model predicts whether the user rated the guideline as helpful using the following:\n", "\n", "`prediction = mu + user_intercept + guideline_intercept + user_embedding · guideline_embedding`\n", "\n", "The model learns three things: an embedding for each user and each guideline, user- and guideline-specific intercepts, and a global intercept." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "id": "fK4gXuwVZfk8" }, "outputs": [], "source": [ "import jax\n", "import jax.numpy as jnp\n", "\"\"\"\n", "Calculate the loss of a set of parameters given a response matrix.\n", "\n", "Inputs:\n", " mu (float): intercept adjustment across all users\n", " user_embeddings (array): vector embeddings of each user\n", " guideline_embeddings (float array): vector embeddings of each guideline, in the same space as user_embeddings\n", " user_intercepts (array): user-specific intercept adjustment\n", " guideline_intercepts (array): guideline-specific intercept adjustment\n", " response_matrix (array): sparse matrix, with each row corresponding to a user and each column to a guideline.\n", " consists of 0, 1, and nan corresponding to disliked, liked, and not rated.\n", " lambda_i (float): regularization weighting of the intercepts\n", " lambda_f (float): regularization weighting of the embeddings.\n", "Outputs:\n", " (float) loss given parameters\n", "\"\"\"\n", "def cn_loss(mu, user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, response_matrix,\n", " lambda_i = .15, lambda_f = .03):\n", " ypred = mu + user_embeddings @ (guideline_embeddings.T) + (user_intercepts.reshape(-1,1) + guideline_intercepts) # calculate the predicted responses\n", " answered = 1-jnp.isnan(response_matrix) # mask array to determine if a user rated a guideline\n", "\n", " squared_error = jnp.sum(((jnp.nan_to_num(response_matrix) - ypred)* answered )**2 ) # squared error, masked by answered before summing\n", " user_frequencies = jnp.sum(answered, axis=1) # number of responses each user made\n", " guideline_frequencies = jnp.sum(answered, axis=0) # number of ratings each guideline has\n", "\n", " # calculate l2 regularization on the intercepts using frequencies and on mu using answered\n", " intercept_regularization = jnp.sum(user_frequencies * (user_intercepts **2)) + jnp.sum((answered * mu)**2) + jnp.sum(guideline_frequencies * (guideline_intercepts **2))\n", " # calculate l1 regularization on the embeddings, weighted by frequencies\n", " vector_regularization = jnp.sum(jnp.abs(user_embeddings) * user_frequencies.reshape((-1,1)) ) + jnp.sum(jnp.abs(guideline_embeddings) * guideline_frequencies.reshape((-1,1)))\n", "\n", " # loss: (squared error + regularization terms weighted by lambdas) / total number of responses\n", " return (squared_error + lambda_i * intercept_regularization + lambda_f * vector_regularization) / jnp.sum(answered)\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "id": "ZQL_4PQ1gG-n" }, "outputs": [], "source": [ "from tqdm import tqdm\n", "\n", "\n", "\"\"\"\n", "Learn parameters and embeddings from scratch based on a loss function until convergence or an iteration cap.\n", "\n", "Inputs:\n", " loss (function): loss function to use to train\n", " data (array): sparse response matrix of response data\n", " iteration_cap (int): maximum number of gradient descent iterations\n", " pbar (bool): whether to show a tqdm progress bar while training.\n", " lr (float): gradient descent learning rate.\n", " convergence_threshold (float): upper bound on change in loss between iterations 
for convergence\n", " ndim (int): embedding dimensionality.\n", "Output:\n", " (dict) dictionary with the learned parameters.\n", "\"\"\"\n", "\n", "def train_model(loss, data, iteration_cap=10000, pbar = False, lr = .1, convergence_threshold= 10e-9, ndim = 1):\n", " mu = 0.0\n", " user_embeddings = np.random.random((data.shape[0],ndim))\n", " guideline_embeddings = np.random.random((data.shape[1],ndim))\n", " user_intercepts = np.random.random(data.shape[0])\n", " guideline_intercepts = np.random.random(data.shape[1])\n", " losses = [] # we append losses to this through the training process\n", "\n", " # use jax to produce a gradient of the loss function. Pass argnums to not also differentiate over the data.\n", " loss_gradient = jax.grad(loss, argnums=range(0,5))\n", "\n", " # define a generator for the training loop\n", " def generator():\n", " while (len(losses) < iteration_cap) and (len(losses) < 100 or not np.isclose(losses[-1], losses[-2], atol = convergence_threshold)):\n", " yield\n", " iterator = generator()\n", " if pbar: iterator = tqdm(iterator) # if pbar is set, wrap the generator in a tqdm progress bar\n", "\n", "\n", " for _ in iterator:\n", " # calculate loss then the gradient of that loss\n", " losses.append(loss(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data))\n", " lg = loss_gradient(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data)\n", "\n", " # gradient descent\n", " mu -= (lg[0] * lr)\n", " user_embeddings -= (lg[1] * lr)\n", " guideline_embeddings -= (lg[2] * lr)\n", " user_intercepts -= (lg[3] * lr)\n", " guideline_intercepts -= (lg[4] * lr)\n", "\n", " return {\"mu\": mu,\n", " \"user_embeddings\": user_embeddings,\n", " \"guideline_embeddings\": guideline_embeddings,\n", " \"user_intercepts\": user_intercepts,\n", " \"guideline_intercepts\": guideline_intercepts,\n", " \"training_losses\": losses\n", " }\n", "\"\"\"\n", "Learn parameters and embeddings from preexisting parameters and embeddings based on a loss function until convergence or an iteration cap.\n", "\n", "Inputs:\n", " loss (function): loss function to use to train\n", " mu (float): previous mu\n", " response_data (DataFrame): user by guideline response matrix\n", " guideline_data (DataFrame): dataframe with prior guideline embeddings and intercepts\n", " user_data (DataFrame): dataframe with prior user embeddings and intercepts\n", " iteration_cap (int): maximum number of gradient descent iterations\n", " pbar (bool): whether to show a tqdm progress bar while training.\n", " lr (float): gradient descent learning rate.\n", " convergence_threshold (float): upper bound on change in loss between iterations for convergence\n", " ndim (int): embedding dimensionality.\n", "Output:\n", " (dict) dictionary with the learned parameters.\n", "\"\"\"\n", "\n", "def train_model_initialized(loss, mu, response_data, guideline_data, user_data,\n", " iteration_cap=10000, pbar = False, lr = .1, convergence_threshold= 10e-9, ndim = 1):\n", " # clean data\n", " user_data[\"embeddings\"] = user_data[\"embeddings\"].apply(lambda x: np.nan if isinstance(x, list) and not x else x)\n", " user_data = user_data.dropna()\n", " guideline_data[\"embeddings\"] = guideline_data[\"embeddings\"].apply(lambda x: np.nan if isinstance(x, list) and not x else x)\n", " guideline_data = guideline_data.dropna()\n", " # get ids that show up in the response matrix\n", " data = response_data.values\n", " author_ids = response_data.index\n", " guideline_ids = response_data.columns\n", "\n", " # these one-liners use the previous embedding if provided or randomly initialize if not\n", " user_embeddings = np.array([np.array(user_data[user_data[\"id\"] == a][\"embeddings\"])[0] if a in user_data[\"id\"].tolist() else np.random.random((ndim)) for a in author_ids])\n", " guideline_embeddings = np.array([np.array(guideline_data[guideline_data[\"id\"] == a][\"embeddings\"])[0] if a in guideline_data[\"id\"].tolist() else np.random.random((ndim)) for a in guideline_ids])\n", " user_intercepts = np.array([np.array(user_data[user_data[\"id\"] == a][\"intercept\"])[0] if a in user_data[\"id\"].tolist() else np.random.random() for a in author_ids])\n", " guideline_intercepts = np.array([np.array(guideline_data[guideline_data[\"id\"] == a][\"intercept\"])[0] if a in guideline_data[\"id\"].tolist() else np.random.random() for a in guideline_ids])\n", "\n", " losses = [] # we append losses to this through the training process\n", "\n", " # use jax to produce a gradient of the loss function. Pass argnums to not also differentiate over the data.\n", " loss_gradient = jax.grad(loss, argnums=range(0,5))\n", "\n", " # define a generator for the training loop\n", " def generator():\n", " while (len(losses) < iteration_cap) and (len(losses) < 100 or not np.isclose(losses[-1], losses[-2], atol = convergence_threshold)):\n", " yield\n", " iterator = generator()\n", " if pbar: iterator = tqdm(iterator)\n", "\n", "\n", " for _ in iterator: # training loop\n", " losses.append(loss(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data)) # calculate loss\n", " lg = loss_gradient(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data) # gradient of loss\n", "\n", " # gradient descent\n", " mu -= (lg[0] * lr)\n", " user_embeddings -= (lg[1] * lr)\n", " guideline_embeddings -= (lg[2] * lr)\n", " user_intercepts -= (lg[3] * lr)\n", " guideline_intercepts -= (lg[4] * lr)\n", "\n", " return {\"mu\": mu,\n", " \"user_embeddings\": dict(map(lambda i,j : (i,j.tolist()) , author_ids,user_embeddings)),\n", " \"guideline_embeddings\": dict(map(lambda i,j : (i,j.tolist()) , guideline_ids, guideline_embeddings)),\n", " \"user_intercepts\": dict(map(lambda i,j : (i,float(j)) , author_ids,user_intercepts)),\n", " \"guideline_intercepts\": dict(map(lambda i,j : (i,float(j)) , guideline_ids,guideline_intercepts)),\n", " \"training_losses\": losses\n", " }" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "id": "wy8tQjYsiwhA" }, "outputs": [], "source": [ "\"\"\"\n", "Helper functions for working with Energize data.\n", "\"\"\"\n", "\n", "\"\"\"\n", "Loads a dataframe of responses and produces a sparse matrix.\n", "Inputs:\n", " df (Dataframe): response df generated from Energize data.\n", " asArray (Bool): whether the output should be returned as an array or dataframe.\n", " defaults True, corresponding to array output.\n", "Output:\n", " Depending on asArray: Either sparse array or sparse dataframe of the response matrix.\n", "\"\"\"\n", "def ratings_to_response_matrix(df, asArray = True):\n", " df[\"nrating\"] = (df[\"rating\"]==\"helpful\").astype(float)\n", " out = df[[\"nrating\", \"guidelineId\", \"authorId\"]].pivot(columns = \"guidelineId\", index= \"authorId\")\n", " if asArray:\n", " return out.values\n", " else:\n", " return out[\"nrating\"]\n", "\n", "\"\"\"\n", "Loads a dataframe of guidelines and returns a dictionary mapping guideline ids to guideline 
texts.\n", "Input:\n", " df (DataFrame): guideline df generated from Energize data.\n", "Output:\n", " (dict) guidelineId-guideline mapping.\n", "\"\"\"\n", "def make_guideline_map(df):\n", " d = dict()\n", " df.apply(lambda x: d.update({x[\"id\"]:x[\"value\"]}), axis=1)\n", " return d\n", "\n", "def get_guideline_data(df):\n", " # extract ids, prior embeddings, and intercepts for each guideline record\n", " return pd.DataFrame.from_records(df.dropna())[[\"id\", \"embeddings\", \"intercept\"]]\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "0it [00:00, ?it/s]WARNING:jax._src.lib.xla_bridge:No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)\n", "100it [00:09, 10.82it/s]\n" ] } ], "source": [ "response_df = pd.DataFrame.from_records(full_df.loc[\"ratings\"].dropna())\n", "guideline_df = pd.DataFrame.from_records(full_df.loc[\"guidelines\"].dropna())[[\"id\", \"embeddings\", \"intercept\"]]\n", "user_df = pd.DataFrame.from_records(full_df.loc[\"users\"].dropna())[[\"id\", \"embeddings\", \"intercept\"]]\n", "response_matrix = ratings_to_response_matrix(response_df, asArray = False)\n", "\n", "training_result = train_model_initialized(cn_loss, mu, response_matrix, guideline_df, user_df, pbar = True)\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "Using the data from the model training, produce a constitution.\n", "\n", "Inputs:\n", " training_result (dict): dictionary produced from the train_model function, including all embeddings and intercepts.\n", " response_df (DataFrame): response df generated from Energize data.\n", " guideline_df (DataFrame): guideline df generated from Energize data.\n", " response_matrix (DataFrame): user-guideline response matrix\n", " threshold (float): lower bound on the intercept for a guideline to make it to the constitution.\n", " min_ratings (int): lower bound on the number of ratings for a guideline to be accepted\n", " consensus_min (float): lower bound of raw support for acceptance\n", "Output (dict): dictionary containing accepted guideline ids and the corresponding consensus scores\n", "\"\"\"\n", "\n", "def produce_constitution(training_result, response_df, guideline_df, response_matrix, threshold = .375, min_ratings = 5, consensus_min= .8):\n", " # find subset which will be part of constitution\n", " accepted = {key:training_result[\"guideline_intercepts\"][key] for key in training_result[\"guideline_intercepts\"] if training_result[\"guideline_intercepts\"][key] > threshold}\n", "\n", " df=pd.DataFrame(np.sum((~np.isnan(response_matrix)).astype(int), axis=0))\n", " df[\"raw_rates\"] = np.apply_along_axis(lambda x: np.mean(x[~np.isnan(x)]), 0, response_matrix)\n", " df = df.loc[(df[0] >= min_ratings) & (df[\"raw_rates\"] >= consensus_min)] # only accept guidelines with at least min_ratings responses and at least consensus_min (default 80%) raw support\n", " df = df.loc[[i in accepted.keys() for i in df.index]] # subset by accepted\n", " df[\"intercepts\"] = [accepted[id] for id in df.index] # fill intercepts\n", " df[\"consensus_score\"] = (1.1-df[\"raw_rates\"]) * df[\"intercepts\"] + df[\"raw_rates\"] - .1\n", "\n", " accepted_consensus_scores = df[\"consensus_score\"].to_dict()\n", " accepted_ids = list(accepted_consensus_scores.keys())\n", "\n", " return {\"accepted_ids\": accepted_ids,\n", " \"accepted_consensus_scores\": accepted_consensus_scores,\n", " }\n", "\n", "guideline_map_df = 
pd.DataFrame.from_records(full_df.loc[\"guidelines\"].dropna().apply(lambda x: x[\"value\"]))\n", "constitution = produce_constitution(training_result, response_df, guideline_map_df, response_matrix)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "data = {\n", " \"accepted_guideline_ids\": constitution[\"accepted_ids\"],\n", " \"guideline_intercepts\": training_result[\"guideline_intercepts\"],\n", " \"guideline_embeddings\": training_result[\"guideline_embeddings\"],\n", " \"user_embeddings\": training_result[\"user_embeddings\"],\n", " \"user_intercepts\": training_result[\"user_intercepts\"],\n", " \"guideline_scores\": constitution[\"accepted_consensus_scores\"],\n", " \"mu\": training_result[\"mu\"].tolist()\n", " }" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "id": "6X4x53-6kmKv" }, "outputs": [ { "data": { "text/plain": [ "{'success': True}" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def post_to_public(constitution, training_result, data):\n", " headers = {\n", " \"authorization\": \"Bearer \" + API_KEY,\n", " \"Content-Type\": \"application/json\"\n", " }\n", " url = f\"{BASE}/constitution\"\n", " res = requests.post(url, json=data, headers=headers)\n", " return res\n", "\n", "post_to_public(constitution, training_result, data).json()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Update demographic data (optional)\n", "\n", "prolific_api_key = \"add prolific api key\"\n", "\n", "# get all prolific study ids\n", "\n", "base_url = \"https://api.prolific.co/api/v1/studies/\"\n", "params = {\"state\": \"(active|completed)\"}\n", "\n", "headers = {\n", " \"Content-Type\": \"application/json\",\n", " \"Authorization\": f\"Token {prolific_api_key}\"\n", "}\n", "\n", "response = requests.get(base_url, params=params, headers=headers)\n", "\n", "if response.status_code == 200:\n", " study_data = response.json()\n", " study_ids = [study['id'] for study in study_data['results']]\n", "else:\n", " print(f\"Error: {response.status_code}\")\n", " study_ids = []\n", "\n", "print(study_ids)\n", "\n", "import csv\n", "from io import StringIO\n", "\n", "def csv_to_json(csv_content: str):\n", " csv_file = StringIO(csv_content)\n", " reader = csv.DictReader(csv_file)\n", "\n", " result = [dict(row) for row in reader]\n", " return result\n", "\n", "study_id_to_demo_data = dict()\n", "\n", "for study_id in study_ids:\n", " demo_url = f\"https://api.prolific.co/api/v1/studies/{study_id}/export/\"\n", "\n", " print(demo_url)\n", "\n", " headers = {\n", " \"Content-Type\": \"text/csv\",\n", " \"Authorization\": f\"Token {prolific_api_key}\"\n", " }\n", "\n", " response = requests.get(demo_url, headers=headers, stream=True)\n", "\n", " if response.status_code == 200:\n", " # Reading content in chunks and joining to get the full data\n", " demo_data = ''.join(chunk.decode('utf-8') for chunk in response.iter_content(chunk_size=8192))\n", " \n", " parsed_data = csv_to_json(demo_data)\n", " \n", " study_id_to_demo_data[study_id] = parsed_data\n", "\n", " else:\n", " print(response.text)\n", " print(f\"Error: {response.status_code}\")\n", "\n", " break\n", "\n", "\n", "# Define the API endpoint and headers\n", "headers = {\n", " \"authorization\": \"Bearer \" + API_KEY,\n", " \"Content-Type\": \"application/json\"\n", "}\n", "url = f\"{BASE}/prolific/demographics\"\n", "\n", "data = {\n", " 'studyIdToDemoData': study_id_to_demo_data,\n", 
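 " # study_id_to_demo_data maps each Prolific study id to the parsed rows of its demographic export CSV\n",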
"}\n", "\n", "# Send the POST request\n", "response = requests.post(url, json=data, headers=headers)\n", "\n", "# Handle the response\n", "if response.status_code == 200:\n", " print(\"Successfully sent data!\")\n", " # print(response.json()) # Assuming the response is a JSON object\n", "else:\n", " print(f\"Failed to send data. Status code: {response.status_code}\")\n", " # print(response.text)" ] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python (Local)", "language": "python", "name": "local-base" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 4 }