ManagedkdbInsights/basic_tick_V3/basictick/rdbmkdb.q (137 lines of code) (raw):
/ Basic rdb process
show "RDB: START"
show "Command Line Arguments..."
params:.Q.opt .z.X
show params
/ read in params
tp_name:first params`tp
.rdb.procName:first params`procName
.rdb.volumeName:first params`volumeName
.rdb.hdbProc:first params`hdbProc
.rdb.dbView:first params`dbView
.rdb.savePath:"/opt/kx/app/shared/",.rdb.volumeName,"/",.rdb.procName,"/"
.rdb.database:.aws.akdb
/ cd to code directory
\cd /opt/kx/app/code
/ BEGIN load libraries relative to the code directory
/ Load in lib for querying data and example schema
\l connectmkdb.q
\l query.q
/ END load libraries
/ file deletion functions
diR:{$[11h=type d:key x;raze x,.z.s each` sv/:x,/:d;d]}
nuke:hdel each desc diR@
/ set upd func
upd:insert;
/ init schema and sync up from log file;cd to hdb(so client save can run)
.u.rep:{(.[;();:;].)each x;if[null first y;:()];-11!y;};
.rdb.onTpConnect:{[handle]
show"Subscribed to TP";
/sub to all tables
.u.rep . handle"(.u.sub[`;`];`.u `i`L)"
}
.rdb.establishTpConnection:{[zx]
/ Attempt tp connect to tp. If success sub to tables and turn off timer
if[.conn.connectToProcs[`tp;zx];
show"Connected to TP";
.rdb.onTpConnect[exec first handle from .conn.procs where process=`tp];
.awscust.z.ts:{};
.rdb.tpConnectWait:1;
:()
];
/ If could not connect to tp, increment wait timer by second (backoff) and set to reconnect.
.rdb.tpConnectWait+:1;
.awscust.z.ts:{[x;zx].rdb.establishTpConnection[zx]}[;zx];
show"Could not establish connection to TP waiting ",string[.rdb.tpConnectWait]," seconds";
system"t ",string 1000* .rdb.tpConnectWait;
}
.rdb.getDataNodes:{[tp_name]
tp_nodes: .aws.list_kx_cluster_nodes[tp_name];
tp_conn_strs: {.aws.get_kx_node_connection_string[tp_name;x]} each tp_nodes`node_id;
raze (enlist "-tp"; tp_conn_strs)
}
.rdb.sleep:{t:.z.p; while[.z.p<t+`second$x;]}
.rdb.updateCluster:{[cluster;database;dataview;strategy]
res:.aws.update_kx_cluster_databases[cluster;
.aws.sdbs[
.aws.db[database;"";
{()}.aws.cache["CACHE_250";"/"];
dataview
]
];
.aws.sdep[strategy]];
res
}
.rdb.wait_for_status:{[function;args;statuses;frequency;timeout]
res:function . args;
st:.z.t;
l:0;
$[10h=type statuses;statuses:enlist statuses;]
while [(timeout>ti:.z.t-st) & not (res`status) in statuses;
$[frequency<=ti-l;
(
l:ti;
res:function . args;
show "Waiting: ", sv[" "; args], " status: ", (res`status), " waited: ", string(ti)
);
]
];
show "** Done: ", sv[" "; args]," **";
res
}
/ Publishes a changeset given a directory
.rdb.pushChangeset:{[d]
dt:string d;
dict:flip`input_path`database_path`change_type!(
(`$.rdb.savePath,dt;`$.rdb.savePath,"sym");
(`$"/",dt,"/";`$"/");`PUT`PUT);
cid:.aws.create_changeset[.rdb.database;dict];
cid
}
.rdb.eod:{
-1 "EOD Processing date: ", string[x];
t:tables`.;
t@:where `g=attr each t@\:`sym;
-1 "Table Counts pre-EOD: ", .Q.s1[t!count each value each t];
-1 "Downloading latest sym file";
.aws.get_latest_sym_file[.rdb.database;.rdb.savePath];
-1 "Saving tables";
{.Q.dpft[hsym`$.rdb.savePath;x;`sym;y]}[x] each t;
-1 "Pushing Changeset";
cid:.rdb.pushChangeset[x];
-1 "Waiting for changset: ", cid`id;
res:.rdb.wait_for_status[.aws.get_changeset;(.rdb.database;cid`id);("COMPLETED";"FAILED");00:00:20;30:00];
dview:.aws.get_kx_dataview[.rdb.database;.rdb.dbView];
-1 "Autoupdating Dataview? ", string [dview`auto_update], " database: ", .rdb.database, " view: ", .rdb.dbView;
if[0=dview`auto_update; .aws.update_kx_dataview[.rdb.database;.rdb.dbView;cid`id;dview`segment_configurations]];
res:.rdb.wait_for_status[.aws.get_kx_dataview;(.rdb.database;.rdb.dbView);("ACTIVE";"FAILED");00:00:20;30:00];
-1 "Updating Cluster: ", .rdb.hdbProc;
.rdb.updateCluster[.rdb.hdbProc;.rdb.database;.rdb.dbView;"NO_RESTART"];
res:.rdb.wait_for_status[.aws.get_kx_cluster;enlist .rdb.hdbProc;("RUNNING";"FAILED");00:00:20;30:00];
if["FAILED"~res`status;
-1 "Cluster Failed to Restart, returning early";
:()
];
-1 "EOD Clean up ";
/ clear tables
{x set 0#`.[x]}each t;
@[;`sym;`g#] each t;
/ delete files
nuke hsym`$.rdb.savePath,string[x];
hdel hsym`$.rdb.savePath,"sym";
/ garbage collect
.Q.gc[];
}
/ Account for a static or auto-updating for EOD processing
.u.end:{
.rdb.eod[x]
}
init:{[tp_name]
zx: .rdb.getDataNodes[tp_name];
.rdb.establishTpConnection[zx]
}
show "RDB Init: ", string[.z.z]
init[tp_name]
/ must finished at this path for db reads to work
\cd /opt/kx/app
show "RDB: DONE"