# PyKX Query Components
Query all components of the application.

## Architecture
<img src="images/Deepdive Diagrams-BasicTick V3.drawio.png"  width="80%">


In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import boto3
import json
import datetime

import pykx as kx

from env import *
from managed_kx import *

# Cluster names and database
from config import *


In [2]:
# Using credentials and create service client
session = boto3.Session()

# create finspace client
client = session.client(service_name='finspace')

# Connections to Clusters

In [3]:
hdb = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)
tp  = get_pykx_connection(client, 
                           environmentId=ENV_ID, clusterName=TP_CLUSTER_NAME, 
                           userName=KDB_USERNAME, boto_session=session)
rdb = get_pykx_connection(client, 
                           environmentId=ENV_ID, clusterName=RDB_CLUSTER_NAME, 
                           userName=KDB_USERNAME, boto_session=session)
rts = get_pykx_connection(client, 
                           environmentId=ENV_ID, clusterName=RTS_CLUSTER_NAME, 
                           userName=KDB_USERNAME, boto_session=session)
gw = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=GW_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)

# Query the HDB
With the HDB connection, query its data.

In [4]:
# ensure database is loaded
hdb('.Q.lo[hsym`$.aws.akdbp,"/",.aws.akdb,"/";0b;0b]')

# inventory of tables in the database and rows in each
print("All Tables and Counts")
display( hdb("tables[]!count each value each tables[]") )
print(40*'=')

# Dates and Counts of one table
tables = hdb('tables[]').py()

for t in tables:
    # anything to display?
    tt = hdb(f"select {t}s:count i by date from {t} where date in 10#desc date").pd()
    r = rdb(f'count {t}').py()

    if r == 0: 
        continue

    print(f'{t}: {r:,}')
    print(40*'-')
    display(tt)
    print(40*'=')

All Tables and Counts


quote: 547,477
----------------------------------------


Unnamed: 0_level_0,quotes
date,Unnamed: 1_level_1
2024-11-18,4323449
2024-11-19,4440838
2024-11-20,4446429
2024-11-21,4422176
2024-11-22,4447795
2024-11-25,4424949


trade: 109,894
----------------------------------------


Unnamed: 0_level_0,trades
date,Unnamed: 1_level_1
2024-11-18,866361
2024-11-19,888436
2024-11-20,888187
2024-11-21,883938
2024-11-22,889931
2024-11-25,884716




# Query the RBD
With the RDB connection, query its data. Will use a q magic cell to send a function to the RDB and run it from Python as well.


In [5]:
# inventory of tables in the database and rows in each
print("Counts")
display( rdb("tables[]!count each value each tables[]") )

# last timestamps in each table
print("Last Times")
display( rdb("tables[]!{exec first max `time$time from x}each tables[]") )
print()
print(40*'=')


# Dates and Counts of one table
tables = rdb('tables[]').py()

for t in tables:
    r = rdb(f'count {t}').py()

    if r == 0: 
        continue

    print(f'{t}: {r:,}')
    print(40*'-')
    # Summarize table by hour
    display( rdb(f"select {t}s:count i by hour:`hh$time from {t}") )
    print(40*'=')

Counts


Last Times



quote: 547,477
----------------------------------------


Unnamed: 0_level_0,quotes
hour,Unnamed: 1_level_1
16i,322730
17i,224747


trade: 109,894
----------------------------------------


Unnamed: 0_level_0,trades
hour,Unnamed: 1_level_1
16i,64532
17i,45362




## Define a function on the RDB
Using a q magic cell, define a function on the RDB.

In [6]:
# get the RDB conneciton string
rdb_conn_str = get_kx_connection_string(client, 
                                  environmentId=ENV_ID, clusterName=RDB_CLUSTER_NAME, 
                                   userName=KDB_USERNAME, boto_session=session)

# parse the RDB connection string to its components
host, port, username, password = parse_connection_string(rdb_conn_str)

In [7]:
%%q --host $host --port $port --user $username --pass $password

