ManagedkdbInsights/basic_tick_V3/pykx_sub_calc.ipynb (271 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "1334d92e-34f4-448c-b6af-73cba89e9d6d",
"metadata": {},
"source": [
"# PyKX Sub-Process Statistics\n",
"Use an external q process to connect to a managed kdb cluster and measure communication latency."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "11e9b62a-9cda-4a57-99ca-3b13aff6b7f3",
"metadata": {},
"outputs": [],
"source": [
"import warnings\n",
"warnings.simplefilter(action='ignore', category=FutureWarning)\n",
"\n",
"import os\n",
"import subprocess\n",
"import boto3\n",
"import json\n",
"import datetime\n",
"\n",
"import pykx as kx\n",
"import matplotlib.pyplot as plt\n",
"\n",
"from managed_kx import *\n",
"from env import *\n",
"\n",
"# Cluster names and database\n",
"from config import *\n",
"\n",
"# ----------------------------------------------------------------\n",
"CODEBASE=\"basictick\"\n",
"\n",
"KILL_SUBSCRIBER = False\n",
"# ----------------------------------------------------------------\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cb429a9f-e8a4-41d2-8572-c4e8a49ad387",
"metadata": {},
"outputs": [],
"source": [
"# Using credentials and create service client\n",
"session = boto3.Session()\n",
"\n",
"# create finspace client\n",
"client = session.client(service_name='finspace')"
]
},
{
"cell_type": "markdown",
"id": "c097c949",
"metadata": {},
"source": [
"# Start q process that subscribes to RTS\n",
"This notebook will record the measured latency in communications between this created q process and the RTS cluster.\n",
"\n",
"## Configuration\n",
"Environment variable QHOME is set to where q is locally to this notebook.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "62f12aa1",
"metadata": {},
"outputs": [],
"source": [
"# get the connection string\n",
"conn_str = get_kx_connection_string(client, environmentId=ENV_ID, clusterName=RTS_CLUSTER_NAME, userName=KDB_USERNAME, boto_session=session)\n",
"\n",
"subscriber_mode = \"trade_last\" \n",
"#subscriber_mode = \"trade_vwap\" \n",
"#subscriber_mode = \"trade_hlcv\" \n",
"\n",
"# Is process already running?\n",
"create = True\n",
"\n",
"try:\n",
" # Will connect if running\n",
" with kx.QConnection(port=SUBSCRIBER_PORT) as q:\n",
" create=False \n",
" print(f\"Running on port: {SUBSCRIBER_PORT}\")\n",
"except RuntimeError:\n",
" pass\n",
"except:\n",
" pass\n",
"\n",
"if create: \n",
" try:\n",
" # start q process kxtaqsubscriber to connect to the TP at $TP_CONN\n",
" if os.getenv('QHOME') is not None:\n",
" subprocess.Popen(f'cd {CODEBASE}; nohup $QHOME/l64/q kxtaqsubscriber.q -p {SUBSCRIBER_PORT} -endpoint \"{conn_str}\" -mode {subscriber_mode}', shell=True)\n",
" print(f\"Started on port: {SUBSCRIBER_PORT}\")\n",
" else:\n",
" print(\"Environment variable QHOME is not set, please set to where kdb is installed\")\n",
" except:\n",
" pass\n",
"\n",
" # wait for RTS subscriber to start collecting data\n",
" time.sleep(10)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d01a490c",
"metadata": {},
"outputs": [],
"source": [
"%%q \n",
"profile:hopen`::5040\n",
"percentile:{(asc x) floor y*count x}\n",
"\n",
"results:{\n",
" select \n",
" counter:count i, \n",
" min_latency:min source_to_consumer_latency, \n",
" max_latency:max source_to_consumer_latency, \n",
" latency_p50:percentile[source_to_consumer_latency;.50], \n",
" latency_p90:percentile[source_to_consumer_latency;.90], \n",
" latency_p99:percentile[source_to_consumer_latency;.99], \n",
" avg_source_to_calc_latency:\"n\"$avg source_to_calc_latency, \n",
" avg_source_to_consumer_latency:\"n\"$avg source_to_consumer_latency \n",
" from profile\".perf.stats\"\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5c52a775",
"metadata": {},
"outputs": [],
"source": [
"display( kx.q('results[]').pd() )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dcf2f9a5",
"metadata": {},
"outputs": [],
"source": [
"def plot_time_vs_timedelta(dataframe, x_column, y_column):\n",
" # Convert timedelta to seconds for plotting\n",
" dataframe['y_seconds'] = dataframe[y_column].dt.total_seconds()\n",
"\n",
" plt.figure(figsize=(10,6))\n",
" # Plot\n",
" plt.plot(dataframe[x_column], dataframe['y_seconds'])\n",
" \n",
" # Set labels and title\n",
" plt.xlabel('Time')\n",
" plt.ylabel('Time Delta (seconds)')\n",
" plt.title('Time vs Time Delta')\n",
"\n",
" # Show plot\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c37d39e8",
"metadata": {},
"outputs": [],
"source": [
"# get current results\n",
"results = kx.q('results[]').pd() \n",
"\n",
"# if not 200 collected, wait for more (makes for a better graph)\n",
"if results['counter'][0] < 200:\n",
" time.sleep(20)\n",
" results = kx.q('results[]').pd() \n",
"\n",
"display( results )\n",
"\n",
"# get raw stats and display as table\n",
"stats = kx.q('profile\".perf.stats\"').pd()\n",
"\n",
"# Plot the stats\n",
"plot_time_vs_timedelta(stats,'receiveTime','source_to_consumer_latency')"
]
},
{
"cell_type": "markdown",
"id": "63296ea3-1fd9-4b17-a8e3-9d73748f5c76",
"metadata": {},
"source": [
"# Cleanup\n",
"\n",
"Connect to the running process created above and have it exit.\n",
"```\n",
"$ q\n",
"q) h:hopen`::5040\n",
"q) @[h; \"exit 0\", {}]\n",
"```\n",
"\n",
"KILL_SUBSCRIBER must be True for the process to be killed."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "304cb2cf-072a-4615-afcc-dfff14142ab1",
"metadata": {},
"outputs": [],
"source": [
"#KILL_SUBSCRIBER=True\n",
"\n",
"if KILL_SUBSCRIBER:\n",
" try:\n",
" with kx.QConnection(port=SUBSCRIBER_PORT) as q:\n",
" q(\"exit 0\") \n",
" except RuntimeError:\n",
" print(\"Stopped\")\n",
"else:\n",
" print(\"Subscriber not killed\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87acba9e",
"metadata": {},
"outputs": [],
"source": [
"print( f\"Last Run: {datetime.datetime.now()}\" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0e54a9a4-f5c8-4e62-924a-9fbbff9329d3",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_pytorch_p310",
"language": "python",
"name": "conda_pytorch_p310"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}