supporting-blog-content/esql-millionaire/millionaire.ipynb (265 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Millionaire Odds vs. Hit by a Bus: An ES|QL Analysis"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use %pip (not !pip) so packages are installed into the running kernel's environment\n",
"%pip cache purge\n",
"%pip uninstall -y elasticsearch\n",
"# scipy provides scipy.stats.pareto, which the analysis cells import\n",
"%pip install elasticsearch==8.14 pandas matplotlib scipy"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading Synthetic data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from elasticsearch import Elasticsearch, helpers\n",
"\n",
"# Initialize Elasticsearch client.\n",
"# A Cloud ID is not a URL, so it must be passed as `cloud_id`; `hosts` expects\n",
"# node URLs (scheme, host and port).\n",
"client = Elasticsearch(\n",
"    cloud_id=\"YOUR_DEPLOYMENT_CLOUD_ID\",\n",
"    api_key=\"YOUR_DEPLOYMENT_API_KEY\",\n",
")\n",
"\n",
"\n",
"# Generate synthetic data with a highly skewed distribution\n",
"num_records = 500000\n",
"np.random.seed(42)  # Ensure reproducibility\n",
"\n",
"# NOTE: for a given seed the generated values depend on the order of the three\n",
"# random draws below, so keep that order unchanged.\n",
"ages = np.random.randint(20, 80, num_records)  # Integer ages 20..79 (upper bound is exclusive)\n",
"incomes = np.random.exponential(\n",
"    scale=10000, size=num_records\n",
")  # Exponential distribution for income\n",
"# Use a much larger scale for net worth so the distribution has a very long tail\n",
"net_worths = np.random.exponential(\n",
"    scale=100000000, size=num_records\n",
")  # Extremely skewed net worth\n",
"\n",
"# Cap net worth at $100 billion (np.clip limits values; it does not rescale them)\n",
"net_worths = np.clip(net_worths, 0, 100000000000)\n",
"\n",
"# Create DataFrame\n",
"df = pd.DataFrame(\n",
"    {\n",
"        \"id\": range(1, num_records + 1),\n",
"        \"age\": ages,\n",
"        \"income\": incomes,\n",
"        \"net_worth\": net_worths,\n",
"        \"counter\": range(1, num_records + 1),  # Sequential 1..num_records, used as the pagination key\n",
"    }\n",
")\n",
"\n",
"# Index the data into Elasticsearch\n",
"index_name = \"raw_wealth_data_large\"\n",
"# Start from a clean index. Passing `ignore=` per request is deprecated in the\n",
"# 8.x client; `options(ignore_status=...)` is the supported way to tolerate\n",
"# these HTTP statuses (e.g. 404 when the index does not exist yet).\n",
"client.options(ignore_status=[400, 404]).indices.delete(index=index_name)\n",
"client.options(ignore_status=400).indices.create(index=index_name)\n",
"\n",
"\n",
"def generator(df):\n",
"    \"\"\"Yield one bulk-index action per row of `df`, targeting `index_name`.\n",
"\n",
"    Rows are read with `itertuples` instead of `iterrows`: `iterrows` returns\n",
"    each row as a Series, which upcasts the integer columns (id, age, counter)\n",
"    to float, and it is much slower for 500k rows.\n",
"    \"\"\"\n",
"    columns = list(df.columns)\n",
"    for values in df.itertuples(index=False, name=None):\n",
"        yield {\"_index\": index_name, \"_source\": dict(zip(columns, values))}\n",
"\n",
"\n",
"helpers.bulk(client, generator(df))\n",
"\n",
"# Bulk-indexed documents only become searchable after a refresh; force one so\n",
"# queries issued right after this cell see every record.\n",
"client.indices.refresh(index=index_name)\n",
"\n",
"print(\"Data indexed successfully.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pulling data with ES|QL and creating a dataframe"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from elasticsearch import Elasticsearch, helpers\n",
"from io import StringIO\n",
"import matplotlib.pyplot as plt\n",
"from scipy.stats import pareto\n",
"\n",
"\n",
"def execute_esql_query(query):\n",
"    \"\"\"Run a single ES|QL query and return its result as a DataFrame.\n",
"\n",
"    The response is requested in CSV format and parsed with pandas.\n",
"    \"\"\"\n",
"    response = client.esql.query(query=query, format=\"csv\")\n",
"    return pd.read_csv(StringIO(response.body))\n",
"\n",
"\n",
"def fetch_paginated_data(index, num_records, size=10000):\n",
"    \"\"\"Fetch documents from `index` page by page, keyed on the `counter` field.\n",
"\n",
"    Each query selects a window of `size` consecutive counter values, starting\n",
"    at 1 and continuing until `num_records` is covered. Returns one DataFrame\n",
"    with all pages concatenated (an empty DataFrame if there are no pages).\n",
"    \"\"\"\n",
"    chunks = []\n",
"    for start in range(1, num_records + 1, size):\n",
"        end = start + size - 1\n",
"        # SORT makes the row order deterministic: ES|QL guarantees no order\n",
"        # without it, and the cumulative sum computed later in this cell\n",
"        # depends on row order.\n",
"        query = f\"\"\"\n",
"        FROM {index}\n",
"        | WHERE counter >= {start} AND counter <= {end}\n",
"        | SORT counter\n",
"        | LIMIT {size}\n",
"        \"\"\"\n",
"        chunks.append(execute_esql_query(query))\n",
"    if not chunks:\n",
"        return pd.DataFrame()\n",
"    # Concatenate once: calling pd.concat inside the loop re-copies every\n",
"    # previously fetched row on each iteration.\n",
"    return pd.concat(chunks, ignore_index=True)\n",
"\n",
"\n",
"# Fetch all data using pagination and ES|QL\n",
"num_records = 500000\n",
"all_data_df = fetch_paginated_data(index_name, num_records)\n",
"print(f\"Total Data Retrieved: {len(all_data_df)} records\")\n",
"\n",
"# Fit a Pareto distribution to the data (location parameter fixed at 0)\n",
"shape, loc, scale = pareto.fit(all_data_df[\"net_worth\"], floc=0)\n",
"\n",
"# Evaluate the fitted probability density at each net worth\n",
"all_data_df[\"net_worth_probability\"] = pareto.pdf(\n",
"    all_data_df[\"net_worth\"], shape, loc=loc, scale=scale\n",
")\n",
"\n",
"# Normalize the density values so that they sum to 1 across all records\n",
"all_data_df[\"net_worth_probability\"] /= all_data_df[\"net_worth_probability\"].sum()\n",
"\n",
"print(\"Data with Net Worth Probability:\")\n",
"print(all_data_df.head())\n",
"\n",
"# Find the first record (in counter order) at which the running total of the\n",
"# normalized values reaches the bus-hit probability\n",
"target_probability = 0.0000181\n",
"cumulative_probability = all_data_df[\"net_worth_probability\"].cumsum()\n",
"target_net_worth_df = all_data_df[cumulative_probability >= target_probability].head(1)\n",
"target_net_worth = target_net_worth_df[\"net_worth\"].iloc[0]\n",
"print(f\"Net Worth with Probability >= {target_probability}: {target_net_worth}\")\n",
"\n",
"# Plot the Net Worth Probability Distribution\n",
"plt.figure(figsize=(10, 6))\n",
"plt.hist(\n",
"    all_data_df[\"net_worth\"],\n",
"    bins=100,\n",
"    density=True,\n",
"    alpha=0.6,\n",
"    color=\"g\",\n",
"    label=\"Empirical Data\",\n",
")\n",
"# Evaluate the fitted density over the x-range chosen by the histogram\n",
"xmin, xmax = plt.xlim()\n",
"x = np.linspace(xmin, xmax, 100)\n",
"p = pareto.pdf(x, shape, loc=loc, scale=scale)\n",
"plt.plot(x, p, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n",
"# NOTE(review): the y-axis shows a density (density=True) while\n",
"# target_probability is a probability, so this line is illustrative only.\n",
"plt.axhline(\n",
"    y=target_probability, color=\"r\", linestyle=\"--\", label=\"Bus Hit Probability\"\n",
")\n",
"plt.axvline(\n",
"    x=target_net_worth,\n",
"    color=\"g\",\n",
"    linestyle=\"--\",\n",
"    label=f\"Net Worth = {target_net_worth:.2f}\",\n",
")\n",
"plt.xlabel(\"Net Worth\")\n",
"plt.ylabel(\"Probability\")\n",
"plt.title(\"Net Worth Probability Distribution\")\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fit Pareto Distribution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"from scipy.stats import pareto\n",
"\n",
"# Relies on all_data_df (the net worth data fetched from Elasticsearch) and on\n",
"# np, both of which come from the cells above.\n",
"# Fit a Pareto distribution to net worth with the location parameter fixed at 0\n",
"shape, loc, scale = pareto.fit(all_data_df[\"net_worth\"], floc=0)\n",
"\n",
"# Draw everything on one explicit figure/axes pair\n",
"fig, ax = plt.subplots(figsize=(10, 6))\n",
"\n",
"# Empirical net worth data as a normalized histogram\n",
"ax.hist(\n",
"    all_data_df[\"net_worth\"],\n",
"    bins=100,\n",
"    density=True,\n",
"    alpha=0.6,\n",
"    color=\"g\",\n",
"    label=\"Empirical Data\",\n",
")\n",
"\n",
"# Fitted Pareto density evaluated across the histogram's x-range\n",
"x_low, x_high = ax.get_xlim()\n",
"x_values = np.linspace(x_low, x_high, 100)\n",
"fitted_density = pareto.pdf(x_values, shape, loc=loc, scale=scale)\n",
"ax.plot(x_values, fitted_density, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n",
"\n",
"# Labels, legend and grid, then render\n",
"ax.set_xlabel(\"Net Worth\")\n",
"ax.set_ylabel(\"Probability\")\n",
"ax.set_title(\"Net Worth Probability Distribution\")\n",
"ax.legend()\n",
"ax.grid(True)\n",
"plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}