/ define a function to calculate TWAP
generateTWAP:{[syms;st;et] 
    if[syms~`;syms:exec distinct sym from trade];
    // Calculate statistics from trade and quote tables, join the tables with 
    // appropriate join function in this case a union join 
    quoteMetrics:select avg_spread:avg (ask-bid),twa_spread:(next[time]- time) wavg (ask-bid), avg_size:0.5*avg (asize+bsize),avg_duration:"t"$avg next[time]-time by sym from quote where sym in syms,time within(st;et); 
    tradeMetrics:select std_dev:2*dev price, twap:(next[time]-time) wavg price,max_price:max price, min_price:min price,vwap:size wavg price by sym from trade where sym in syms,time within(st;et); 
    quoteMetrics uj tradeMetrics 
 }


## Call Function on RDB and Display Results
Function is called on the RDB, results are then returned as a Pandas DataFrame using PyKX and display the results as a table in the notebook.

In [8]:
# Call the function for all tickers and a time range
display( rdb("generateTWAP[`;00:00:00.040; 23:59:59.999]") )

# call the function for some tickers and another range
display( rdb("generateTWAP[`AAPL`IBM;00:00:00.040; 23:59:59.999]") )

Unnamed: 0_level_0,avg_spread,twa_spread,avg_size,avg_duration,std_dev,twap,max_price,min_price,vwap
sym,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
AAPL,1.005583,0.9942711,54.51072,11:28:07.492,1.911798,83.05152,85.34,80.03,83.07975
AIG,1.001028,1.254725,54.74056,28:13:06.874,0.4425576,26.95132,27.85,26.56,27.03385
AMD,1.005192,1.092699,54.37874,19:11:11.378,0.7167555,33.62228,34.52,32.52,33.5295
DELL,1.006072,0.9395575,54.53501,28:59:00.108,0.2057405,12.31718,12.52,12.02,12.28705
DOW,1.005755,0.8788709,54.71421,57:31:11.042,0.205959,20.19641,20.49,19.87,20.19495
GOOG,0.9981131,1.118494,54.41368,09:36:42.553,1.949393,72.8231,75.07,69.4,72.4923
HPQ,1.004438,0.9333208,54.51127,28:50:46.271,0.5793449,35.98533,37.06,35.24,36.03893
IBM,0.9995193,0.4679382,54.40382,28:41:29.838,0.6769956,42.82357,43.73,41.48,42.61401
INTC,0.9987423,0.8842898,54.30217,19:00:18.225,0.9998559,50.80405,52.68,49.89,51.04574
MSFT,0.9958642,1.012356,54.43309,19:24:25.302,0.6744345,29.6254,30.42,28.8,29.56774


Unnamed: 0_level_0,avg_spread,twa_spread,avg_size,avg_duration,std_dev,twap,max_price,min_price,vwap
sym,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
AAPL,1.005583,0.9942711,54.51072,11:28:07.492,1.911798,83.05152,85.34,80.03,83.07975
IBM,0.9995193,0.4679382,54.40382,28:41:29.838,0.6769956,42.82357,43.73,41.48,42.61401


# Query the GW

In [9]:
# Query the GW for its connected processes, are all connected?
proc_pdf = gw("select process, handle, connected, address from .conn.procs").pd()

# are any processes not connected? if so-reconnect
if (len(proc_pdf) == 0) or len(proc_pdf[proc_pdf.connected == False].index) > 0:
    print("reinit Gateway")
    gw("reinit[hdb_name; rdb_name]")
    proc_pdf = gw("select process, handle, connected, address from .conn.procs").pd()

# truncate address with elipsis
proc_pdf['address'] = proc_pdf['address'].str.slice(0,110)+"..."

# display table
display(proc_pdf)

Unnamed: 0,process,handle,connected,address
0,rdb,12,True,:tcps://ip-192-168-7-230.ec2.internal:443:GATEWAY_basictickdb:Host=ip-192-168-7-230.ec2.internal&Port=443&User...
1,hdb,13,True,:tcps://ip-192-168-8-181.ec2.internal:443:GATEWAY_basictickdb:Host=ip-192-168-8-181.ec2.internal&Port=443&User...
2,hdb,14,True,:tcps://ip-192-168-14-13.ec2.internal:443:GATEWAY_basictickdb:Host=ip-192-168-14-13.ec2.internal&Port=443&User...


In [10]:
# query GW using queryData function on gateway

# query and sample specific table for date range (today -3 days to tomorrow)
gw("res: `time xasc queryData[`trade;`;.z.D-3;.z.D+1]").pd()

# first/last 5 rows from res
display( gw("select [3] from res").pd() )
display( gw("select [-3] from res").pd() )

# select for a specific table from ticker with time range (5 days ago to now)
display( hdb(".query.data[`trade;`IBM;.z.P-5D;.z.P]") )

Unnamed: 0,sym,time,price,size,source
0,SBUX,2024-11-25 09:30:00.000003557,64.83,93,HDB
1,HPQ,2024-11-25 09:30:00.000046186,37.39,71,HDB
2,GOOG,2024-11-25 09:30:00.000050642,76.4,20,HDB


Unnamed: 0,sym,time,price,size,source
0,DOW,2024-11-26 17:27:55.810468123,20.15,14,RDB
1,DELL,2024-11-26 17:27:55.810468123,12.33,52,RDB
2,INTC,2024-11-26 17:27:55.810468123,52.05,98,RDB


Unnamed: 0,time,sym,price,size,source
,,,,,
0,2024.11.21D17:28:00.374830474,IBM,45.5,12,HDB
1,2024.11.21D17:28:00.426898373,IBM,45.48,13,HDB
2,2024.11.21D17:28:00.750113726,IBM,45.52,25,HDB
3,2024.11.21D17:28:00.986644218,IBM,45.59,62,HDB
4,2024.11.21D17:28:01.010840251,IBM,45.56,53,HDB
5,2024.11.21D17:28:01.060328649,IBM,45.53,33,HDB
6,2024.11.21D17:28:01.072235192,IBM,45.52,58,HDB
7,2024.11.21D17:28:01.530823113,IBM,45.61,58,HDB
8,2024.11.21D17:28:01.928528846,IBM,45.63,24,HDB


# Query the RTS Cluster
The RTS cluster is subscribing to the tickerplant and maintaining another set of tables. Connect to the RTS and show the contents of its tables.

In [11]:
# inventory of tables in the database and rows in each
print("All Tables and Counts")
display( rts("tables[]!count each value each tables[]") )

tables = rts('tables[]').py()

# show contents of tables
for t in tables:
    # anything to display?
    tt = rts(f"select from {t}").pd()
    r = len(tt.index)

    # nothing in table
    if r == 0: 
        continue

    # print table contents
    print(f'{t}: {r:,}')
    print(100*'=')

    # Contents of table, transponse if small
    if r < 20:
        display(tt.T)
    else:
        display(tt)
    print(100*'-')


All Tables and Counts


trade_hlcv: 15


sym,AAPL,AIG,AMD,DELL,DOW,GOOG,HPQ,IBM,INTC,MSFT,ORCL,PEP,PRU,SBUX,TXN
high,85.34,27.85,34.52,12.52,20.49,75.07,37.06,43.73,52.68,30.42,36.38,22.63,60.41,65.25,18.19
low,80.03,26.56,32.52,12.02,19.87,69.4,35.24,41.48,49.89,28.8,34.98,21.49,58.2,60.79,17.57
close,83.86,26.84,33.56,12.33,20.15,73.03,36.0,42.86,52.05,29.51,35.64,22.26,59.5,62.24,17.8
volume,719833.0,290701.0,435090.0,281367.0,144134.0,853911.0,284019.0,289895.0,432180.0,426393.0,293940.0,565152.0,277317.0,567218.0,145116.0


----------------------------------------------------------------------------------------------------
trade_last: 15


sym,AAPL,AIG,AMD,DELL,DOW,GOOG,HPQ,IBM,INTC,MSFT,ORCL,PEP,PRU,SBUX,TXN
time,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123,2024-11-26 17:27:55.810468123
price,83.86,26.84,33.56,12.33,20.15,73.03,36.0,42.86,52.05,29.51,35.64,22.26,59.5,62.24,17.8
size,82,72,79,52,14,56,67,62,98,45,98,98,72,97,50


----------------------------------------------------------------------------------------------------
trade_vwap: 15


sym,AAPL,AIG,AMD,DELL,DOW,GOOG,HPQ,IBM,INTC,MSFT,ORCL,PEP,PRU,SBUX,TXN
vwap,8973.671857,2920.2723,3621.391134,1326.710839,2181.112313,7829.765631,3891.171244,4603.00785,5515.438957,3193.378522,3838.688921,2379.048779,6407.095205,6806.223528,1932.874539
volume,719833.0,290701.0,435090.0,281367.0,144134.0,853911.0,284019.0,289895.0,432180.0,426393.0,293940.0,565152.0,277317.0,567218.0,145116.0


----------------------------------------------------------------------------------------------------


In [12]:
print( f"Last Run: {datetime.datetime.now()}" )

Last Run: 2024-11-26 17:28:00.176554
