{
"cells": [
{
"cell_type": "markdown",
"id": "28bea13b-67bd-4a0e-8eab-3b8ffd37259e",
"metadata": {},
"source": [
"# BasicTick V3: Create Everything\n",
"This notebook will use the AWS Python boto3 APIs to create the needed resources for a basic tick application. This application will simulate a market data capture system. \n",
"\n",
"## Architecture\n",
"<img src=\"images/Deepdive Diagrams-BasicTick V3.drawio.png\" width=\"80%\">\n",
"\n",
"## Abbreviations\n",
"- RTS: Realtime Subscriber\n",
"- FH: Feedhandler \n",
"- HDB: Historical Database\n",
"- RDB: Realtime Database \n",
"- TP: Tickerplant \n",
"- GW: Gateway\n",
"\n",
"## AWS Resources Created\n",
"- Database \n",
"- Changeset (adds data to database) \n",
"- Scaling Group in which all clusters are run \n",
"- Shared Volume used by database view and clusters \n",
"- Dataview of database on the shared volume\n",
" - option: view can be auto-updating or static\n",
"- Clusters: TP, RTS, RDB, HDB, and GW \n",
"\n",
"### Non AWS\n",
"For this demonstration application the FH is run locally and publishes data to the TP. \n",
"\n",
"# References\n",
"[Basic tick system](https://code.kx.com/insights/1.10/core/qpexample-tick/index.html) \n",
"[KxSystems/kdb-tick (github)](https://github.com/KxSystems/kdb-tick) \n",
"[Architecture of kdb+ systems](https://code.kx.com/q/architecture/) \n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "65fe5c87-5cc0-4d8b-a324-ff7d5eeba12e",
"metadata": {},
"outputs": [],
"source": [
"#!pip install -r requirements.txt"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "0d5f1d4a-ed45-44e3-bf75-9bdb75fcddbb",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import subprocess\n",
"import boto3\n",
"import json\n",
"import datetime\n",
"\n",
"import pykx as kx\n",
"\n",
"from env import *\n",
"from managed_kx import *\n",
"\n",
"# Cluster names and database\n",
"from config import *\n",
"\n",
"# ----------------------------------------------------------------\n",
"\n",
"#NODE_TYPE=\"kx.sg.4xlarge\"\n",
"#NODE_TYPE=\"kx.sg.2xlarge\"\n",
"NODE_TYPE=\"kx.sg.xlarge\"\n",
"\n",
"CODE_CONFIG={ 's3Bucket': S3_BUCKET, 's3Key': f'{S3_CODE_PATH}/{CODEBASE}.zip' }\n",
"\n",
"NAS1_CONFIG= {\n",
" 'type': 'SSD_250',\n",
" 'size': 1200\n",
"}\n",
"\n",
"# Realtime Database (RDB) Configs\n",
"RDB_INIT_SCRIPT='rdbmkdb.q'\n",
"RDB_CMD_ARGS=[\n",
" { 'key': 's', 'value': '2' }, \n",
" { 'key': 'g', 'value': '1' }, \n",
" { 'key': 'tp', 'value': TP_CLUSTER_NAME }, \n",
" { 'key': 'procName', 'value': RDB_CLUSTER_NAME }, \n",
" { 'key': 'volumeName', 'value': VOLUME_NAME }, \n",
" { 'key': 'hdbProc', 'value': HDB_CLUSTER_NAME }, \n",
" { 'key': 'dbView', 'value': DBVIEW_NAME }, \n",
" { 'key': 'AWS_ZIP_DEFAULT', 'value': '17,2,6' },\n",
"]\n",
"\n",
"# RTS Configs\n",
"RTS_INIT_SCRIPT='rtsmkdb.q'\n",
"RTS_CMD_ARGS = [\n",
" { 'key': 's', 'value': '2' }, \n",
" { 'key': 'g', 'value': '1' }, \n",
" { 'key': 'tp', 'value': TP_CLUSTER_NAME }, \n",
"]\n",
"\n",
"# Tickerplant (TP) Configs\n",
"TP_INIT_SCRIPT='tick.q'\n",
"TP_CMD_ARGS=[\n",
" { 'key': 'procName', 'value': TP_CLUSTER_NAME }, \n",
" { 'key': 'volumeName', 'value': VOLUME_NAME }, \n",
" { 'key': 'g', 'value': '1' }, \n",
"]\n",
"\n",
"# Historical Database (HDB) Configs\n",
"HDB_INIT_SCRIPT='hdbmkdb.q'\n",
"HDB_CMD_ARGS=[\n",
" { 'key': 's', 'value': '2' }, \n",
" { 'key': 'g', 'value': '1' }, \n",
"]\n",
"\n",
"# Gateway Configs\n",
"GW_INIT_SCRIPT='gwmkdb.q'\n",
"GW_CMD_ARGS=[\n",
" { 'key': 's', 'value': '2' }, \n",
" { 'key': 'g', 'value': '1' }, \n",
" { 'key': 'rdb_name', 'value': RDB_CLUSTER_NAME}, \n",
" { 'key': 'hdb_name', 'value': HDB_CLUSTER_NAME}, \n",
"]\n",
"\n",
"# VPC Configuration\n",
"VPC_CONFIG={ \n",
" 'vpcId': VPC_ID,\n",
" 'securityGroupIds': SECURITY_GROUPS,\n",
" 'subnetIds': SUBNET_IDS,\n",
" 'ipAddressType': 'IP_V4' \n",
"}\n",
"\n",
"# Feedhandler configs\n",
"FEED_TIMER=10000\n",
"FH_PORT=5030 \n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "3cfe7d89-9f5d-4ceb-ac8c-1f5054a6f15a",
"metadata": {},
"outputs": [],
"source": [
"# Using credentials and create service client\n",
"session = boto3.Session()\n",
"\n",
"# create finspace client\n",
"client = session.client(service_name='finspace')"
]
},
{
"cell_type": "markdown",
"id": "8c3d4047-9583-4b09-b75d-98fd2ddd6c36",
"metadata": {},
"source": [
"# Create a Sample Database\n",
"Create a synthetic database using kxtaqdb.q (takes 1-2 minutes)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "6470a6e3-874a-41f9-93ce-9aad0adbe47e",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"!rm -rf $SOURCE_DATA_DIR"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a871f941-e506-4bc4-9922-7241ec9a2739",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\"Generated trade|quote records: 866361 4323449\"\n",
"\"Generated trade|quote records: 888436 4440838\"\n",
"\"Generated trade|quote records: 888187 4446429\"\n",
"\"Generated trade|quote records: 883938 4422176\"\n",
"\"Generated trade|quote records: 889931 4447795\"\n",
"\"Generated trade|quote records: 884716 4424949\"\n",
"323M\thdb\n",
"0\n"
]
}
],
"source": [
"# call local q (using pykx) to create the database\n",
"kx.q(\"\\l basictick/kxtaqdb.q\")\n",
"\n",
"# Database size\n",
"print( os.system(f\"du -sh {SOURCE_DATA_DIR}\") )\n"
]
},