supporting-blog-content/esql-millionaire-odds/millionaire.ipynb (265 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Millionaire Odds vs. Hit by a Bus: An ESQL Analysis" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python modules" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip cache purge\n", "!pip uninstall -y elasticsearch\n", "!pip install elasticsearch==8.14 pandas matplotlib spicy" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading Synthetic data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "from elasticsearch import Elasticsearch, helpers\n", "\n", "# Initialize Elasticsearch client\n", "client = Elasticsearch(\n", " hosts=\"YOUR_DEPLOYMENT_CLOUD_ID\",\n", " api_key=\"YOUR_DEPLOYMENT_API_KEY\",\n", ")\n", "\n", "\n", "# Generate synthetic data with a highly skewed distribution\n", "num_records = 500000\n", "np.random.seed(42) # Ensure reproducibility\n", "\n", "# Generate net worth using a highly skewed distribution\n", "ages = np.random.randint(20, 80, num_records) # Random ages between 20 and 80\n", "incomes = np.random.exponential(\n", " scale=10000, size=num_records\n", ") # Exponential distribution for income\n", "# Use a more skewed distribution for net worth with a much larger range\n", "net_worths = np.random.exponential(\n", " scale=100000000, size=num_records\n", ") # Extremely skewed net worth\n", "\n", "# Scale up the net worths to reach up to $100 billion\n", "net_worths = np.clip(net_worths, 0, 100000000000)\n", "\n", "# Create DataFrame\n", "df = pd.DataFrame(\n", " {\n", " \"id\": range(1, num_records + 1),\n", " \"age\": ages,\n", " \"income\": incomes,\n", " \"net_worth\": net_worths,\n", " \"counter\": range(1, num_records + 1), # Add a counter field for pagination\n", " }\n", ")\n", "\n", "# Index the data into Elasticsearch\n", "index_name = \"raw_wealth_data_large\"\n", "client.indices.delete(index=index_name, ignore=[400, 404])\n", "client.indices.create(index=index_name, ignore=400)\n", "\n", "\n", "def generator(df):\n", " for index, row in df.iterrows():\n", " yield {\"_index\": index_name, \"_source\": row.to_dict()}\n", "\n", "\n", "helpers.bulk(client, generator(df))\n", "\n", "print(\"Data indexed successfully.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pulling data with ES|QL and creating a dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "from elasticsearch import Elasticsearch, helpers\n", "from io import StringIO\n", "import matplotlib.pyplot as plt\n", "from scipy.stats import pareto\n", "\n", "\n", "# Function to execute ESQL query and fetch data in chunks\n", "def execute_esql_query(query):\n", " response = client.esql.query(query=query, format=\"csv\")\n", " return pd.read_csv(StringIO(response.body))\n", "\n", "\n", "# Function to fetch paginated data using the counter field\n", "def fetch_paginated_data(index, num_records, size=10000):\n", " all_data = pd.DataFrame()\n", " for start in range(1, num_records + 1, size):\n", " end = start + size - 1\n", " query = f\"\"\"\n", " FROM {index}\n", " | WHERE counter >= {start} AND counter <= {end}\n", " | limit {size}\n", " \"\"\"\n", " data_chunk = execute_esql_query(query)\n", " all_data = pd.concat([all_data, data_chunk], ignore_index=True)\n", " return all_data\n", "\n", "\n", "# Fetch all data using pagination and ESQL\n", "num_records = 500000\n", "all_data_df = fetch_paginated_data(index_name, num_records)\n", "print(f\"Total Data Retrieved: {len(all_data_df)} records\")\n", "\n", "# Fit a Pareto distribution to the data\n", "shape, loc, scale = pareto.fit(all_data_df[\"net_worth\"], floc=0)\n", "\n", "# Calculate the probability density for each net worth\n", "all_data_df[\"net_worth_probability\"] = pareto.pdf(\n", " all_data_df[\"net_worth\"], shape, loc=loc, scale=scale\n", ")\n", "\n", "# Normalize the probabilities to sum to 1\n", "all_data_df[\"net_worth_probability\"] /= all_data_df[\"net_worth_probability\"].sum()\n", "\n", "print(\"Data with Net Worth Probability:\")\n", "print(all_data_df.head())\n", "\n", "# Find the Net Worth Corresponding to the Bus Hit Probability\n", "target_probability = 0.0000181\n", "cumulative_probability = all_data_df[\"net_worth_probability\"].cumsum()\n", "target_net_worth_df = all_data_df[cumulative_probability >= target_probability].head(1)\n", "target_net_worth = target_net_worth_df[\"net_worth\"].iloc[0]\n", "print(f\"Net Worth with Probability >= {target_probability}: {target_net_worth}\")\n", "\n", "# Plot the Net Worth Probability Distribution\n", "plt.figure(figsize=(10, 6))\n", "plt.hist(\n", " all_data_df[\"net_worth\"],\n", " bins=100,\n", " density=True,\n", " alpha=0.6,\n", " color=\"g\",\n", " label=\"Empirical Data\",\n", ")\n", "xmin, xmax = plt.xlim()\n", "x = np.linspace(xmin, xmax, 100)\n", "p = pareto.pdf(x, shape, loc=loc, scale=scale)\n", "plt.plot(x, p, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n", "plt.axhline(\n", " y=target_probability, color=\"r\", linestyle=\"--\", label=\"Bus Hit Probability\"\n", ")\n", "plt.axvline(\n", " x=target_net_worth,\n", " color=\"g\",\n", " linestyle=\"--\",\n", " label=f\"Net Worth = {target_net_worth:.2f}\",\n", ")\n", "plt.xlabel(\"Net Worth\")\n", "plt.ylabel(\"Probability\")\n", "plt.title(\"Net Worth Probability Distribution\")\n", "plt.legend()\n", "plt.grid(True)\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fit Pareto Distribution" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from scipy.stats import pareto\n", "\n", "# Assuming all_data_df contains the fetched net worth data from Elasticsearch\n", "# Fit a Pareto distribution to the data\n", "shape, loc, scale = pareto.fit(all_data_df[\"net_worth\"], floc=0)\n", "\n", "# Plot the Net Worth Probability Distribution\n", "plt.figure(figsize=(10, 6))\n", "\n", "# Plot histogram of empirical net worth data\n", "plt.hist(\n", " all_data_df[\"net_worth\"],\n", " bins=100,\n", " density=True,\n", " alpha=0.6,\n", " color=\"g\",\n", " label=\"Empirical Data\",\n", ")\n", "\n", "# Plot fitted Pareto distribution\n", "xmin, xmax = plt.xlim()\n", "x = np.linspace(xmin, xmax, 100)\n", "p = pareto.pdf(x, shape, loc=loc, scale=scale)\n", "plt.plot(x, p, \"k\", linewidth=2, label=\"Fitted Pareto Distribution\")\n", "\n", "# Show the plot\n", "plt.xlabel(\"Net Worth\")\n", "plt.ylabel(\"Probability\")\n", "plt.title(\"Net Worth Probability Distribution\")\n", "plt.legend()\n", "plt.grid(True)\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.5" } }, "nbformat": 4, "nbformat_minor": 2 }