projects/Aligned-Platform-EnergizeAI/aggregation/energize-aggregator.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Developed by Energize AI, Inc. (energize.ai)\n",
"# For OpenAI, Democratic AI Grant\n",
"# Aggregation Module\n",
"# October 13, 2023"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Current Date and Time: 2023-09-25 17:30:16.939333\n"
]
}
],
"source": [
"from datetime import datetime\n",
"current_datetime = datetime.now()\n",
"print(\"Current Date and Time:\", current_datetime)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting jax\n",
" Using cached jax-0.3.25-py3-none-any.whl\n",
"Requirement already satisfied: numpy>=1.20 in /opt/conda/lib/python3.7/site-packages (from jax) (1.21.6)\n",
"Collecting opt-einsum (from jax)\n",
" Using cached opt_einsum-3.3.0-py3-none-any.whl (65 kB)\n",
"Requirement already satisfied: scipy>=1.5 in /opt/conda/lib/python3.7/site-packages (from jax) (1.7.3)\n",
"Requirement already satisfied: typing-extensions in /opt/conda/lib/python3.7/site-packages (from jax) (4.7.1)\n",
"Installing collected packages: opt-einsum, jax\n",
"Successfully installed jax-0.3.25 opt-einsum-3.3.0\n",
"Collecting jaxlib\n",
" Using cached jaxlib-0.3.25-cp37-cp37m-manylinux2014_x86_64.whl (71.2 MB)\n",
"Requirement already satisfied: scipy>=1.5 in /opt/conda/lib/python3.7/site-packages (from jaxlib) (1.7.3)\n",
"Requirement already satisfied: numpy>=1.20 in /opt/conda/lib/python3.7/site-packages (from jaxlib) (1.21.6)\n",
"Installing collected packages: jaxlib\n",
"Successfully installed jaxlib-0.3.25\n",
"Collecting python-decouple\n",
" Using cached python_decouple-3.8-py3-none-any.whl (9.9 kB)\n",
"Installing collected packages: python-decouple\n",
"Successfully installed python-decouple-3.8\n"
]
}
],
"source": [
"!pip install jax\n",
"!pip install jaxlib\n",
"!pip install python-decouple"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3JkgXmsXfnvs"
},
"source": [
"Load the latest user and guideline data from Energize."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"id": "gQqP5Z0-Z41D"
},
"outputs": [],
"source": [
"import requests\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"ENV = \"PROD\"\n",
"BASE = \"https://dev.api.energize.ai/api/openapi\" if ENV != \"PROD\" else \"https://api.energize.ai/api/openapi\"\n",
"\n",
"API_KEY = \"add energize api key\"\n",
"\n",
"headers = {\n",
" \"authorization\": \"Bearer \" + API_KEY,\n",
" \"Content-Type\": \"application/json\"\n",
"}\n",
"\n",
"params = {'topics':True, 'guidelines': True, 'ratings': True, 'users': True, 'mu': True }\n",
"\n",
"# Make the request\n",
"data = requests.get(\n",
" f\"{BASE}/exports\",\n",
" headers=headers,\n",
" params=params\n",
").json()\n",
"\n",
"mu = data.pop(\"mu\")\n",
"full_df = pd.DataFrame.from_dict(data, orient = \"index\")"
]
},
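{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check on the export before training. This is a minimal sketch: it assumes the request above succeeded and that each requested collection comes back as a list of records."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check on the export (a sketch; assumes the request above\n",
"# succeeded and each requested collection came back as a list of records).\n",
"print(\"Exported collections:\", full_df.index.tolist())\n",
"print(\"Global intercept mu:\", mu)\n",
"print(\"Number of ratings:\", full_df.loc[\"ratings\"].dropna().shape[0])"
]
},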
{
"cell_type": "markdown",
"metadata": {
"id": "w_f_XIsLedBd"
},
"source": [
"\n",
"The loss function is based on the Twitter Community Notes rating algorithm. You can read their open-sourced documentation [here](https://communitynotes.twitter.com/guide/en/under-the-hood/ranking-notes).\n",
"\n",
"For each user/guideline pair, the model predicts whether the user rated the guideline as helpful using the following:\n",
"\n",
"The model learns three things: embeddings for the guidelines and users and user/guideline specific intercepts, as well as a global intercept."
]
},
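{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a small illustration of the prediction rule, the next cell computes predicted scores for a made-up example with two users, three guidelines, and one-dimensional embeddings. This is a sketch with invented numbers, not Energize data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy illustration of the prediction rule: y_hat = mu + i_u + i_g + f_u . f_g\n",
"# (invented numbers; 2 users x 3 guidelines, 1-dimensional embeddings)\n",
"import numpy as np\n",
"\n",
"toy_mu = 0.05\n",
"toy_user_embeddings = np.array([[0.8], [-0.6]]) # f_u, shape (2, 1)\n",
"toy_guideline_embeddings = np.array([[0.5], [-0.2], [0.9]]) # f_g, shape (3, 1)\n",
"toy_user_intercepts = np.array([0.1, -0.05]) # i_u\n",
"toy_guideline_intercepts = np.array([0.3, 0.0, -0.1]) # i_g\n",
"\n",
"toy_pred = toy_mu + toy_user_embeddings @ toy_guideline_embeddings.T + toy_user_intercepts.reshape(-1, 1) + toy_guideline_intercepts\n",
"print(toy_pred) # one predicted helpfulness score per user/guideline pair"
]
},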
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"id": "fK4gXuwVZfk8"
},
"outputs": [],
"source": [
"import jax\n",
"import jax.numpy as jnp\n",
"\"\"\"\n",
"Calculate the loss of a set of parameters given a response matrix.\n",
"\n",
"Inputs:\n",
" mu (float): intercept adjustment across all users\n",
" user_embeddings (array): vector embeddings of each user\n",
" guideline_embeddings (float array): vector embeddings of each guideline, in the same space as user_embeddings\n",
" user_intercepts (array): user-specific intercept adjustment\n",
" guideline_intercepts (array): guideline-specific intercept adjustment\n",
" response_matrix (array): sparse matrix, with each row corresponding to a user and each column to a guideline.\n",
" consists of 0,1,and nan corresponding to disliked, liked, and not rated.\n",
" lambda_i (float): regularization weighing of the intercepts\n",
" lambda_f (float): regularization weighing of the embeddings.\n",
"Outputs:\n",
" (float) loss given parameters\n",
"\"\"\"\n",
"def cn_loss(mu,user_embeddings,guideline_embeddings,user_intercepts,guideline_intercepts, response_matrix,\n",
" lambda_i = .15, lambda_f = .03):\n",
" ypred = mu+ user_embeddings @ (guideline_embeddings.T) + (user_intercepts.reshape(-1,1) + guideline_intercepts) # calculated the predicted responses\n",
" answered = 1-jnp.isnan(response_matrix) # mask array to determine if a user rated a guideline\n",
"\n",
" squared_error = jnp.sum(((jnp.nan_to_num(response_matrix) - ypred)* answered )**2 ) # calculate mse then mask using answered before summing\n",
" user_frequences = jnp.sum(answered, axis=1) # number of responses each user made\n",
" guideline_frequences = jnp.sum(answered, axis=0) #number of ratings each guideline has\n",
"\n",
" # calculate l2 regularization on the intercepts using frequencies and on mu using answered\n",
" intercept_regularization = jnp.sum(user_frequences * (user_intercepts **2)) + jnp.sum((answered * mu)**2) + jnp.sum(guideline_frequences * (guideline_intercepts **2))\n",
" # calculate l1 regularization on the embeddings frequencies\n",
" vector_regularization = jnp.sum(jnp.abs(user_embeddings) * user_frequences.reshape((-1,1)) ) + jnp.sum(jnp.abs(guideline_embeddings) * guideline_frequences.reshape((-1,1)))\n",
"\n",
" #loss: (mse + regularization terms weighted by lambdas) / total number of responses\n",
" return (squared_error + lambda_i * intercept_regularization + lambda_f * vector_regularization) / jnp.sum(answered)\n"
]
},
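{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick check of `cn_loss` on a tiny synthetic response matrix. This is a sketch: all parameters start at zero and `NaN` marks unrated pairs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Evaluate cn_loss on a tiny synthetic response matrix (sketch only).\n",
"# NaN marks user/guideline pairs that were never rated; parameters start at zero.\n",
"toy_responses = jnp.array([[1.0, 0.0, jnp.nan], [jnp.nan, 1.0, 1.0]])\n",
"toy_loss = cn_loss(0.0, jnp.zeros((2, 1)), jnp.zeros((3, 1)), jnp.zeros(2), jnp.zeros(3), toy_responses)\n",
"print(\"Toy loss:\", float(toy_loss))"
]
},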
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"id": "ZQL_4PQ1gG-n"
},
"outputs": [],
"source": [
"from tqdm import tqdm\n",
"\n",
"\n",
"\"\"\"\n",
"Learn parameters and embeddings from scratch based on a loss function until convergence or an iteration cap.\n",
"\n",
"Inputs:\n",
" loss (function): loss function to use to train\n",
" data (array): sparse response matrix of response data\n",
" iteration_cap (int): maximum number of gradient descent iterations\n",
" pbar (bool): whether to show a tqdm progress bar while training.\n",
" convergence_threshold (float): upper bound on change in loss between iterations for convergence\n",
" ndim (int): embedding dimensionality.\n",
"Output:\n",
" (dict) dictionary with the learned parameters.\n",
"\"\"\"\n",
"\n",
"def train_model(loss, data, iteration_cap=10000, pbar = False, lr = .1, convergence_threshold= 10e-9, ndim = 1):\n",
" mu = 0.0\n",
" user_embeddings = np.random.random((data.shape[0],ndim))\n",
" guideline_embeddings = np.random.random((data.shape[1],ndim))\n",
" user_intercepts = np.random.random(data.shape[0])\n",
" guideline_intercepts = np.random.random(data.shape[1])\n",
" losses = [] # we append losses to this through the training process\n",
"\n",
" # use jax to produce a gradient of the loss function. Pass argnums to not also differentiate over the data.\n",
" loss_gradient = jax.grad(loss, argnums=range(0,5))\n",
"\n",
" #we define generator for the training loop\n",
" def generator():\n",
" while (len(losses) < iteration_cap) and (len(losses) < 100 or not np.isclose(losses[-1], losses[-2], atol = convergence_threshold)):\n",
" yield\n",
" iterator = generator()\n",
" if pbar: iterator = tqdm(iterator) #if pbar is passed as a parameter then tqdm the generator\n",
"\n",
"\n",
" for _ in iterator:\n",
" # calculate loss then the gradient of that loss\n",
" losses.append(loss(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data))\n",
" lg = loss_gradient(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data)\n",
"\n",
" # gradient descent\n",
" mu -= (lg[0] * lr)\n",
" user_embeddings -= (lg[1] * lr)\n",
" guideline_embeddings -= (lg[2] * lr)\n",
" user_intercepts -= (lg[3] * lr)\n",
" guideline_intercepts -= (lg[4] * lr)\n",
"\n",
" return {\"mu\": mu,\n",
" \"user_embeddings\": user_embeddings,\n",
" \"guideline_embeddings\": guideline_embeddings,\n",
" \"user_intercepts\": user_intercepts,\n",
" \"guideline_intercepts\": guideline_intercepts,\n",
" \"training_losses\": losses\n",
" }\n",
"\"\"\"\n",
"Learn parameters and embeddings from prexisting parameters and embeddings based on a loss function until convergence or an iteration cap.\n",
"\n",
"Inputs:\n",
" loss (function): loss function to use to train\n",
" mu (float): previous mu\n",
" response_data (DataFrame): user by guideline response matrix\n",
" guideline_data (DataFrame): dataframe with prior guideline embeddings and intercepts\n",
" user_data (DataFrame): dataframe with prior user embeddings and intercepts\n",
" iteration_cap (int): maximum number of gradient descent iterations\n",
" pbar (bool): whether to show a tqdm progress bar while training.\n",
" convergence_threshold (float): upper bound on change in loss between iterations for convergence\n",
" ndim (int): embedding dimensionality.\n",
"Output:\n",
" (dict) dictionary with the learned parameters.\n",
"\"\"\"\n",
"\n",
"def train_model_initialized(loss, mu, response_data, guideline_data, user_data,\n",
" iteration_cap=10000, pbar = False, lr = .1, convergence_threshold= 10e-9, ndim = 1):\n",
" # clean data\n",
" user_data[\"embeddings\"] = user_data[\"embeddings\"].apply(lambda x: np.nan if isinstance(x, list) and not x else x)\n",
" user_data = user_data.dropna()\n",
" guideline_data[\"embeddings\"] = guideline_data[\"embeddings\"].apply(lambda x: np.nan if isinstance(x, list) and not x else x)\n",
" guideline_data = guideline_data.dropna()\n",
" # get ids that show up in the response matrix\n",
" data = response_data.values\n",
" author_ids = ratings_to_response_matrix(response_df, asArray = False).index\n",
" guideline_ids = ratings_to_response_matrix(response_df, asArray = False).columns\n",
"\n",
" # these one-liners use the previous embedding if provided or randomly initialize if not\n",
" user_embeddings = np.array([np.array(user_data[user_data[\"id\"] == a][\"embeddings\"])[0] if a in user_data[\"id\"].tolist() else np.random.random((ndim)) for a in author_ids])\n",
" guideline_embeddings = np.array([np.array(guideline_data[guideline_data[\"id\"] == a][\"embeddings\"])[0] if a in guideline_data[\"id\"].tolist() else np.random.random((ndim)) for a in guideline_ids])\n",
" user_intercepts = np.array([np.array(user_data[user_data[\"id\"] == a][\"intercept\"])[0] if a in user_data[\"id\"].tolist() else np.random.random() for a in author_ids])\n",
" guideline_intercepts = np.array([np.array(guideline_data[guideline_data[\"id\"] == a][\"intercept\"])[0] if a in guideline_data[\"id\"].tolist() else np.random.random() for a in guideline_ids])\n",
"\n",
" losses = [] # we append losses to this through the training process\n",
"\n",
" # use jax to produce a gradient of the loss function. Pass argnums to not also differentiate over the data.\n",
" loss_gradient = jax.grad(loss, argnums=range(0,5))\n",
"\n",
" #we define generator for the training loop\n",
" def generator():\n",
" while (len(losses) < iteration_cap) and (len(losses) < 100 or not np.isclose(losses[-1], losses[-2], atol = convergence_threshold)):\n",
" yield\n",
" iterator = generator()\n",
" if pbar: iterator = tqdm(iterator)\n",
"\n",
"\n",
" for _ in iterator: # training loop\n",
" losses.append(loss(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data)) # calcuate loss\n",
" lg = loss_gradient(mu,user_embeddings, guideline_embeddings, user_intercepts, guideline_intercepts, data) # gradient of loss\n",
"\n",
" # gradient descent\n",
" mu -= (lg[0] * lr)\n",
" user_embeddings -= (lg[1] * lr)\n",
" guideline_embeddings -= (lg[2] * lr)\n",
" user_intercepts -= (lg[3] * lr)\n",
" guideline_intercepts -= (lg[4] * lr)\n",
"\n",
" return {\"mu\": mu,\n",
" \"user_embeddings\": dict(map(lambda i,j : (i,j.tolist()) , author_ids,user_embeddings)),\n",
" \"guideline_embeddings\": dict(map(lambda i,j : (i,j.tolist()) , guideline_ids, guideline_embeddings)),\n",
" \"user_intercepts\": dict(map(lambda i,j : (i,float(j)) , author_ids,user_intercepts)),\n",
" \"guideline_intercepts\": dict(map(lambda i,j : (i,float(j)) , guideline_ids,guideline_intercepts)),\n",
" \"training_losses\": losses\n",
" }"
]
},
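{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small end-to-end check of `train_model` on randomly generated responses. This is a sketch with synthetic data; the real Energize data is loaded and fit with `train_model_initialized` below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Fit the model on a small synthetic response matrix (sketch only; the real\n",
"# run below uses train_model_initialized on the Energize export).\n",
"rng = np.random.default_rng(0)\n",
"synthetic = rng.choice([0.0, 1.0, np.nan], size=(20, 10), p=[0.2, 0.3, 0.5])\n",
"synthetic_result = train_model(cn_loss, synthetic, iteration_cap=200, pbar=True)\n",
"print(\"Final training loss:\", float(synthetic_result[\"training_losses\"][-1]))\n",
"print(\"Learned mu:\", float(synthetic_result[\"mu\"]))"
]
},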
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"id": "wy8tQjYsiwhA"
},
"outputs": [],
"source": [
"\"\"\"\n",
"Helper functions for working with Energize data.\n",
"\"\"\"\n",
"\n",
"\"\"\"\n",
"Loads a dataframe of responses and produces a sparse matrix.\n",
"Inputs:\n",
" df (Dataframe): response df generated from Energize data.\n",
" asArray (Bool): whether the output should be returned as an array or dataframe.\n",
" defaults True, corresponding to array output.\n",
"Output:\n",
" Depending on asArray: Either sparse array or sparse dataframe of the response matrix.\n",
"\"\"\"\n",
"def ratings_to_response_matrix(df, asArray = True):\n",
" df[\"nrating\"] = (df[\"rating\"]==\"helpful\").astype(float)\n",
" out = df[[\"nrating\", \"guidelineId\", \"authorId\"]].pivot(columns = \"guidelineId\", index= \"authorId\")\n",
" if asArray:\n",
" return out.values\n",
" else:\n",
" return out[\"nrating\"]\n",
"\n",
"\"\"\"\n",
"Loads a dataframe of guidelines and returns a dictionary mapping guideline ids to guideline texts.\n",
"Input:\n",
" df (DataFrame): guideline df generated from Energize data.\n",
"Output:\n",
" (dict) guidelineId-guideline mapping.\n",
"\"\"\"\n",
"def make_guideline_map(df):\n",
" d = dict()\n",
" df.apply(lambda x: d.update({x[\"id\"]:x[\"value\"]}), axis=1)\n",
" return d\n",
"\n",
"def get_guideline_data(df):\n",
" guideline_embeddings = full_df.loc[\"guidelines\"].dropna().apply(lambda x: x[[\"embedding\", \"intercept\"]])\n",
" return\n",
"\n",
"\n"
]
},
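{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small illustration of `ratings_to_response_matrix` on a hand-made ratings table. This is a sketch: the column names mirror the Energize export used below, and any rating other than \"helpful\" is treated as 0."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrate ratings_to_response_matrix on a hand-made ratings table (sketch only).\n",
"# Column names mirror the Energize export; any rating other than \"helpful\" maps to 0.\n",
"toy_ratings = pd.DataFrame({\n",
"    \"authorId\": [\"u1\", \"u1\", \"u2\"],\n",
"    \"guidelineId\": [\"g1\", \"g2\", \"g1\"],\n",
"    \"rating\": [\"helpful\", \"not-helpful\", \"helpful\"],\n",
"})\n",
"print(ratings_to_response_matrix(toy_ratings, asArray=False))"
]
},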
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"0it [00:00, ?it/s]WARNING:jax._src.lib.xla_bridge:No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)\n",
"100it [00:09, 10.82it/s]\n"
]
}
],
"source": [
"response_df = pd.DataFrame.from_records(full_df.loc[\"ratings\"].dropna())\n",
"guideline_df = pd.DataFrame.from_records(full_df.loc[\"guidelines\"].dropna())[[\"id\", \"embeddings\", \"intercept\"]]\n",
"user_df = pd.DataFrame.from_records(full_df.loc[\"users\"].dropna())[[\"id\", \"embeddings\", \"intercept\"]]\n",
"response_matrix = ratings_to_response_matrix(response_df, asArray = False)\n",
"\n",
"training_result = train_model_initialized(cn_loss, mu, response_matrix, guideline_df, user_df, pbar = True)\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"\n",
"Using the data from the model training, produce a constitution.\n",
"\n",
"Inputs:\n",
" training_result (dict): dictionary produced from the train_model function, including all embeddings and intercepts.\n",
" response_df (DataFrame): response df generated from Energize data.\n",
" guideline_df (DataFrame): response df generated from Energize data.\n",
" response_matrix (DataFrame): user-guideline response matrix\n",
" threshold (float): lower bound on the intercept for a guideline to make it to the constitution.\n",
" min_ratings (int): lower bound on number of ratings for a response to be accepted\n",
" consensus_min (float): lower bound of raw support for acceptance\n",
"Output (dict): dictionary containing accepted guideline ids and the corresponding consensus scores\n",
"\"\"\"\n",
"\n",
"def produce_constitution (training_result, response_df, guideline_df, response_matrix, threshold = .375, min_ratings = 5, consensus_min= .8):\n",
" # find subset which will be part of constitution\n",
" accepted = {key:training_result[\"guideline_intercepts\"][key] for key in training_result[\"guideline_intercepts\"] if training_result[\"guideline_intercepts\"][key] > threshold}\n",
"\n",
" df=pd.DataFrame(np.sum((~np.isnan(response_matrix)).astype(int), axis=0))\n",
" df[\"raw_rates\"] = np.apply_along_axis(lambda x: np.mean(x[~np.isnan(x)]), 0, response_matrix)\n",
" df = df.loc[(df[0] >= min_ratings) * (df[\"raw_rates\"] >= consensus_min)] # only accept guidelines with at least min_ratings responses and <= 80% support\n",
" df = df.loc[[i in accepted.keys() for i in df.index]] # subset by accepted\n",
" df[\"intercepts\"] = [accepted[id] for id in df.index] # fill intercepts\n",
" df[\"consensus_score\"] = (1.1-df[\"raw_rates\"]) * df[\"intercepts\"] + df[\"raw_rates\"] - .1\n",
"\n",
" accepted_consensus_scores = df[\"consensus_score\"].to_dict()\n",
" accepted_ids = list(accepted_consensus_scores.keys())\n",
"\n",
" return {\"accepted_ids\": accepted_ids,\n",
" \"accepted_consensus_scores\": accepted_consensus_scores,\n",
" }\n",
"\n",
"guideline_map_df = pd.DataFrame.from_records(full_df.loc[\"guidelines\"].dropna().apply(lambda x: x[\"value\"]))\n",
"constitution = produce_constitution(training_result, response_df, guideline_map_df, response_matrix)"
]
},
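{
"cell_type": "markdown",
"metadata": {},
"source": [
"To read the result, the accepted ids can be mapped back to guideline text with `make_guideline_map`. This is a sketch: it assumes each flattened guideline record in `guideline_map_df` carries `id` and `value` fields, which is what `make_guideline_map` expects."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Map accepted guideline ids back to their text, sorted by consensus score.\n",
"# Assumes the flattened guideline records carry \"id\" and \"value\" fields,\n",
"# which is what make_guideline_map expects.\n",
"guideline_text = make_guideline_map(guideline_map_df)\n",
"for gid, score in sorted(constitution[\"accepted_consensus_scores\"].items(), key=lambda kv: kv[1], reverse=True):\n",
"    print(round(float(score), 3), guideline_text.get(gid, \"<missing text>\"))"
]
},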
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"data = {\n",
" \"accepted_guideline_ids\": constitution[\"accepted_ids\"],\n",
" \"guideline_intercepts\": training_result[\"guideline_intercepts\"],\n",
" \"guideline_embeddings\": training_result[\"guideline_embeddings\"],\n",
" \"user_embeddings\": training_result[\"user_embeddings\"],\n",
" \"user_intercepts\": training_result[\"user_intercepts\"],\n",
" \"guideline_scores\": constitution[\"accepted_consensus_scores\"],\n",
" \"mu\": training_result[\"mu\"].tolist()\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"id": "6X4x53-6kmKv"
},
"outputs": [
{
"data": {
"text/plain": [
"{'success': True}"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def post_to_public(constitution, training_result, data):\n",
" headers = {\n",
" \"authorization\": \"Bearer \" + API_KEY,\n",
" \"Content-Type\": \"application/json\"\n",
" }\n",
" url = f\"{BASE}/constitution\"\n",
" res = requests.post(url, json=data, headers=headers)\n",
" return res\n",
"\n",
"post_to_public(constitution, training_result, data).json()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Update demographic data (optional)\n",
"\n",
"prolific_api_key = \"add prolific api key\"\n",
"\n",
"# get all prolific study ids\n",
"\n",
"base_url = \"https://api.prolific.co/api/v1/studies/\"\n",
"params = {\"state\": \"(active|completed)\"}\n",
"\n",
"headers = {\n",
" \"Content-Type\": \"application/json\",\n",
" \"Authorization\": f\"Token {prolific_api_key}\"\n",
"}\n",
"\n",
"response = requests.get(base_url, params=params, headers=headers)\n",
"\n",
"if response.status_code == 200:\n",
" study_data = response.json()\n",
" study_ids = [study['id'] for study in study_data['results']]\n",
"else:\n",
" print(f\"Error: {response.status_code}\")\n",
" study_ids = []\n",
"\n",
"print(study_ids)\n",
"\n",
"import csv\n",
"from io import StringIO\n",
"\n",
"def csv_to_json(csv_content: str):\n",
" csv_file = StringIO(csv_content)\n",
" reader = csv.DictReader(csv_file)\n",
"\n",
" result = [dict(row) for row in reader]\n",
" return result\n",
"\n",
"study_id_to_demo_data = dict()\n",
"\n",
"for study_id in study_ids:\n",
" demo_url = f\"https://api.prolific.co/api/v1/studies/{study_id}/export/\"\n",
"\n",
" print(demo_url)\n",
"\n",
" headers = {\n",
" \"Content-Type\": \"text/csv\",\n",
" \"Authorization\": f\"Token {prolific_api_key}\"\n",
" }\n",
"\n",
" response = requests.get(demo_url, headers=headers, stream=True)\n",
"\n",
" if response.status_code == 200:\n",
" # Reading content in chunks and joining to get the full data\n",
" demo_data = ''.join(chunk.decode('utf-8') for chunk in response.iter_content(chunk_size=8192))\n",
" \n",
" parsed_data = csv_to_json(demo_data)\n",
" \n",
" study_id_to_demo_data[study_id] = parsed_data\n",
"\n",
" else:\n",
" print(response.text)\n",
" print(f\"Error: {response.status_code}\")\n",
"\n",
" break\n",
"\n",
"\n",
"# Define the API endpoint and headers\n",
"headers = {\n",
" \"authorization\": \"Bearer \" + API_KEY,\n",
" \"Content-Type\": \"application/json\"\n",
"}\n",
"url = f\"{BASE}/prolific/demographics\"\n",
"\n",
"data = {\n",
" 'studyIdToDemoData': study_id_to_demo_data,\n",
"}\n",
"\n",
"# Send the POST request\n",
"response = requests.post(url, json=data, headers=headers)\n",
"\n",
"# Handle the response\n",
"if response.status_code == 200:\n",
" print(\"Successfully sent data!\")\n",
" # print(response.json()) # Assuming the response is a JSON object\n",
"else:\n",
" print(f\"Failed to send data. Status code: {response.status_code}\")\n",
" # print(response.text)"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python (Local)",
"language": "python",
"name": "local-base"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}