
{
"cell_type": "markdown",
"id": "8bf690f2-c465-4df8-90f5-1e3b808bb368",
"metadata": {},
"source": [
"## Stage Database Files to S3\n",
"Using AWS cli, copy hdb to staging S3 bucket"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "3ff35f7c-4c3c-47b2-9eaa-2197dbfdadd4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Destination: s3://kdb-demo-829845998889-kms/data/hdb/\n",
" PRE 2024.11.18/\n",
" PRE 2024.11.19/\n",
" PRE 2024.11.20/\n",
" PRE 2024.11.21/\n",
" PRE 2024.11.22/\n",
" PRE 2024.11.25/\n",
"2024-11-26 15:15:09 75 sym\n",
"0\n"
]
}
],
"source": [
"# S3 destination\n",
"S3_DEST=f\"s3://{S3_BUCKET}/{S3_DATA_PATH}/{SOURCE_DATA_DIR}/\"\n",
"\n",
"cp = \"\"\n",
"\n",
"if AWS_ACCESS_KEY_ID is not None:\n",
" cp = f\"\"\"\n",
"export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID} --quiet\n",
"export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}\n",
"export AWS_SESSION_TOKEN={AWS_SESSION_TOKEN}\n",
"\"\"\"\n",
"\n",
"cp += f\"\"\"\n",
"aws s3 rm --recursive {S3_DEST} --quiet\n",
"aws s3 sync --exclude .DS_Store {SOURCE_DATA_DIR} {S3_DEST} --quiet\n",
"\"\"\"\n",
" \n",
"# execute the S3 copy\n",
"os.system(cp)\n",
"\n",
"# confirm destination contents\n",
"print( f\"Destination: {S3_DEST}\" )\n",
"print( os.system(f\"aws s3 ls {S3_DEST}\") )"
]
},
{
"cell_type": "markdown",
"id": "17c759c4-ee6c-45c5-a9f6-6acacea3a3be",
"metadata": {},
"source": [
"## Create A Managed Database\n",
"Using the AWS APIs, create a managed database in Managed kdb Insights. The database is initially empty and is populated using changesets.\n",
"\n",
"### Reference\n",
"[Managed kdb Insights databases](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-db.html)\n",
"\n",
"### APIs used\n",
"[get_kx_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/get_kx_database.html) \n",
"[create_kx_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_database.html) \n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "d55bd8d3-5629-46f9-bc1f-47bb0308dc0a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATING Database: basictickdb\n",
"CREATED Database: basictickdb\n",
"{\n",
" \"createdTimestamp\": \"2024-11-26 15:15:10.607000+00:00\",\n",
" \"databaseArn\": \"arn:aws:finspace:us-east-1:829845998889:kxEnvironment/jlcenjvtkgzrdek2qqv7ic/kxDatabase/basictickdb\",\n",
" \"databaseName\": \"basictickdb\",\n",
" \"description\": \"Basictick kdb database\",\n",
" \"environmentId\": \"jlcenjvtkgzrdek2qqv7ic\",\n",
" \"lastModifiedTimestamp\": \"2024-11-26 15:15:10.607000+00:00\"\n",
"}\n"
]
}
],
"source": [
"# assume database exists\n",
"create_db=False\n",
"\n",
"try:\n",
" resp = client.get_kx_database(environmentId=ENV_ID, databaseName=DB_NAME)\n",
" resp.pop('ResponseMetadata', None)\n",
"except:\n",
" # does not exist, will create\n",
" create_db=True\n",
"\n",
"if create_db:\n",
" print(f\"CREATING Database: {DB_NAME}\")\n",
" resp = client.create_kx_database(environmentId=ENV_ID, databaseName=DB_NAME, description=\"Basictick kdb database\")\n",
" resp.pop('ResponseMetadata', None)\n",
"\n",
" print(f\"CREATED Database: {DB_NAME}\")\n",
"\n",
"print(json.dumps(resp,sort_keys=True,indent=4,default=str))"
]
},
{
"cell_type": "markdown",
"id": "26d1194e-0c04-49a3-a7e7-a1d23fcff0d9",
"metadata": {},
"source": [
"## Add Data to Database\n",
"Add the created database data copied earlier to S3 to the created managed database using create_kx_changeset. \n",
"\n",
"### APIs used\n",
"[create_kx_changeset](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_changeset.html) \n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "eae61f04-1c9c-468e-bb38-b2e0b94897a0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Adding Changeset to empty database\n",
"Changeset...\n",
"{\n",
" \"changeRequests\": [\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.19/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.19/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.25/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.25/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.22/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.22/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.18/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.18/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.21/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.21/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/2024.11.20/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/2024.11.20/\"\n",
" },\n",
" {\n",
" \"changeType\": \"PUT\",\n",
" \"dbPath\": \"/\",\n",
" \"s3Path\": \"s3://kdb-demo-829845998889-kms/data/hdb/sym\"\n",
" }\n",
" ],\n",
" \"changesetId\": \"1Mm0hTkkSc1MXUzumKSe8A\",\n",
" \"createdTimestamp\": \"2024-11-26 15:15:12.586000+00:00\",\n",
" \"databaseName\": \"basictickdb\",\n",
" \"environmentId\": \"jlcenjvtkgzrdek2qqv7ic\",\n",
" \"lastModifiedTimestamp\": \"2024-11-26 15:15:12.586000+00:00\",\n",
" \"status\": \"PENDING\"\n",
"}\n"
]
}
],
"source": [
"# get the current list of database changesets\n",
"c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)\n",
"\n",
"if len(c_set_list) == 0:\n",
" print(\"Adding Changeset to empty database\")\n",
" changes=[]\n",
"\n",
" for f in os.listdir(f\"{SOURCE_DATA_DIR}\"):\n",
" if os.path.isdir(f\"{SOURCE_DATA_DIR}/{f}\"):\n",
" changes.append( { 'changeType': 'PUT', 's3Path': f\"{S3_DEST}{f}/\", 'dbPath': f\"/{f}/\" } )\n",
" else:\n",
" changes.append( { 'changeType': 'PUT', 's3Path': f\"{S3_DEST}{f}\", 'dbPath': f\"/\" } )\n",
"\n",
" resp = client.create_kx_changeset(environmentId=ENV_ID, databaseName=DB_NAME, \n",
" changeRequests=changes)\n",
"\n",
" resp.pop('ResponseMetadata', None)\n",
" changeset_id = resp['changesetId']\n",
"\n",
" print(\"Changeset...\")\n",
" print(json.dumps(resp,sort_keys=True,indent=4,default=str))\n",
"else:\n",
" # use latest changesetId\n",
" changeset_id = c_set_list[0]['changesetId'] "
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "b4422bdd-7d44-4fb0-8018-0bebd6987704",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Status is IN_PROGRESS, total wait 0:00:00, waiting 10 sec ...\n",
"Status is IN_PROGRESS, total wait 0:00:10, waiting 10 sec ...\n",
"Status is IN_PROGRESS, total wait 0:00:20, waiting 10 sec ...\n",
"Status is IN_PROGRESS, total wait 0:00:30, waiting 10 sec ...\n",
"Status is IN_PROGRESS, total wait 0:00:40, waiting 10 sec ...\n",
"Status is IN_PROGRESS, total wait 0:00:50, waiting 10 sec ...\n",
"**Done**\n"
]
}
],
"source": [
"# Wait for the changeset to be added to the database\n",
"wait_for_changeset_status(client, environmentId=ENV_ID, databaseName=DB_NAME, changesetId=changeset_id, show_wait=True)\n",
"print(\"**Done**\")"
]
},
{
"cell_type": "markdown",
"id": "1eb4a142-5dd1-4ee3-a746-852db203eb2f",
"metadata": {},
"source": [
"### Contents of the Managed Database\n",
"Display the changesets of the managed database.\n",
"\n",
"### APIs used\n",
"[list_kx_changesets](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/list_kx_changesets.html) \n",
"[get_kx_changeset](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/get_kx_changeset.html) \n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "8ba008f3-4991-474c-9b3e-43a1dca56257",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"====================================================================================================\n",
"Database: basictickdb, Changesets: 1\n",
"====================================================================================================\n",
" Changeset: 1Mm0hTkkSc1MXUzumKSe8A: Created: 2024-11-26 15:15:12.586000+00:00 (COMPLETED)\n"
]
},
{
"data": {
"text/html": [
"<style type=\"text/css\">\n",
"</style>\n",
"<table id=\"T_c8bef\">\n",
" <thead>\n",
" <tr>\n",
" <th id=\"T_c8bef_level0_col0\" class=\"col_heading level0 col0\" >changeType</th>\n",
" <th id=\"T_c8bef_level0_col1\" class=\"col_heading level0 col1\" >s3Path</th>\n",
" <th id=\"T_c8bef_level0_col2\" class=\"col_heading level0 col2\" >dbPath</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td id=\"T_c8bef_row0_col0\" class=\"data row0 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row0_col1\" class=\"data row0 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.19/</td>\n",
" <td id=\"T_c8bef_row0_col2\" class=\"data row0 col2\" >/2024.11.19/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row1_col0\" class=\"data row1 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row1_col1\" class=\"data row1 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.25/</td>\n",
" <td id=\"T_c8bef_row1_col2\" class=\"data row1 col2\" >/2024.11.25/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row2_col0\" class=\"data row2 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row2_col1\" class=\"data row2 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.22/</td>\n",
" <td id=\"T_c8bef_row2_col2\" class=\"data row2 col2\" >/2024.11.22/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row3_col0\" class=\"data row3 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row3_col1\" class=\"data row3 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.18/</td>\n",
" <td id=\"T_c8bef_row3_col2\" class=\"data row3 col2\" >/2024.11.18/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row4_col0\" class=\"data row4 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row4_col1\" class=\"data row4 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.21/</td>\n",
" <td id=\"T_c8bef_row4_col2\" class=\"data row4 col2\" >/2024.11.21/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row5_col0\" class=\"data row5 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row5_col1\" class=\"data row5 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/2024.11.20/</td>\n",
" <td id=\"T_c8bef_row5_col2\" class=\"data row5 col2\" >/2024.11.20/</td>\n",
" </tr>\n",
" <tr>\n",
" <td id=\"T_c8bef_row6_col0\" class=\"data row6 col0\" >PUT</td>\n",
" <td id=\"T_c8bef_row6_col1\" class=\"data row6 col1\" >s3://kdb-demo-829845998889-kms/data/hdb/sym</td>\n",
" <td id=\"T_c8bef_row6_col2\" class=\"data row6 col2\" >/</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n"
],
"text/plain": [
"<pandas.io.formats.style.Styler at 0x7fb062fb76d0>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# get the current list of database changesets\n",
"c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)\n",
"\n",
"print(100*\"=\")\n",
"print(f\"Database: {DB_NAME}, Changesets: {len(c_set_list)}\")\n",
"print(100*\"=\")\n",
"\n",
"# sort by create time\n",
"c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp']) \n",
"\n",
"for c in c_set_list:\n",
" c_set_id = c['changesetId']\n",
" print(f\" Changeset: {c_set_id}: Created: {c['createdTimestamp']} ({c['status']})\")\n",
" c_rqs = client.get_kx_changeset(environmentId=ENV_ID, databaseName=DB_NAME, changesetId=c_set_id)['changeRequests']\n",
"\n",
" chs_pdf = pd.DataFrame.from_dict(c_rqs).style.hide(axis='index')\n",
" display(chs_pdf)"
]
},
{
"cell_type": "markdown",
"id": "9dae0232-3666-491f-8891-dae30e12c9d8",
"metadata": {},
"source": [
"# Create Scaling Group\n",
"The scaling group represents the total compute avilable to the application. All clusters will be placed into the scaling group and will share the compute and memory of the scaling group.\n",
"\n",
"## Reference\n",
"[Managed kdb scaling groups](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-scaling-groups.html)\n",
"\n",
"## APIs used\n",
"[create_kx_scaling_group](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_scaling_group.html) "
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "613be7f8-fb82-4415-b30c-186ed470dba4",
"metadata": {},
"outputs": [],
"source": [
"# Check if scaling group exits, only create if it does not exist\n",
"resp = get_kx_scaling_group(client=client, environmentId=ENV_ID, scalingGroupName=SCALING_GROUP_NAME)\n",
"\n",
"if resp is None:\n",
" resp = client.create_kx_scaling_group(\n",
" environmentId = ENV_ID, \n",
" scalingGroupName = SCALING_GROUP_NAME,\n",
" hostType=NODE_TYPE,\n",
" availabilityZoneId = AZ_ID\n",
" )\n",
"else:\n",
" print(f\"Scaling Group {SCALING_GROUP_NAME} exists\") "
]
},