ManagedkdbInsights/basic_tick_V3/pykx_sub_calc.ipynb (271 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "1334d92e-34f4-448c-b6af-73cba89e9d6d",
   "metadata": {},
   "source": [
    "# PyKX Sub-Process Statistics\n",
    "Use an external q process to connect to a managed kdb cluster and measure communication latency."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "11e9b62a-9cda-4a57-99ca-3b13aff6b7f3",
   "metadata": {},
   "outputs": [],
   "source": [
    "import warnings\n",
    "warnings.simplefilter(action='ignore', category=FutureWarning)\n",
    "\n",
    "import os\n",
    "import subprocess\n",
    "import boto3\n",
    "import json\n",
    "import datetime\n",
    "import time  # explicit import: time.sleep() is used in later cells\n",
    "\n",
    "import pykx as kx\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "from managed_kx import *\n",
    "from env import *\n",
    "\n",
    "# Cluster names and database\n",
    "from config import *\n",
    "\n",
    "# ----------------------------------------------------------------\n",
    "CODEBASE=\"basictick\"\n",
    "\n",
    "KILL_SUBSCRIBER = False\n",
    "# ----------------------------------------------------------------\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb429a9f-e8a4-41d2-8572-c4e8a49ad387",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a boto3 session and use it to build the service client\n",
    "session = boto3.Session()\n",
    "\n",
    "# create finspace client\n",
    "client = session.client(service_name='finspace')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c097c949",
   "metadata": {},
   "source": [
    "# Start q process that subscribes to RTS\n",
    "This notebook records the measured latency in communications between the q process created here and the RTS cluster.\n",
    "\n",
    "## Configuration\n",
    "The environment variable `QHOME` must point to the local q installation used by this notebook; the subscriber is launched from `$QHOME/l64/q`.\n",
    "\n",
    "Note: the `%%q` cell further down opens its handle on port 5040; keep that in sync with `SUBSCRIBER_PORT`.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62f12aa1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# get the connection string\n",
    "conn_str = get_kx_connection_string(client, environmentId=ENV_ID, clusterName=RTS_CLUSTER_NAME, userName=KDB_USERNAME, boto_session=session)\n",
    "\n",
    "# Subscriber mode passed to kxtaqsubscriber.q; alternatives: \"trade_vwap\", \"trade_hlcv\"\n",
    "subscriber_mode = \"trade_last\"\n",
    "\n",
    "# Is process already running?\n",
    "create = True\n",
    "\n",
    "try:\n",
    "    # Will connect if running\n",
    "    with kx.QConnection(port=SUBSCRIBER_PORT) as q:\n",
    "        create = False\n",
    "        print(f\"Running on port: {SUBSCRIBER_PORT}\")\n",
    "except Exception:\n",
    "    # Best effort: any connection failure is treated as \"not running\".\n",
    "    # (Exception, not a bare except, so Ctrl-C / kernel interrupt still works.)\n",
    "    pass\n",
    "\n",
    "if create:\n",
    "    q_home = os.getenv('QHOME')\n",
    "    if q_home is None:\n",
    "        print(\"Environment variable QHOME is not set, please set to where kdb is installed\")\n",
    "    else:\n",
    "        # Start q process kxtaqsubscriber.q, pointing it at the cluster via -endpoint.\n",
    "        # An argument list (no shell) passes the connection string verbatim, so no\n",
    "        # shell quoting or expansion can alter it; cwd replaces the former `cd`.\n",
    "        subscriber_cmd = [\n",
    "            'nohup', os.path.join(q_home, 'l64', 'q'), 'kxtaqsubscriber.q',\n",
    "            '-p', str(SUBSCRIBER_PORT),\n",
    "            '-endpoint', conn_str,\n",
    "            '-mode', subscriber_mode,\n",
    "        ]\n",
    "        try:\n",
    "            subprocess.Popen(subscriber_cmd, cwd=CODEBASE)\n",
    "        except Exception as err:\n",
    "            # Report instead of silently swallowing the failure\n",
    "            print(f\"Failed to start subscriber: {err}\")\n",
    "        else:\n",
    "            print(f\"Started on port: {SUBSCRIBER_PORT}\")\n",
    "\n",
    "            # wait for RTS subscriber to start collecting data\n",
    "            time.sleep(10)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d01a490c",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%q\n",
    "profile:hopen`::5040\n",
    "percentile:{(asc x) floor y*count x}\n",
    "\n",
    "results:{\n",
    "    select\n",
    "        counter:count i,\n",
    "        min_latency:min source_to_consumer_latency,\n",
    "        max_latency:max source_to_consumer_latency,\n",
    "        latency_p50:percentile[source_to_consumer_latency;.50],\n",
    "        latency_p90:percentile[source_to_consumer_latency;.90],\n",
    "        latency_p99:percentile[source_to_consumer_latency;.99],\n",
    "        avg_source_to_calc_latency:\"n\"$avg source_to_calc_latency,\n",
    "        avg_source_to_consumer_latency:\"n\"$avg source_to_consumer_latency\n",
    "    from profile\".perf.stats\"\n",
    " }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5c52a775",
   "metadata": {},
   "outputs": [],
   "source": [
    "display(kx.q('results[]').pd())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dcf2f9a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "def plot_time_vs_timedelta(dataframe, x_column, y_column):\n",
    "    \"\"\"Plot a timedelta column, converted to seconds, against another column.\n",
    "\n",
    "    Args:\n",
    "        dataframe: pandas DataFrame holding both columns; it is not modified.\n",
    "        x_column: name of the column plotted on the x axis.\n",
    "        y_column: name of a timedelta column; plotted on the y axis in seconds.\n",
    "    \"\"\"\n",
    "    # Convert timedelta to seconds in a local Series so the caller's\n",
    "    # DataFrame does not gain a helper column as a side effect\n",
    "    y_seconds = dataframe[y_column].dt.total_seconds()\n",
    "\n",
    "    fig, ax = plt.subplots(figsize=(10, 6))\n",
    "    ax.plot(dataframe[x_column], y_seconds)\n",
    "\n",
    "    # Set labels and title\n",
    "    ax.set_xlabel('Time')\n",
    "    ax.set_ylabel('Time Delta (seconds)')\n",
    "    ax.set_title('Time vs Time Delta')\n",
    "\n",
    "    # Show plot\n",
    "    plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c37d39e8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sample count wanted for a readable graph, and how long to wait once for more\n",
    "MIN_SAMPLES = 200\n",
    "WAIT_SECONDS = 20\n",
    "\n",
    "# get current results\n",
    "results = kx.q('results[]').pd()\n",
    "\n",
    "# if not enough collected yet, wait once for more (makes for a better graph)\n",
    "if results['counter'].iloc[0] < MIN_SAMPLES:\n",
    "    time.sleep(WAIT_SECONDS)\n",
    "    results = kx.q('results[]').pd()\n",
    "\n",
    "display(results)\n",
    "\n",
    "# get raw stats from the subscriber process\n",
    "stats = kx.q('profile\".perf.stats\"').pd()\n",
    "\n",
    "# Plot the stats\n",
    "plot_time_vs_timedelta(stats, 'receiveTime', 'source_to_consumer_latency')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "63296ea3-1fd9-4b17-a8e3-9d73748f5c76",
   "metadata": {},
   "source": [
    "# Cleanup\n",
    "\n",
    "Connect to the running process created above and have it exit.\n",
    "```\n",
    "$ q\n",
    "q) h:hopen`::5040\n",
    "q) @[h; \"exit 0\"; {}]\n",
    "```\n",
    "\n",
    "`KILL_SUBSCRIBER` must be `True` for the cell below to kill the process."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "304cb2cf-072a-4615-afcc-dfff14142ab1",
   "metadata": {},
   "outputs": [],
   "source": [
    "#KILL_SUBSCRIBER=True\n",
    "\n",
    "if KILL_SUBSCRIBER:\n",
    "    try:\n",
    "        with kx.QConnection(port=SUBSCRIBER_PORT) as q:\n",
    "            q(\"exit 0\")\n",
    "    except RuntimeError:\n",
    "        print(\"Stopped\")\n",
    "else:\n",
    "    print(\"Subscriber not killed\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "87acba9e",
   "metadata": {},
   "outputs": [],
   "source": [
    "print( f\"Last Run: {datetime.datetime.now()}\" )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0e54a9a4-f5c8-4e62-924a-9fbbff9329d3",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_pytorch_p310",
   "language": "python",
   "name": "conda_pytorch_p310"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}