ManagedkdbInsights/basic_tick_V3/basictick/rtsmkdb.q (98 lines of code) (raw):
/ Basic RTS process (leveraging rdbmkdb.q)
show "RTS: START"
show "Command Line Arguments..."
params:.Q.opt .z.X
show params
/ cd to code directory
\cd /opt/kx/app/code
\l connectmkdb.q
tp_name:first params`tp
lastTime:.z.t;
/daily high-low-close-volume
trade_hlcv:([sym:`$()]high:`float$();low:`float$();close:();volume:());
trade_vwap:([sym:`$()]vwap:`float$();volume:`long$())
trade_last:([sym:`$()]time:`timestamp$();price:`float$();size:`long$())
.rdb.onTpConnect:{[handle]
show"Subscribed to TP";
/sub to all tables
.u.rep . handle"(.u.sub[`;`];`.u `i`L)"
}
.rdb.establishTpConnection:{[zx]
/ Attempt tp connect to tp. If success sub to tables and turn off timer
if[.conn.connectToProcs[`tp;zx];
show"connected to TP";
.rdb.onTpConnect[exec first handle from .conn.procs where process=`tp];
.awscust.z.ts:{};
.rdb.tpConnectWait:1;
:()
];
/ If could not connect to tp, increment wait timer by second (backoff) and set to reconnect.
.rdb.tpConnectWait+:1;
.awscust.z.ts:{[x;zx].rdb.establishTpConnection[zx]}[;zx];
show"Could not establish connection to TP waiting ",string[.rdb.tpConnectWait]," seconds";
system"t ",string 1000* .rdb.tpConnectWait;
}
.rdb.getDataNodes:{[tp_name]
tp_nodes: .aws.list_kx_cluster_nodes[tp_name];
tp_conn_strs: {.aws.get_kx_node_connection_string[tp_name;x]} each tp_nodes`node_id;
raze (enlist "-tp"; tp_conn_strs)
}
/ init schema and sync up from log file;cd to hdb(so client save can run)
.u.rep:{(.[;();:;].)each x;if[null first y;:()];-11!y;};
tag:{update calcTs:.z.P, state:x from y};
/last
upd_last:{[t;x]
.[`trade_last;();,;latest:select by sym from x];
.u.pub[`trade_last;tag[`stream] latest];
}
/vwap
upd_vwap:{[t;x]
trade_vwap+:latest:select vwap:size wavg price,volume:sum size by sym from x;
.u.pub[`trade_vwap;tag[`stream] latest];
}
/hlcv
upd_hlcv:{[t;x]
join:(0!trade_hlcv),select sym,high:price,low:price,close:price,volume:size from x;
trade_hlcv::latest:select max high,min low,last close,sum volume by sym from join;
.u.pub[`trade_hlcv;tag[`stream] latest];
}
upd:{[t;x]
if[t~`trade;
upd_last[t;x];
upd_vwap[t;x];
upd_hlcv[t;x];
];
}
.awscust.z.ts:{}
getSnap_vwap:{[x] select from trade_vwap where sym in x}
getSnap_last:{[x] select from trade_last where sym in x}
getSnap_hlcv:{[x] select from trade_hlcv where sym in x}
// snap function handlers
.stream.snap:`trade_vwap`trade_hlcv`trade_last!(getSnap_vwap;getSnap_hlcv;getSnap_last)
// add .u.snap to support snapshots
.u.snap:{[x]
tag[`snap] .stream.snap[x 0]x 1
}
.u.subSnap:{[x;y]
.u.sub .(x;y);
.u.snap (x;y)
}
\t 5000
/ load datafilter analytics
/\l sample/dfilt.q_
/\l sample/querybuilder.q
init:{[tp_name]
zx: .rdb.getDataNodes[tp_name];
.rdb.establishTpConnection[zx]
}
// initialise kdb+tick
// all tables in the top level namespace (`.) become publish-able
// tables that can be published can be seen in .u.w
\l tick/u.q
.u.init[];
note:" " sv ("RTS: init "; string(.z.z))
show note
init[tp_name]
/ must be in this path for db reads to work
\cd /opt/kx/app
show "RTS: DONE"