
{
"cell_type": "markdown",
"id": "6943fb16-8989-4199-a0fd-5c7c0d5aa56e",
"metadata": {},
"source": [
"# Create Shared Volume\n",
"The shared volume is a common storage device for the application. Every cluster using the shared volume will have a writable directory named after the cluster, can read the directories named after other clusters in the application using the volume. Also, there is a common directory for every shared volume as well, all clusters using a volumes can read/write to the common directory.\n",
"\n",
"## Directory Structure\n",
"Any shared volumes will appear in the /opt/kx/app/shared directory of clusters using the volume, with a path is named for shared volume (/opt/kx/app/shared/VOLUME_NAME). Each cluster using the volume will have a directory named for the cluster that only the cluster can write to (/opt/kx/app/shared/VOLUME_NAME/CLUSTER_NAME) and others using the volumes can read from. Last each shared volume has a directory that is read/write to all clusters using the volume (/opt/kx/app/shared/VOLUME_NAME/common)\n",
"\n",
"**Root:** /opt/kx/app/shared \n",
"**Each Volume:** /opt/kx/app/shared/VOLUME_NAME \n",
"**Write per cluster (read otherwise):** /opt/kx/app/shared/VOLUME_NAME/CLUSTER_NAME \n",
"**common read/write:** /opt/kx/app/shared/VOLUME_NAME/common \n",
"\n",
"## Reference\n",
"[FinSpace Managed kdb Volumes](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-volumes.html)\n",
"\n",
"## APIs used\n",
"[create_kx_volume](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_volume.html) \n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "e4a8a247-d029-4f9b-aaf5-c6e2ffe200a1",
"metadata": {},
"outputs": [],
"source": [
"# Check if volume already exists before trying to create one\n",
"resp = get_kx_volume(client=client, environmentId=ENV_ID, volumeName=VOLUME_NAME)\n",
"\n",
"if resp is None:\n",
" resp = client.create_kx_volume(\n",
" environmentId = ENV_ID, \n",
" volumeType = 'NAS_1',\n",
" volumeName = VOLUME_NAME,\n",
" description = 'Shared volume between TP and RDB',\n",
" nas1Configuration = NAS1_CONFIG,\n",
" azMode='SINGLE',\n",
" availabilityZoneIds=[ AZ_ID ] \n",
" )\n",
"else:\n",
" print(f\"Volume {VOLUME_NAME} exists\") "
]
},
{
"cell_type": "markdown",
"id": "fe41eaeb-9c8e-44d3-b8bc-f354142f9140",
"metadata": {},
"source": [
"# Create Dataview\n",
"Create a dataview of the database and have all of its data presented (cached) on the shared volume. Customers can also choose to cache only a portion of the database and can also shoose to tier storage on different volumes as well.\n",
"\n",
"### Reference\n",
"[Dataviews for querying data](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-dataviews.html)\n",
"\n",
"### APIs used\n",
"[create_kx_dataview](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_dataview.html) \n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "ebcbde08-ef39-41e3-bb3e-d97711dbbd1a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:00:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:00:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:01:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:01:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:02:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:02:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:03:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:03:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:04:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:04:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:05:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:05:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:06:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:06:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:07:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:07:30, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is CREATING, total wait 0:08:00, waiting 30 sec ...\n",
"Volume: RDB_TP_SHARED status is now ACTIVE, total wait 0:08:30\n",
"** VOLUME is READY **\n"
]
}
],
"source": [
"# before creating the dataview, be sure the volume is created and ready\n",
"wait_for_volume_status(client=client, environmentId=ENV_ID, volumeName=VOLUME_NAME, show_wait=True)\n",
"print(\"** VOLUME is READY **\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "03434316-4ccc-420d-adee-715e6eb1bcd6",
"metadata": {},
"outputs": [],
"source": [
"# do changesets exist?\n",
"c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)\n",
"\n",
"if len(c_set_list) != 0:\n",
" # sort by create time\n",
" c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp']) \n",
" latest_changeset = c_set_list[-1]['changesetId']\n",
"\n",
" # Check if dataview already exists and is set to the requested changeset_id\n",
" resp = get_kx_dataview(client=client, environmentId=ENV_ID, databaseName=DB_NAME, dataviewName=DBVIEW_NAME)\n",
"\n",
" if resp is None:\n",
" resp = client.create_kx_dataview(\n",
" environmentId = ENV_ID, \n",
" databaseName=DB_NAME, \n",
" dataviewName=DBVIEW_NAME,\n",
" azMode='SINGLE',\n",
" availabilityZoneId=AZ_ID,\n",
" segmentConfigurations=[\n",
" { \n",
" 'volumeName': VOLUME_NAME,\n",
" 'dbPaths': ['/*'], # cache all of database\n",
" }\n",
" ],\n",
" autoUpdate=False,\n",
" changesetId=latest_changeset, # latest changeset_id for static view\n",
" description = f'Dataview of database'\n",
" )\n",
" elif resp['changesetId'] != latest_changeset:\n",
" print(f\"Dataview {DBVIEW_NAME} exists but needs updating...\")\n",
" resp = client.update_kx_dataview(environmentId=ENV_ID, \n",
" databaseName=DB_NAME, \n",
" dataviewName=DBVIEW_NAME, \n",
" changesetId=latest_changeset, \n",
" segmentConfigurations=[\n",
" {'dbPaths': ['/*'], 'volumeName': VOLUME_NAME}\n",
" ]\n",
" )\n",
" else:\n",
" print(f\"Dataview {DBVIEW_NAME} exists with current changeset: {latest_changeset}\")\n",
"else:\n",
" # no changesets, do NOT create view\n",
" print(f\"No changeset in database: {DB_NAME}, Dataview {DBVIEW_NAME} not created\") \n"
]
},
{
"cell_type": "markdown",
"id": "dea431b0-c501-46bb-b72a-a5eb80a335b0",
"metadata": {},
"source": [
"# Create Clusters\n",
"Create the needed clusters for the application. \n",
"\n",
"Code to be used in this application must be staged to an S3 bucket the service can read from, that code will be deployed to each cluster as part of the cluster creation process.\n",
"\n",
"## Reference\n",
"[Managed kdb Insights clusters](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-clusters.html) \n",
"[Cluster types](https://docs.aws.amazon.com/finspace/latest/userguide/kdb-cluster-types.html)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "e960387c-18d1-4689-900f-606eb48881bd",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total 80K\n",
"-rw-rw-r-- 1 ec2-user ec2-user 2.8K Sep 25 13:59 tick.q\n",
"drwxrwxr-x 2 ec2-user ec2-user 4.0K Sep 25 13:59 tick\n",
"-rw-rw-r-- 1 ec2-user ec2-user 212 Sep 25 13:59 taq.schema.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 3.2K Sep 25 13:59 rtsmkdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 4.8K Sep 25 13:59 rdbmkdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 695 Sep 25 13:59 query.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 752 Sep 25 13:59 kxtaqsubscriber.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 2.7K Sep 25 13:59 kxtaqdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 655 Sep 25 13:59 hdbmkdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 3.0K Sep 25 13:59 gwmkdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 274 Sep 25 13:59 funcDownHandle.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 3.1K Sep 25 13:59 connectmkdb.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 4.5K Oct 1 21:17 kxtaqfeed.q\n",
"-rw-rw-r-- 1 ec2-user ec2-user 4.5K Nov 1 01:06 feed.q\n",
"drwxrwxr-x 4 ec2-user ec2-user 4.0K Nov 1 01:06 .\n",
"drwxrwxr-x 2 ec2-user ec2-user 4.0K Nov 26 15:09 .ipynb_checkpoints\n",
"drwxrwxr-x 7 ec2-user ec2-user 4.0K Nov 26 15:23 ..\n"
]
}
],
"source": [
"!ls -lrtha $CODEBASE"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "b502a0a5-8610-4fc8-b6b7-04c47e89ba75",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"updating: query.q (deflated 49%)\n",
"updating: connectmkdb.q (deflated 63%)\n",
"updating: taq.schema.q (deflated 45%)\n",
"updating: kxtaqsubscriber.q (deflated 42%)\n",
"updating: kxtaqdb.q (deflated 44%)\n",
"updating: rtsmkdb.q (deflated 56%)\n",
"updating: rdbmkdb.q (deflated 58%)\n",
"updating: tick/ (stored 0%)\n",
"updating: tick/u.q (deflated 32%)\n",
"updating: hdbmkdb.q (deflated 42%)\n",
"updating: kxtaqfeed.q (deflated 50%)\n",
"updating: tick.q (deflated 49%)\n",
"updating: feed.q (deflated 52%)\n",
"updating: gwmkdb.q (deflated 61%)\n",
"updating: funcDownHandle.q (deflated 33%)\n",
"upload: ./basictick.zip to s3://kdb-demo-829845998889-kms/code/basictick.zip\n",
"2024-11-26 15:24:58 16585 basictick.zip\n"
]
},
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# create zipfile of the local code\n",
"os.system(f\"cd {CODEBASE}; zip -r -X ../{CODEBASE}.zip . -x '*.ipynb_checkpoints*';\")\n",
"\n",
"cp = \"\"\n",
"# Copy command with credentials\n",
"if AWS_ACCESS_KEY_ID is not None:\n",
" cp = f\"\"\"\n",
"export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}\n",
"export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}\n",
"export AWS_SESSION_TOKEN={AWS_SESSION_TOKEN}\n",
"\"\"\"\n",
" \n",
"cp += f\"\"\"\n",
"aws s3 cp --exclude .DS_Store {CODEBASE}.zip s3://{S3_BUCKET}/code/{CODEBASE}.zip\n",
"\"\"\"\n",
" \n",
"# Copy the code\n",
"os.system(cp)\n",
"\n",
"# Code on S3\n",
"os.system(f\"aws s3 ls s3://{S3_BUCKET}/code/{CODEBASE}.zip\")"
]
},
{
"cell_type": "markdown",
"id": "92a6d9d0-1043-4914-be9a-074629ed174d",
"metadata": {},
"source": [
"## Wait for Scaling Group to be Ready\n",
"Before creating clusters in a scaling group, be sure the scaling group is ready."
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "862d8607-a3aa-47c4-b7e5-a3c89aebe241",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Scaling Group: SCALING_GROUP_basictickdb status is now ACTIVE, total wait 0:00:00\n",
"** DONE **\n"
]
}
],
"source": [
"# wait for the scaling group to create\n",
"wait_for_scaling_group_status(client=client, environmentId=ENV_ID, scalingGroupName=SCALING_GROUP_NAME, show_wait=True)\n",
"print(\"** DONE **\")"
]
},
{
"cell_type": "markdown",
"id": "88716ead-2205-4971-a5b0-33ffe96d7f85",
"metadata": {},
"source": [
"## Create Tickerplant (TP) Cluster\n",
"Tickerplant will deliver data from feedhandler to subscribing RDB.\n",
"\n",
"### APIs used\n",
"[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) \n",
"\n",
"#### Notes\n",
"- No database used by TP, databases argument is not used \n",
"- Use tickerplantLogConfiguration **not** savedownStorageConfiguration \n",
" - tickerplantLogVolumes uses the same shared volume as other clusters"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "28bf38a1-7733-4eb2-839a-a302a57c8225",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating: TP_basictickdb\n"
]
}
],
"source": [
"# does cluster already exist?\n",
"resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=TP_CLUSTER_NAME)\n",
"\n",
"if resp is not None:\n",
" print(f\"Cluster: {TP_CLUSTER_NAME} already exists\")\n",
"else:\n",
" print(f\"Creating: {TP_CLUSTER_NAME}\")\n",
"\n",
" resp = client.create_kx_cluster(\n",
" environmentId=ENV_ID, \n",
" clusterName=TP_CLUSTER_NAME,\n",
" clusterType='TICKERPLANT',\n",
" releaseLabel = '1.0',\n",
" executionRole=EXECUTION_ROLE,\n",
" scalingGroupConfiguration={\n",
" 'memoryReservation': 6,\n",
" 'nodeCount': 1,\n",
" 'scalingGroupName': SCALING_GROUP_NAME,\n",
" },\n",
" tickerplantLogConfiguration ={ 'tickerplantLogVolumes': [ VOLUME_NAME ] },\n",
" clusterDescription=\"Created with create_all notebook\",\n",
" code=CODE_CONFIG,\n",
" initializationScript=TP_INIT_SCRIPT,\n",
" commandLineArguments=TP_CMD_ARGS,\n",
" azMode=AZ_MODE,\n",
" availabilityZoneId=AZ_ID,\n",
" vpcConfiguration=VPC_CONFIG\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "4a2a31ee-07e1-408b-8fe9-2a9fb92f4df2",
"metadata": {},
"source": [
"## Create Historical Database (HDB) Cluster\n",
"A multi-node HDB cluster will serve up queries for T+1 and older data. \n",
"\n",
"### APIs used\n",
"[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) \n",
"\n",
"#### Notes\n",
"- **databases**: defines which database and view to use\n",
" - View used by the HDB cluster must be up and running \n",
"- No a tickerplant, no tickerplantLogConfiguration argument \n",
"- No savedown needed, no savedownStorageConfiguration argument "
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "dc77c3a1-dcb6-446b-9756-59431b109b20",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:00:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:00:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:01:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:01:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:02:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:02:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:03:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:03:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:04:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:04:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:05:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:05:30, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:06:00, waiting 30 sec ...\n",
"Dataview: basictickdb_DBVIEW status is now ACTIVE, total wait 0:06:30\n",
"** Dataview is READY **\n"
]
}
],
"source": [
"# Dataview must be ready before creating the HDB Cluster\n",
"wait_for_dataview_status(client=client, environmentId=ENV_ID, databaseName=DB_NAME, dataviewName=DBVIEW_NAME, show_wait=True)\n",
"print(\"** Dataview is READY **\")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "3abd68fa-5690-4374-bb68-4277bb87cf26",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating: HDB_basictickdb\n"
]
}
],
"source": [
"# does cluster already exist?\n",
"resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME)\n",
"\n",
"if resp is not None:\n",
" print(f\"Cluster: {HDB_CLUSTER_NAME} already exists\")\n",
"else:\n",
" print(f\"Creating: {HDB_CLUSTER_NAME}\")\n",
"\n",
" resp = client.create_kx_cluster(\n",
" environmentId=ENV_ID, \n",
" clusterName=HDB_CLUSTER_NAME,\n",
" clusterType='HDB',\n",
" releaseLabel = '1.0',\n",
" executionRole=EXECUTION_ROLE,\n",
" databases=[{ 'databaseName': DB_NAME, 'dataviewName': DBVIEW_NAME }],\n",
" scalingGroupConfiguration={\n",
" 'memoryReservation': 6, # minimum\n",
" 'nodeCount': 2,\n",
" 'scalingGroupName': SCALING_GROUP_NAME,\n",
" },\n",
" clusterDescription=\"Created with create_all notebook\",\n",
" code=CODE_CONFIG,\n",
" initializationScript=HDB_INIT_SCRIPT,\n",
" commandLineArguments=HDB_CMD_ARGS,\n",
" azMode=AZ_MODE,\n",
" availabilityZoneId=AZ_ID,\n",
" vpcConfiguration=VPC_CONFIG\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "932a21ff-a51f-47d3-9b70-c4eb863fa34f",
"metadata": {},
"source": [
"## Create Gateway (GW) Cluster\n",
"The Gateway will handle client queries for data in the RDB and HDB. Gateways act as single API access point for data queries and will query both the RDB and HDB and aggregate results back to requestor.\n",
"\n",
"### APIs used\n",
"[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) \n",
"\n",
"#### Notes\n",
"- Gateways connect to other clusters and aggregate results \n",
" - No databases, tickerplantLogConfiguration, or savedownStorageConfiguration arguments\n",
"- execution role required, role is used when connecting to other clusters \n"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "2f27b7a1-4e8a-4b32-9175-a83ffb97f2de",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating: GATEWAY_basictickdb\n"
]
}
],
"source": [
"# does cluster already exist?\n",
"resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=GW_CLUSTER_NAME)\n",
"\n",
"if resp is not None:\n",
" print(f\"Cluster: {GW_CLUSTER_NAME} already exists\")\n",
"else:\n",
" print(f\"Creating: {GW_CLUSTER_NAME}\")\n",
"\n",
" resp = client.create_kx_cluster(\n",
" environmentId=ENV_ID, \n",
" clusterName=GW_CLUSTER_NAME,\n",
" clusterType='GATEWAY',\n",
" releaseLabel = '1.0',\n",
" scalingGroupConfiguration={\n",
" 'memoryReservation': 6, # minimum\n",
" 'nodeCount': 1,\n",
" 'scalingGroupName': SCALING_GROUP_NAME,\n",
" },\n",
" clusterDescription=\"Created with create_all notebook\",\n",
" executionRole=EXECUTION_ROLE,\n",
" code=CODE_CONFIG,\n",
" initializationScript=GW_INIT_SCRIPT,\n",
" commandLineArguments=GW_CMD_ARGS,\n",
" azMode=AZ_MODE,\n",
" availabilityZoneId=AZ_ID,\n",
" vpcConfiguration=VPC_CONFIG\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "8fd1688e-850d-423a-b98f-8cc67eb9ca8d",
"metadata": {},
"source": [
"## Create Realtime Database (RDB) Cluster\n",
"The RDB will subscribe to the tickerplant and capture real time data published by the tickerplant (as published by the feedhandler).\n",
"\n",
"Since the RDB clusters depend on the TP cluster, will check that its up before creating the RDBs.\n",
"\n",
"### APIs used\n",
"[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) \n",
"\n",
"#### Notes\n",
"- **databases:** must include database and view \n",
" - RDB will update the dbview of the database as part of end of day processing\n",
"- **savedownStorageConfiguration:** defines storage used \n",
" - End of day data is first saved to this location before updating the database "
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "8efea149-22e9-4503-9c31-903b742a77eb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Cluster: TP_basictickdb status is CREATING, total wait 0:00:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:00:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:01:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:01:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:02:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:02:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:03:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:03:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:04:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:04:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:05:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:05:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:06:00, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is CREATING, total wait 0:06:30, waiting 30 sec ...\n",
"Cluster: TP_basictickdb status is now RUNNING, total wait 0:07:00\n",
"TP is running\n"
]
}
],
"source": [
"# TP must be running before creating the RDBs\n",
"wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=TP_CLUSTER_NAME, show_wait=True)\n",
"print(\"TP is running\")"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "3e39ebf3-6940-40f1-a7f8-90efb3846f7b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating: RDB_basictickdb\n"
]
}
],
"source": [
"# does cluster already exist?\n",
"resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=RDB_CLUSTER_NAME)\n",
"\n",
"if resp is not None:\n",
" print(f\"Cluster: {RDB_CLUSTER_NAME} already exists\")\n",
"else:\n",
" print(f\"Creating: {RDB_CLUSTER_NAME}\")\n",
"\n",
" resp = client.create_kx_cluster(\n",
" environmentId=ENV_ID, \n",
" clusterName=RDB_CLUSTER_NAME,\n",
" clusterType='RDB',\n",
" releaseLabel = '1.0',\n",
" executionRole=EXECUTION_ROLE,\n",
" databases=[{ 'databaseName': DB_NAME }], #, 'dataviewName': DBVIEW_NAME }],\n",
" scalingGroupConfiguration={\n",
" 'memoryReservation': 6, # minimum\n",
" 'nodeCount': 1,\n",
" 'scalingGroupName': SCALING_GROUP_NAME,\n",
" },\n",
" savedownStorageConfiguration ={ 'volumeName': VOLUME_NAME },\n",
" clusterDescription=\"Created with create_all notebook\",\n",
" code=CODE_CONFIG,\n",
" initializationScript=RDB_INIT_SCRIPT,\n",
" commandLineArguments=RDB_CMD_ARGS,\n",
" azMode=AZ_MODE,\n",
" availabilityZoneId=AZ_ID,\n",
" vpcConfiguration=VPC_CONFIG\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "b0c1f4f3",
"metadata": {},
"source": [
"## Create Realtime Subscriber (RTS)\n",
"The RTS is similar to the RDB, and will subscribe to the tickerplant to capture and perform calculations on real time data such as maintaining a table of last trade price.\n",
"\n",
"### APIs used\n",
"[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) \n",
"\n",
"#### Notes\n",
"- Connects to TP clusters, subscribesfor data and publishes its calculations \n",
" - No databases, tickerplantLogConfiguration, or savedownStorageConfiguration needed\n",
"- execution role required, role is used when connecting to TP cluster \n"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "a51df448",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Creating: RTS_basictickdb\n"
]
}
],
"source": [
"# does cluster already exist?\n",
"resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=RTS_CLUSTER_NAME)\n",
"\n",
"if resp is not None:\n",
" print(f\"Cluster: {RTS_CLUSTER_NAME} already exists\")\n",
"else:\n",
" print(f\"Creating: {RTS_CLUSTER_NAME}\")\n",
"\n",
" resp = client.create_kx_cluster(\n",
" environmentId=ENV_ID, \n",
" clusterName=RTS_CLUSTER_NAME,\n",
" clusterType='RDB',\n",
" releaseLabel = '1.0',\n",
" executionRole=EXECUTION_ROLE,\n",
" scalingGroupConfiguration={\n",
" 'memoryReservation': 6, # minimum\n",
" 'nodeCount': 1,\n",
" 'scalingGroupName': SCALING_GROUP_NAME,\n",
" },\n",
" clusterDescription=\"Created with create_all notebook\",\n",
" code=CODE_CONFIG,\n",
" initializationScript=RTS_INIT_SCRIPT,\n",
" commandLineArguments=RTS_CMD_ARGS,\n",
" azMode=AZ_MODE,\n",
" availabilityZoneId=AZ_ID,\n",
" vpcConfiguration=VPC_CONFIG\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "230207e8-c297-4d7e-af65-4396fa5b4deb",
"metadata": {},
"source": [
"# List All Clusters\n",
"List all clusters, but first be sure all are in running state."
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "0946ca26-c4b0-410d-ade5-18a47bf2318a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Cluster: TP_basictickdb status is now RUNNING, total wait 0:00:00\n",
"Cluster: RDB_basictickdb status is PENDING, total wait 0:00:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:00:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:01:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:01:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:02:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:02:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:03:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:03:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:04:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:04:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:05:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:05:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:06:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:06:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:07:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:07:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:08:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:08:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:09:00, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is CREATING, total wait 0:09:30, waiting 30 sec ...\n",
"Cluster: RDB_basictickdb status is now RUNNING, total wait 0:10:00\n",
"Cluster: HDB_basictickdb status is now RUNNING, total wait 0:00:00\n",
"Cluster: GATEWAY_basictickdb status is now RUNNING, total wait 0:00:00\n",
"Cluster: RTS_basictickdb status is CREATING, total wait 0:00:00, waiting 30 sec ...\n",
"Cluster: RTS_basictickdb status is CREATING, total wait 0:00:30, waiting 30 sec ...\n",
"Cluster: RTS_basictickdb status is now RUNNING, total wait 0:01:00\n",
"** ALL CLUSTERS DONE **\n"
]
}
],
"source": [
"# Wait for all clusters be in running state\n",
"for c in all_clusters.values():\n",
" wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=c, show_wait=True)\n",
"\n",
"print(\"** ALL CLUSTERS DONE **\")"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "1c50c578-05e8-49e7-8deb-1f6b94b10221",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>clusterName</th>\n",
" <th>status</th>\n",
" <th>clusterType</th>\n",
" <th>capacityConfiguration</th>\n",
" <th>commandLineArguments</th>\n",
" <th>clusterDescription</th>\n",
" <th>lastModifiedTimestamp</th>\n",
" <th>createdTimestamp</th>\n",
" <th>databaseName</th>\n",
" <th>cacheConfigurations</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>GATEWAY_basictickdb</td>\n",
" <td>RUNNING</td>\n",
" <td>GATEWAY</td>\n",
" <td>None</td>\n",
" <td>[{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'rdb_name', 'value': 'RDB_basictickdb'}, {'key': 'hdb_name', 'value': 'HDB_basictickdb'}]</td>\n",
" <td>Created with create_all notebook</td>\n",
" <td>2024-11-26 15:43:44.171000+00:00</td>\n",
" <td>2024-11-26 15:31:43.844000+00:00</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>HDB_basictickdb</td>\n",
" <td>RUNNING</td>\n",
" <td>HDB</td>\n",
" <td>None</td>\n",
" <td>[{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}]</td>\n",
" <td>Created with create_all notebook</td>\n",
" <td>2024-11-26 15:43:45.385000+00:00</td>\n",
" <td>2024-11-26 15:31:41.602000+00:00</td>\n",
" <td>basictickdb</td>\n",
" <td>None</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>RDB_basictickdb</td>\n",
" <td>RUNNING</td>\n",
" <td>RDB</td>\n",
" <td>None</td>\n",
" <td>[{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}, {'key': 'procName', 'value': 'RDB_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'hdbProc', 'value': 'HDB_basictickdb'}, {'key': 'dbView', 'value': 'basictickdb_DBVIEW'}, {'key': 'AWS_ZIP_DEFAULT', 'value': '17,2,6'}]</td>\n",
" <td>Created with create_all notebook</td>\n",
" <td>2024-11-26 15:48:56.069000+00:00</td>\n",
" <td>2024-11-26 15:38:50.796000+00:00</td>\n",
" <td>basictickdb</td>\n",
" <td>[]</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>RTS_basictickdb</td>\n",
" <td>RUNNING</td>\n",
" <td>RDB</td>\n",
" <td>None</td>\n",
" <td>[{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}]</td>\n",
" <td>Created with create_all notebook</td>\n",
" <td>2024-11-26 15:49:56.778000+00:00</td>\n",
" <td>2024-11-26 15:38:53.241000+00:00</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>TP_basictickdb</td>\n",
" <td>RUNNING</td>\n",
" <td>TICKERPLANT</td>\n",
" <td>None</td>\n",
" <td>[{'key': 'procName', 'value': 'TP_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'g', 'value': '1'}]</td>\n",
" <td>Created with create_all notebook</td>\n",
" <td>2024-11-26 15:38:46.921000+00:00</td>\n",
" <td>2024-11-26 15:25:02.489000+00:00</td>\n",
" <td>None</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" clusterName status clusterType capacityConfiguration \\\n",
"0 GATEWAY_basictickdb RUNNING GATEWAY None \n",
"1 HDB_basictickdb RUNNING HDB None \n",
"3 RDB_basictickdb RUNNING RDB None \n",
"4 RTS_basictickdb RUNNING RDB None \n",
"5 TP_basictickdb RUNNING TICKERPLANT None \n",
"\n",
" commandLineArguments \\\n",
"0 [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'rdb_name', 'value': 'RDB_basictickdb'}, {'key': 'hdb_name', 'value': 'HDB_basictickdb'}] \n",
"1 [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}] \n",
"3 [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}, {'key': 'procName', 'value': 'RDB_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'hdbProc', 'value': 'HDB_basictickdb'}, {'key': 'dbView', 'value': 'basictickdb_DBVIEW'}, {'key': 'AWS_ZIP_DEFAULT', 'value': '17,2,6'}] \n",
"4 [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}] \n",
"5 [{'key': 'procName', 'value': 'TP_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'g', 'value': '1'}] \n",
"\n",
" clusterDescription lastModifiedTimestamp \\\n",
"0 Created with create_all notebook 2024-11-26 15:43:44.171000+00:00 \n",
"1 Created with create_all notebook 2024-11-26 15:43:45.385000+00:00 \n",
"3 Created with create_all notebook 2024-11-26 15:48:56.069000+00:00 \n",
"4 Created with create_all notebook 2024-11-26 15:49:56.778000+00:00 \n",
"5 Created with create_all notebook 2024-11-26 15:38:46.921000+00:00 \n",
"\n",
" createdTimestamp databaseName cacheConfigurations \n",
"0 2024-11-26 15:31:43.844000+00:00 None NaN \n",
"1 2024-11-26 15:31:41.602000+00:00 basictickdb None \n",
"3 2024-11-26 15:38:50.796000+00:00 basictickdb [] \n",
"4 2024-11-26 15:38:53.241000+00:00 None NaN \n",
"5 2024-11-26 15:25:02.489000+00:00 None NaN "
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"cdf = get_clusters(client, environmentId=ENV_ID)\n",
"\n",
"if cdf is not None:\n",
" # filter for clusters in this application\n",
" cdf = cdf[cdf['clusterName'].isin(all_clusters.values())]\n",
"\n",
"display(cdf)"
]
},
{
"cell_type": "markdown",
"id": "cc3cb064-adfc-4f57-8104-a730184197c3",
"metadata": {},
"source": [
"# Start FeedHandler\n",
"With all clusters running start a feedhandler to send data to the running tickerplant (TP).\n",
"\n",
"\n",
"## feedhandler_pykx.py\n",
"A PyKX based feedhandler is included in this example, to run the feedhandler outside a notebook:\n",
"\n",
"```\n",
"$ python feedhandler_pykx.py -h\n",
"usage: feedhandler_pykx.py [-h] [-profile PROFILE] -env ENV -username USERNAME [-tick TICK] -tp_name TP_NAME [-debug]\n",
"\n",
"options:\n",
" -h, --help show this help message and exit\n",
" -profile PROFILE, -pr PROFILE\n",
" Profile to use for access\n",
" -env ENV, -e ENV environment ID\n",
" -username USERNAME, -u USERNAME\n",
" kdb Username\n",
" -tick TICK, -t TICK Timer ticks (milliseconds)\n",
" -tp_name TP_NAME, -tp TP_NAME\n",
" Tickerplant Cluster Name\n",
" -debug, -d Debugging output\n",
"```\n",
"\n",
"### Environment Variables Used\n",
"Variables from env.py will be passed to the python feedhandler: \n",
"- ENV_ID \n",
"- KDB_USERNAME \n",
"\n",
"Variables from basictick_setup.py that will be passed to the python feedhandler: \n",
"- TP_CLUSTER_NAME \n",
"\n",
"## Example Run\n",
"```\n",
"$ nohup python feedhandler_pykx.py -env jlcenjvtkgzrdek2qqv7ic -username bob -tp_name TP_basictickdb -tick 10000 -debug &> feedhandler.out &\n",
"```\n",
"\n",
"## Stopping Feedhandler\n",
"```\n",
"$ pkill -f \"python feedhandler_pykx.py\"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "d0a5c1f3-4993-434f-9149-7e6736ee9384",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"To run the feedhandler, from a terminal run this:\n",
"nohup python feedhandler_pykx.py -env jlcenjvtkgzrdek2qqv7ic -username sagemaker -tp_name TP_basictickdb -tick 10000 -debug &> feedhandler.out &\n",
"\n",
"To kill the feedhandler, from a terminal run this:\n",
"pkill -f \"python feedhandler_pykx.py\"\n"
]
}
],
"source": [
"cmd = f\"nohup python feedhandler_pykx.py -env {ENV_ID} -username {KDB_USERNAME} -tp_name {TP_CLUSTER_NAME} -tick {FH_TICK} -debug &> feedhandler.out &\"\n",
"\n",
"print(\"To run the feedhandler, from a terminal run this:\")\n",
"print(cmd)\n",
"#os.system(cmd) # this should work but doesn't\n",
"\n",
"print()\n",
"print(\"To kill the feedhandler, from a terminal run this:\")\n",
"print('pkill -f \"python feedhandler_pykx.py\"')"
]
},
{
"cell_type": "markdown",
"id": "9e91a23b-b100-4763-9c50-c819f5824202",
"metadata": {},
"source": [
"# All Processes Running\n",
"This completes the creation of this applciations resources, the application is ready to use.\n",
"\n",
"## Next Steps\n",
"Try the [pykx_query_all](pykx_query_all.ipynb) notebook to query the contents of all the application clusters. You can see an example of measuring the latency of communications between clusters in the [pykx_sub_calc](pykx_sub_calc.ipynb) notebook. The [manual_eod](manual_eod.ipynb) notebook demonstrates how to conenct to and remotely call the end of day (EOD) function on the RDB that adds the day's data collected in the RDB as a new changeset to the managed database.\n",
"\n",
"See the [debugging](debugging.ipynb) notebook for an example of creating/debugging functions on a remote cluster.\n",
"\n",
"## Cleaning Up\n",
"To end the application and destroy all the resources created here, use the [delete_all](delete_all.ipynb) notebook."
]
},
{
"cell_type": "code",
"execution_count": 63,
"id": "86f33240-bb12-49f3-8d9c-5783c25eb182",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Last Run: 2024-11-26 17:27:26.977428\n"
]
}
],
"source": [
"print( f\"Last Run: {datetime.datetime.now()}\" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7024ab81-9ee8-416c-af21-456355cd58c0",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 5
}