# BasicTick V3: Create Everything
This notebook uses the AWS Python boto3 APIs to create the resources needed for a basic tick application, which simulates a market data capture system.

## Architecture
<img src="images/Deepdive Diagrams-BasicTick V3.drawio.png"  width="80%">

## Abbreviations
- RTS: Realtime Subscriber
- FH: Feedhandler    
- HDB: Historical Database
- RDB: Realtime Database    
- TP: Tickerplant    
- GW: Gateway

## AWS Resources Created
- Database   
- Changeset (adds data to database)   
- Scaling Group in which all clusters are run   
- Shared Volume used by database view and clusters  
- Dataview of database on the shared volume
  - option: view can be auto-updating or static
- Clusters: TP, RTS, RDB, HDB, and GW   

### Non AWS
For this demonstration application, the FH runs locally and publishes data to the TP.

# References
[Basic tick system](https://code.kx.com/insights/1.10/core/qpexample-tick/index.html)   
[KxSystems/kdb-tick (github)](https://github.com/KxSystems/kdb-tick)   
[Architecture of kdb+ systems](https://code.kx.com/q/architecture/)   


In [1]:
#!pip install -r requirements.txt

In [2]:
import os
import subprocess
import boto3
import json
import datetime
import pandas as pd  # used to display changeset tables

import pykx as kx

from env import *
from managed_kx import *

# Cluster names and database
from config import *

# ----------------------------------------------------------------

#NODE_TYPE="kx.sg.4xlarge"
#NODE_TYPE="kx.sg.2xlarge"
NODE_TYPE="kx.sg.xlarge"

CODE_CONFIG={ 's3Bucket': S3_BUCKET, 's3Key': f'{S3_CODE_PATH}/{CODEBASE}.zip' }

NAS1_CONFIG= {
        'type': 'SSD_250',
        'size': 1200
}

# Realtime Database (RDB) Configs
RDB_INIT_SCRIPT='rdbmkdb.q'
RDB_CMD_ARGS=[
    { 'key': 's', 'value': '2' }, 
    { 'key': 'g', 'value': '1' }, 
    { 'key': 'tp', 'value': TP_CLUSTER_NAME }, 
    { 'key': 'procName', 'value': RDB_CLUSTER_NAME }, 
    { 'key': 'volumeName', 'value': VOLUME_NAME }, 
    { 'key': 'hdbProc', 'value': HDB_CLUSTER_NAME }, 
    { 'key': 'dbView', 'value': DBVIEW_NAME }, 
    { 'key': 'AWS_ZIP_DEFAULT', 'value': '17,2,6' },
]

# RTS Configs
RTS_INIT_SCRIPT='rtsmkdb.q'
RTS_CMD_ARGS = [
    { 'key': 's', 'value': '2' }, 
    { 'key': 'g', 'value': '1' }, 
    { 'key': 'tp', 'value': TP_CLUSTER_NAME }, 
]

# Tickerplant (TP) Configs
TP_INIT_SCRIPT='tick.q'
TP_CMD_ARGS=[
    { 'key': 'procName', 'value': TP_CLUSTER_NAME }, 
    { 'key': 'volumeName', 'value': VOLUME_NAME }, 
    { 'key': 'g', 'value': '1' }, 
]

# Historical Database (HDB) Configs
HDB_INIT_SCRIPT='hdbmkdb.q'
HDB_CMD_ARGS=[
    { 'key': 's', 'value': '2' }, 
    { 'key': 'g', 'value': '1' }, 
]

# Gateway Configs
GW_INIT_SCRIPT='gwmkdb.q'
GW_CMD_ARGS=[
    { 'key': 's', 'value': '2' }, 
    { 'key': 'g', 'value': '1' }, 
    { 'key': 'rdb_name', 'value': RDB_CLUSTER_NAME}, 
    { 'key': 'hdb_name', 'value': HDB_CLUSTER_NAME}, 
]

# VPC Configuration
VPC_CONFIG={ 
    'vpcId': VPC_ID,
    'securityGroupIds': SECURITY_GROUPS,
    'subnetIds': SUBNET_IDS,
    'ipAddressType': 'IP_V4' 
}

# Feedhandler configs
FEED_TIMER=10000
FH_PORT=5030 
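
Every commandLineArguments list above repeats the same `{'key': ..., 'value': ...}` shape. As a minimal sketch, a hypothetical helper `to_cmd_args` (not part of managed_kx or boto3) could build these lists from a plain dict, converting values to strings as the lists above do:

```python
def to_cmd_args(args: dict) -> list:
    """Convert {'s': 2, 'g': 1} into [{'key': 's', 'value': '2'}, ...].

    Hypothetical helper: values are converted to str because every value in
    the commandLineArguments lists in this notebook is a string.
    """
    return [{'key': key, 'value': str(value)} for key, value in args.items()]


# Example: the RTS arguments, with the TP cluster name written out literally
rts_cmd_args = to_cmd_args({'s': 2, 'g': 1, 'tp': 'TP_basictickdb'})
print(rts_cmd_args)
```

Dicts preserve insertion order (Python 3.7+), so the argument order follows the dict literal.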


In [3]:
# Create a boto3 session using the default credentials
session = boto3.Session()

# create finspace client
client = session.client(service_name='finspace')

# Create a Sample Database
Create a synthetic database using kxtaqdb.q (takes 1-2 minutes)

In [4]:
!rm -rf $SOURCE_DATA_DIR

In [5]:
# call local q (using pykx) to create the database
kx.q(r"\l basictick/kxtaqdb.q")  # raw string so "\l" is passed to q unchanged

# Database size
print( os.system(f"du -sh {SOURCE_DATA_DIR}") )


"Generated trade|quote records: 866361 4323449"
"Generated trade|quote records: 888436 4440838"
"Generated trade|quote records: 888187 4446429"
"Generated trade|quote records: 883938 4422176"
"Generated trade|quote records: 889931 4447795"
"Generated trade|quote records: 884716 4424949"
323M	hdb
0


## Stage Database Files to S3
Using the AWS CLI, copy the HDB to the staging S3 bucket.
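The cell below shells out to the AWS CLI. If the CLI is not available, a rough boto3 alternative is to walk the HDB directory and call the S3 client's `upload_file(Filename, Bucket, Key)` for each file. This is a sketch, not a drop-in replacement: unlike `aws s3 sync`, it uploads every file unconditionally and does not remove objects already under the prefix (the `aws s3 rm` step). The name `upload_dir` is hypothetical.

```python
import os


def upload_dir(s3_client, local_dir: str, bucket: str, prefix: str,
               exclude=(".DS_Store",)) -> list:
    """Upload every file under local_dir to s3://bucket/<prefix>/<relative path>.

    Returns the S3 keys written. Unlike `aws s3 sync`, every file is uploaded
    unconditionally and existing objects under the prefix are left in place.
    """
    keys = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            if name in exclude:
                continue
            path = os.path.join(root, name)
            # S3 keys always use '/', regardless of the local OS separator
            rel = os.path.relpath(path, local_dir).replace(os.sep, "/")
            key = f"{prefix.rstrip('/')}/{rel}"
            s3_client.upload_file(path, bucket, key)
            keys.append(key)
    return keys


# Usage (not executed here):
# upload_dir(session.client('s3'), SOURCE_DATA_DIR, S3_BUCKET,
#            f"{S3_DATA_PATH}/{SOURCE_DATA_DIR}")
```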

In [6]:
# S3 destination
S3_DEST=f"s3://{S3_BUCKET}/{S3_DATA_PATH}/{SOURCE_DATA_DIR}/"

cp = ""

if AWS_ACCESS_KEY_ID is not None:
    cp = f"""
export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}
export AWS_SESSION_TOKEN={AWS_SESSION_TOKEN}
"""

cp += f"""
aws s3 rm --recursive {S3_DEST} --quiet
aws s3 sync --exclude .DS_Store {SOURCE_DATA_DIR} {S3_DEST} --quiet
"""
    
# execute the S3 copy
os.system(cp)

# confirm destination contents
print( f"Destination: {S3_DEST}" )
print( os.system(f"aws s3 ls {S3_DEST}") )

Destination: s3://kdb-demo-829845998889-kms/data/hdb/
                           PRE 2024.11.18/
                           PRE 2024.11.19/
                           PRE 2024.11.20/
                           PRE 2024.11.21/
                           PRE 2024.11.22/
                           PRE 2024.11.25/
2024-11-26 15:15:09         75 sym
0


## Create a Managed Database
Using the AWS APIs, create a managed database in Managed kdb Insights. The database is initially empty and is populated using changesets.

### Reference
[Managed kdb Insights databases](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-db.html)

### APIs used
[get_kx_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/get_kx_database.html)  
[create_kx_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_database.html)  


In [7]:
# assume database exists
create_db=False

try:
    resp = client.get_kx_database(environmentId=ENV_ID, databaseName=DB_NAME)
    resp.pop('ResponseMetadata', None)
except client.exceptions.ResourceNotFoundException:
    # does not exist, will create
    create_db=True

if create_db:
    print(f"CREATING Database: {DB_NAME}")
    resp = client.create_kx_database(environmentId=ENV_ID, databaseName=DB_NAME, description="Basictick kdb database")
    resp.pop('ResponseMetadata', None)

    print(f"CREATED Database: {DB_NAME}")

print(json.dumps(resp,sort_keys=True,indent=4,default=str))

CREATING Database: basictickdb
CREATED Database: basictickdb
{
    "createdTimestamp": "2024-11-26 15:15:10.607000+00:00",
    "databaseArn": "arn:aws:finspace:us-east-1:829845998889:kxEnvironment/jlcenjvtkgzrdek2qqv7ic/kxDatabase/basictickdb",
    "databaseName": "basictickdb",
    "description": "Basictick kdb database",
    "environmentId": "jlcenjvtkgzrdek2qqv7ic",
    "lastModifiedTimestamp": "2024-11-26 15:15:10.607000+00:00"
}


## Add Data to Database
Add the database files staged to S3 earlier to the managed database using create_kx_changeset.

### APIs used
[create_kx_changeset](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_changeset.html)  
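The change-request loop in the next cell can be isolated as a pure function, so the mapping can be checked before calling `create_kx_changeset`. A sketch with a hypothetical name: it sorts entries for a deterministic order and skips `.DS_Store`, since the `aws s3 sync` above excluded that file and a request pointing at a missing S3 object would presumably fail.

```python
import os


def build_change_requests(local_dir: str, s3_dest: str) -> list:
    """Map a local HDB directory to create_kx_changeset changeRequests.

    Partition directories (e.g. 2024.11.18) become PUTs to /<dir>/;
    top-level files such as sym are PUT at the database root (/).
    s3_dest is expected to end with '/', as S3_DEST does in this notebook.
    """
    changes = []
    for name in sorted(os.listdir(local_dir)):
        if name == ".DS_Store":
            continue
        if os.path.isdir(os.path.join(local_dir, name)):
            changes.append({'changeType': 'PUT', 's3Path': f"{s3_dest}{name}/", 'dbPath': f"/{name}/"})
        else:
            changes.append({'changeType': 'PUT', 's3Path': f"{s3_dest}{name}", 'dbPath': "/"})
    return changes
```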


In [8]:
# get the current list of database changesets
c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)

if len(c_set_list) == 0:
    print("Adding Changeset to empty database")
    changes=[]

    for f in os.listdir(f"{SOURCE_DATA_DIR}"):
        if os.path.isdir(f"{SOURCE_DATA_DIR}/{f}"):
            changes.append( { 'changeType': 'PUT', 's3Path': f"{S3_DEST}{f}/", 'dbPath': f"/{f}/" } )
        else:
            changes.append( { 'changeType': 'PUT', 's3Path': f"{S3_DEST}{f}", 'dbPath': f"/" } )

    resp = client.create_kx_changeset(environmentId=ENV_ID, databaseName=DB_NAME, 
        changeRequests=changes)

    resp.pop('ResponseMetadata', None)
    changeset_id = resp['changesetId']

    print("Changeset...")
    print(json.dumps(resp,sort_keys=True,indent=4,default=str))
else:
    # use the latest changesetId (newest createdTimestamp)
    changeset_id = max(c_set_list, key=lambda d: d['createdTimestamp'])['changesetId']

Adding Changeset to empty database
Changeset...
{
    "changeRequests": [
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.19/",
            "s3Path": "s3://kdb-demo-829845998889-kms/data/hdb/2024.11.19/"
        },
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.25/",
            "s3Path": "s3://kdb-demo-829845998889-kms/data/hdb/2024.11.25/"
        },
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.22/",
            "s3Path": "s3://kdb-demo-829845998889-kms/data/hdb/2024.11.22/"
        },
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.18/",
            "s3Path": "s3://kdb-demo-829845998889-kms/data/hdb/2024.11.18/"
        },
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.21/",
            "s3Path": "s3://kdb-demo-829845998889-kms/data/hdb/2024.11.21/"
        },
        {
            "changeType": "PUT",
            "dbPath": "/2024.11.20/",
     

In [9]:
# Wait for the changeset to be added to the database
wait_for_changeset_status(client, environmentId=ENV_ID, databaseName=DB_NAME, changesetId=changeset_id, show_wait=True)
print("**Done**")

Status is IN_PROGRESS, total wait 0:00:00, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:10, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:20, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:30, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:40, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:50, waiting 10 sec ...
**Done**


### Contents of the Managed Database
Display the changesets of the managed database.

### APIs used
[list_kx_changesets](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/list_kx_changesets.html)   
[get_kx_changeset](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/get_kx_changeset.html)  
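The cells in this notebook pick the most recent changeset by `createdTimestamp` rather than relying on the order in which `list_kx_changesets` returns them. A small sketch of that selection (hypothetical helper; it assumes each entry is a dict with `changesetId` and a comparable `createdTimestamp`, such as the datetime values boto3 returns):

```python
from datetime import datetime, timezone


def latest_changeset_id(changesets: list):
    """Return the changesetId with the newest createdTimestamp, or None if there are none."""
    if not changesets:
        return None
    return max(changesets, key=lambda c: c['createdTimestamp'])['changesetId']


# Example with made-up IDs and timestamps
example = [
    {'changesetId': 'older', 'createdTimestamp': datetime(2024, 11, 26, 15, 15, tzinfo=timezone.utc)},
    {'changesetId': 'newer', 'createdTimestamp': datetime(2024, 11, 27, 9, 0, tzinfo=timezone.utc)},
]
print(latest_changeset_id(example))  # newer
```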


In [10]:
# get the current list of database changesets
c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)

print(100*"=")
print(f"Database: {DB_NAME}, Changesets: {len(c_set_list)}")
print(100*"=")

# sort by create time
c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp']) 

for c in c_set_list:
    c_set_id = c['changesetId']
    print(f"  Changeset: {c_set_id}: Created: {c['createdTimestamp']} ({c['status']})")
    c_rqs = client.get_kx_changeset(environmentId=ENV_ID, databaseName=DB_NAME, changesetId=c_set_id)['changeRequests']

    chs_pdf = pd.DataFrame.from_dict(c_rqs).style.hide(axis='index')
    display(chs_pdf)

Database: basictickdb, Changesets: 1
  Changeset: 1Mm0hTkkSc1MXUzumKSe8A: Created: 2024-11-26 15:15:12.586000+00:00 (COMPLETED)


| changeType | s3Path | dbPath |
|---|---|---|
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.19/ | /2024.11.19/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.25/ | /2024.11.25/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.22/ | /2024.11.22/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.18/ | /2024.11.18/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.21/ | /2024.11.21/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/2024.11.20/ | /2024.11.20/ |
| PUT | s3://kdb-demo-829845998889-kms/data/hdb/sym | / |


# Create Scaling Group
The scaling group represents the total compute available to the application. All clusters are placed into the scaling group and share its compute and memory.

## Reference
[Managed kdb scaling groups](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-scaling-groups.html)

## APIs used
[create_kx_scaling_group](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_scaling_group.html)  
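Because every cluster shares the scaling group's memory, it can help to total the reservations before creating clusters. A sketch, assuming `memoryReservation` applies per node (so a cluster needs memoryReservation × nodeCount); the values are copied from the `create_kx_cluster` calls in this notebook, and the actual memory of NODE_TYPE should be taken from the AWS documentation rather than from this example.

```python
def total_memory_reservation(cluster_configs: dict) -> int:
    """Sum memoryReservation * nodeCount over scalingGroupConfiguration-style dicts.

    Assumes memoryReservation is reserved on every node of a cluster.
    """
    return sum(c['memoryReservation'] * c['nodeCount'] for c in cluster_configs.values())


# Values copied from the create_kx_cluster calls in this notebook
configs = {
    'TP':  {'memoryReservation': 6, 'nodeCount': 1},
    'HDB': {'memoryReservation': 6, 'nodeCount': 2},
    'GW':  {'memoryReservation': 6, 'nodeCount': 1},
    'RDB': {'memoryReservation': 6, 'nodeCount': 1},
    'RTS': {'memoryReservation': 6, 'nodeCount': 1},
}
print(total_memory_reservation(configs))  # 36
```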

In [11]:
# Check if the scaling group exists, only create it if it does not exist
resp = get_kx_scaling_group(client=client, environmentId=ENV_ID, scalingGroupName=SCALING_GROUP_NAME)

if resp is None:
    resp = client.create_kx_scaling_group(
        environmentId = ENV_ID, 
        scalingGroupName = SCALING_GROUP_NAME,
        hostType=NODE_TYPE,
        availabilityZoneId = AZ_ID
    )
else:
    print(f"Scaling Group {SCALING_GROUP_NAME} exists")    

# Create Shared Volume
The shared volume is a common storage device for the application. Every cluster using the shared volume has a writable directory named after the cluster and can read the directories of the other clusters using the volume. Every shared volume also has a common directory that all clusters using the volume can read and write.

## Directory Structure
Shared volumes appear under the /opt/kx/app/shared directory of the clusters using them, in a path named for the shared volume (/opt/kx/app/shared/VOLUME_NAME). Each cluster using the volume has a directory named for the cluster that only that cluster can write to (/opt/kx/app/shared/VOLUME_NAME/CLUSTER_NAME) and that other clusters using the volume can read from. Lastly, each shared volume has a directory that all clusters using the volume can read and write (/opt/kx/app/shared/VOLUME_NAME/common).

**Root:** /opt/kx/app/shared   
**Each Volume:** /opt/kx/app/shared/VOLUME_NAME   
**Write per cluster (read otherwise):** /opt/kx/app/shared/VOLUME_NAME/CLUSTER_NAME   
**common read/write:** /opt/kx/app/shared/VOLUME_NAME/common   
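The layout above can be expressed as a small path helper. This only illustrates the documented rules (the function names are hypothetical); the service enforces the permissions, not this code.

```python
import posixpath

SHARED_ROOT = "/opt/kx/app/shared"


def volume_paths(volume: str, cluster: str) -> dict:
    """Paths a cluster sees for a shared volume, per the layout described above."""
    base = posixpath.join(SHARED_ROOT, volume)
    return {
        'volume': base,
        'own': posixpath.join(base, cluster),      # writable by this cluster only
        'common': posixpath.join(base, 'common'),  # read/write for all clusters on the volume
    }


def can_write(path: str, volume: str, cluster: str) -> bool:
    """True if `cluster` may write to `path` under the rules above."""
    p = volume_paths(volume, cluster)
    path = posixpath.normpath(path)  # collapse '..' so it cannot escape a directory
    return any(path == d or path.startswith(d + "/") for d in (p['own'], p['common']))
```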

## Reference
[FinSpace Managed kdb Volumes](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-volumes.html)

## APIs used
[create_kx_volume](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_volume.html) 


In [12]:
# Check if volume already exists before trying to create one
resp = get_kx_volume(client=client, environmentId=ENV_ID, volumeName=VOLUME_NAME)

if resp is None:
    resp = client.create_kx_volume(
        environmentId = ENV_ID, 
        volumeType = 'NAS_1',
        volumeName = VOLUME_NAME,
        description = 'Shared volume between TP and RDB',
        nas1Configuration = NAS1_CONFIG,
        azMode='SINGLE',
        availabilityZoneIds=[ AZ_ID ]    
    )
else:
    print(f"Volume {VOLUME_NAME} exists")        

# Create Dataview
Create a dataview of the database and have all of its data presented (cached) on the shared volume. Customers can also choose to cache only a portion of the database, or to tier storage across different volumes.

### Reference
[Dataviews for querying data](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-dataviews.html)

### APIs used
[create_kx_dataview](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_dataview.html) 
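Because this dataview is static (`autoUpdate=False`), it is pinned to one changeset and must be updated explicitly when a newer changeset arrives. The create/update/no-op decision made in the dataview cell can be summarized as a pure function (hypothetical name; `existing` stands for the `get_kx_dataview` response, or None when the view does not exist):

```python
def dataview_action(existing, latest_changeset):
    """Decide what to do with a static dataview.

    existing: get_kx_dataview response dict, or None if the view does not exist.
    latest_changeset: newest changesetId, or None if the database has none.
    Returns 'skip', 'create', 'update', or 'none' (already current).
    """
    if latest_changeset is None:
        return 'skip'      # nothing to view yet, do not create the dataview
    if existing is None:
        return 'create'
    if existing.get('changesetId') != latest_changeset:
        return 'update'    # static view is pinned to an older changeset
    return 'none'
```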


In [13]:
# before creating the dataview, be sure the volume is created and ready
wait_for_volume_status(client=client, environmentId=ENV_ID, volumeName=VOLUME_NAME, show_wait=True)
print("** VOLUME is READY **")

Volume: RDB_TP_SHARED status is CREATING, total wait 0:00:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:00:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:01:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:01:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:02:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:02:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:03:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:03:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:04:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:04:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:05:00, waiting 30 sec ...
Volume: RDB_TP_SHARED status is CREATING, total wait 0:05:30, waiting 30 sec ...
Volume: RDB_TP_SHARED status

In [14]:
# do changesets exist?
c_set_list = list_kx_changesets(client, environmentId=ENV_ID, databaseName=DB_NAME)

if len(c_set_list) != 0:
    # sort by create time
    c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp']) 
    latest_changeset = c_set_list[-1]['changesetId']

    # Check whether the dataview already exists and uses the latest changeset
    resp = get_kx_dataview(client=client, environmentId=ENV_ID, databaseName=DB_NAME, dataviewName=DBVIEW_NAME)

    if resp is None:
        resp = client.create_kx_dataview(
            environmentId = ENV_ID, 
            databaseName=DB_NAME, 
            dataviewName=DBVIEW_NAME,
            azMode='SINGLE',
            availabilityZoneId=AZ_ID,
            segmentConfigurations=[
                { 
                    'volumeName': VOLUME_NAME,
                    'dbPaths': ['/*'],  # cache all of database
                }
            ],
            autoUpdate=False,
            changesetId=latest_changeset, # latest changeset_id for static view
            description = f'Dataview of database'
        )
    elif resp['changesetId'] != latest_changeset:
        print(f"Dataview {DBVIEW_NAME} exists but needs updating...")
        resp = client.update_kx_dataview(environmentId=ENV_ID, 
            databaseName=DB_NAME, 
            dataviewName=DBVIEW_NAME, 
            changesetId=latest_changeset, 
            segmentConfigurations=[
                {'dbPaths': ['/*'], 'volumeName': VOLUME_NAME}
            ]
        )
    else:
        print(f"Dataview {DBVIEW_NAME} exists with current changeset: {latest_changeset}")
else:
    # no changesets, do NOT create view
    print(f"No changeset in database: {DB_NAME}, Dataview {DBVIEW_NAME} not created")        


# Create Clusters
Create the needed clusters for the application. 

Code used in this application must be staged to an S3 bucket the service can read from; that code is deployed to each cluster as part of the cluster creation process.

## Reference
[Managed kdb Insights clusters](https://docs.aws.amazon.com/finspace/latest/userguide/finspace-managed-kdb-clusters.html)   
[Cluster types](https://docs.aws.amazon.com/finspace/latest/userguide/kdb-cluster-types.html)
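The upload cell below builds the code archive with the `zip` command. A Python-only sketch uses the standard `zipfile` module, storing paths relative to the code directory (as `cd {CODEBASE}; zip -r ... .` does) and skipping `.ipynb_checkpoints` directories. Unlike `zip -r`, it does not add separate directory entries, and the archive must be written outside `src_dir` so that it does not include itself. The function name is hypothetical.

```python
import os
import zipfile


def zip_codebase(src_dir: str, zip_path: str) -> list:
    """Zip src_dir's contents with paths relative to src_dir, skipping notebook checkpoints."""
    names = []
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(src_dir):
            # prune checkpoint directories so os.walk does not descend into them
            dirs[:] = [d for d in dirs if d != ".ipynb_checkpoints"]
            for name in sorted(files):
                path = os.path.join(root, name)
                arcname = os.path.relpath(path, src_dir).replace(os.sep, "/")
                zf.write(path, arcname)
                names.append(arcname)
    return names


# Usage (not executed here): zip_codebase(CODEBASE, f"{CODEBASE}.zip")
```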

In [15]:
!ls -lrtha $CODEBASE

total 80K
-rw-rw-r-- 1 ec2-user ec2-user 2.8K Sep 25 13:59 tick.q
drwxrwxr-x 2 ec2-user ec2-user 4.0K Sep 25 13:59 tick
-rw-rw-r-- 1 ec2-user ec2-user  212 Sep 25 13:59 taq.schema.q
-rw-rw-r-- 1 ec2-user ec2-user 3.2K Sep 25 13:59 rtsmkdb.q
-rw-rw-r-- 1 ec2-user ec2-user 4.8K Sep 25 13:59 rdbmkdb.q
-rw-rw-r-- 1 ec2-user ec2-user  695 Sep 25 13:59 query.q
-rw-rw-r-- 1 ec2-user ec2-user  752 Sep 25 13:59 kxtaqsubscriber.q
-rw-rw-r-- 1 ec2-user ec2-user 2.7K Sep 25 13:59 kxtaqdb.q
-rw-rw-r-- 1 ec2-user ec2-user  655 Sep 25 13:59 hdbmkdb.q
-rw-rw-r-- 1 ec2-user ec2-user 3.0K Sep 25 13:59 gwmkdb.q
-rw-rw-r-- 1 ec2-user ec2-user  274 Sep 25 13:59 funcDownHandle.q
-rw-rw-r-- 1 ec2-user ec2-user 3.1K Sep 25 13:59 connectmkdb.q
-rw-rw-r-- 1 ec2-user ec2-user 4.5K Oct  1 21:17 kxtaqfeed.q
-rw-rw-r-- 1 ec2-user ec2-user 4.5K Nov  1 01:06 feed.q
drwxrwxr-x 4 ec2-user ec2-user 4.0K Nov  1 01:06 .
drwxrwxr-x 2 ec2-user ec2-user 4.0K Nov 26 15:09 .ipynb_checkpoints
drwxrwxr-x 7 ec2-user ec2-user 4.0K

In [16]:
# create zipfile of the local code
os.system(f"cd {CODEBASE}; zip -r -X ../{CODEBASE}.zip . -x '*.ipynb_checkpoints*';")

cp = ""
# Copy command with credentials
if AWS_ACCESS_KEY_ID is not None:
    cp = f"""
export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}
export AWS_SESSION_TOKEN={AWS_SESSION_TOKEN}
"""
    
cp += f"""
aws s3 cp --exclude .DS_Store {CODEBASE}.zip s3://{S3_BUCKET}/{S3_CODE_PATH}/{CODEBASE}.zip
"""
    
# Copy the code (same key that CODE_CONFIG['s3Key'] points at)
os.system(cp)

# Code on S3
os.system(f"aws s3 ls s3://{S3_BUCKET}/{S3_CODE_PATH}/{CODEBASE}.zip")

updating: query.q (deflated 49%)
updating: connectmkdb.q (deflated 63%)
updating: taq.schema.q (deflated 45%)
updating: kxtaqsubscriber.q (deflated 42%)
updating: kxtaqdb.q (deflated 44%)
updating: rtsmkdb.q (deflated 56%)
updating: rdbmkdb.q (deflated 58%)
updating: tick/ (stored 0%)
updating: tick/u.q (deflated 32%)
updating: hdbmkdb.q (deflated 42%)
updating: kxtaqfeed.q (deflated 50%)
updating: tick.q (deflated 49%)
updating: feed.q (deflated 52%)
updating: gwmkdb.q (deflated 61%)
updating: funcDownHandle.q (deflated 33%)
upload: ./basictick.zip to s3://kdb-demo-829845998889-kms/code/basictick.zip
2024-11-26 15:24:58      16585 basictick.zip


0

## Wait for Scaling Group to be Ready
Before creating clusters in a scaling group, be sure the scaling group is ready.
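The `wait_for_*` helpers come from the notebook's `managed_kx` module, which is not shown here. A generic polling loop in the same spirit might look like the sketch below. The status names treated as ready or failed are assumptions to check against the FinSpace API reference for each resource, and `sleep` is a parameter so the loop can be tested without waiting.

```python
import time


def wait_for_status(get_status, ready=("ACTIVE", "RUNNING"), failed=("CREATE_FAILED",),
                    poll_sec=30, timeout_sec=3600, sleep=time.sleep):
    """Poll get_status() until it returns a status in `ready`.

    Returns (status, seconds_waited). Raises RuntimeError on a status in
    `failed`, and TimeoutError once timeout_sec has elapsed.
    """
    waited = 0
    while True:
        status = get_status()
        if status in ready:
            return status, waited
        if status in failed:
            raise RuntimeError(f"status is {status} after {waited} sec")
        if waited >= timeout_sec:
            raise TimeoutError(f"still {status} after {waited} sec")
        print(f"status is {status}, total wait {waited} sec, waiting {poll_sec} sec ...")
        sleep(poll_sec)
        waited += poll_sec


# Usage against the service (not executed here):
# wait_for_status(lambda: client.get_kx_scaling_group(
#     environmentId=ENV_ID, scalingGroupName=SCALING_GROUP_NAME)['status'])
```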

In [17]:
# wait for the scaling group to create
wait_for_scaling_group_status(client=client, environmentId=ENV_ID, scalingGroupName=SCALING_GROUP_NAME, show_wait=True)
print("** DONE **")

Scaling Group: SCALING_GROUP_basictickdb status is now ACTIVE, total wait 0:00:00
** DONE **


## Create Tickerplant (TP) Cluster
The tickerplant delivers data from the feedhandler to its subscribers (the RDB and RTS).

### APIs used
[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) 

#### Notes
- The TP uses no database, so the databases argument is not used   
- Use tickerplantLogConfiguration **not** savedownStorageConfiguration   
  - tickerplantLogVolumes uses the same shared volume as other clusters
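The five `create_kx_cluster` calls in this notebook share most of their arguments and differ only in name, type, scaling settings, init script, command-line arguments, and a few type-specific arguments (for the TP, `tickerplantLogConfiguration`). One way to factor this out is a small builder, sketched here with a hypothetical name and placeholder values; the cells in this notebook pass every argument explicitly instead.

```python
def cluster_kwargs(common: dict, scaling_group: str, name: str, cluster_type: str,
                   init_script: str, cmd_args: list, node_count: int = 1,
                   memory_reservation: int = 6, **type_specific) -> dict:
    """Merge shared create_kx_cluster arguments with per-cluster ones."""
    kwargs = dict(common)  # copy so the shared dict is never mutated
    kwargs.update(
        clusterName=name,
        clusterType=cluster_type,
        initializationScript=init_script,
        commandLineArguments=cmd_args,
        scalingGroupConfiguration={
            'memoryReservation': memory_reservation,
            'nodeCount': node_count,
            'scalingGroupName': scaling_group,
        },
    )
    kwargs.update(type_specific)  # e.g. tickerplantLogConfiguration, databases
    return kwargs


# Placeholder values; in the notebook these come from env.py / config.py
common = {
    'environmentId': 'ENV_ID_PLACEHOLDER',
    'releaseLabel': '1.0',
    'clusterDescription': 'Created with create_all notebook',
}
tp = cluster_kwargs(common, 'SCALING_GROUP_basictickdb', 'TP_basictickdb', 'TICKERPLANT',
                    'tick.q', [{'key': 'g', 'value': '1'}],
                    tickerplantLogConfiguration={'tickerplantLogVolumes': ['RDB_TP_SHARED']})
# client.create_kx_cluster(**tp)  # not executed here
```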

In [18]:
# does cluster already exist?
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=TP_CLUSTER_NAME)

if resp is not None:
    print(f"Cluster: {TP_CLUSTER_NAME} already exists")
else:
    print(f"Creating: {TP_CLUSTER_NAME}")

    resp = client.create_kx_cluster(
        environmentId=ENV_ID, 
        clusterName=TP_CLUSTER_NAME,
        clusterType='TICKERPLANT',
        releaseLabel = '1.0',
        executionRole=EXECUTION_ROLE,
        scalingGroupConfiguration={
            'memoryReservation': 6,
            'nodeCount': 1,
            'scalingGroupName': SCALING_GROUP_NAME,
        },
        tickerplantLogConfiguration ={ 'tickerplantLogVolumes': [ VOLUME_NAME ] },
        clusterDescription="Created with create_all notebook",
        code=CODE_CONFIG,
        initializationScript=TP_INIT_SCRIPT,
        commandLineArguments=TP_CMD_ARGS,
        azMode=AZ_MODE,
        availabilityZoneId=AZ_ID,
        vpcConfiguration=VPC_CONFIG
    )

Creating: TP_basictickdb


## Create Historical Database (HDB) Cluster
A multi-node (two-node) HDB cluster serves queries for historical data: prior days (T-1) and older.

### APIs used
[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) 

#### Notes
- **databases**: defines which database and view to use
  - View used by the HDB cluster must be up and running   
- Not a tickerplant, so no tickerplantLogConfiguration argument   
- No savedown needed, no savedownStorageConfiguration argument  

In [19]:
# Dataview must be ready before creating the HDB Cluster
wait_for_dataview_status(client=client, environmentId=ENV_ID, databaseName=DB_NAME, dataviewName=DBVIEW_NAME, show_wait=True)
print("** Dataview is READY **")

Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:00:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:00:30, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:01:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:01:30, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:02:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:02:30, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:03:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:03:30, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:04:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:04:30, waiting 30 sec ...
Dataview: basictickdb_DBVIEW status is CREATING, total wait 0:05:00, waiting 30 sec ...
Dataview: basictickdb_DBVIEW sta

In [20]:
# does cluster already exist?
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME)

if resp is not None:
    print(f"Cluster: {HDB_CLUSTER_NAME} already exists")
else:
    print(f"Creating: {HDB_CLUSTER_NAME}")

    resp = client.create_kx_cluster(
        environmentId=ENV_ID, 
        clusterName=HDB_CLUSTER_NAME,
        clusterType='HDB',
        releaseLabel = '1.0',
        executionRole=EXECUTION_ROLE,
        databases=[{ 'databaseName': DB_NAME, 'dataviewName': DBVIEW_NAME }],
        scalingGroupConfiguration={
            'memoryReservation': 6, # minimum
            'nodeCount': 2,
            'scalingGroupName': SCALING_GROUP_NAME,
        },
        clusterDescription="Created with create_all notebook",
        code=CODE_CONFIG,
        initializationScript=HDB_INIT_SCRIPT,
        commandLineArguments=HDB_CMD_ARGS,
        azMode=AZ_MODE,
        availabilityZoneId=AZ_ID,
        vpcConfiguration=VPC_CONFIG
    )

Creating: HDB_basictickdb


## Create Gateway (GW) Cluster
The gateway handles client queries for data in the RDB and HDB. It acts as a single API access point for data queries: it queries both the RDB and HDB and aggregates the results back to the requestor.

### APIs used
[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) 

#### Notes
- Gateways connect to other clusters and aggregate results   
  - No databases, tickerplantLogConfiguration, or savedownStorageConfiguration arguments
- Execution role required: the role is used when connecting to other clusters  
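The gateway's actual routing lives in gwmkdb.q, which is not shown here. As an illustration only, a common approach in kdb+ tick systems is to split a query's date range at the current date: days before today go to the HDB, today goes to the RDB, and the gateway combines both results. A Python sketch of that split (hypothetical function; it assumes the RDB holds exactly today's data):

```python
from datetime import date, timedelta


def route_query(start: date, end: date, today: date) -> dict:
    """Split the inclusive range [start, end] between the HDB and the RDB.

    'hdb' is the (first, last) date range for the HDB, or None if no prior days
    are requested; 'rdb' is True if today's data (held by the RDB) is requested.
    """
    route = {'hdb': None, 'rdb': False}
    if start < today:
        route['hdb'] = (start, min(end, today - timedelta(days=1)))
    if end >= today:
        route['rdb'] = True
    return route


print(route_query(date(2024, 11, 22), date(2024, 11, 26), today=date(2024, 11, 26)))
```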


In [21]:
# does cluster already exist?
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=GW_CLUSTER_NAME)

if resp is not None:
    print(f"Cluster: {GW_CLUSTER_NAME} already exists")
else:
    print(f"Creating: {GW_CLUSTER_NAME}")

    resp = client.create_kx_cluster(
        environmentId=ENV_ID, 
        clusterName=GW_CLUSTER_NAME,
        clusterType='GATEWAY',
        releaseLabel = '1.0',
        scalingGroupConfiguration={
            'memoryReservation': 6, # minimum
            'nodeCount': 1,
            'scalingGroupName': SCALING_GROUP_NAME,
        },
        clusterDescription="Created with create_all notebook",
        executionRole=EXECUTION_ROLE,
        code=CODE_CONFIG,
        initializationScript=GW_INIT_SCRIPT,
        commandLineArguments=GW_CMD_ARGS,
        azMode=AZ_MODE,
        availabilityZoneId=AZ_ID,
        vpcConfiguration=VPC_CONFIG
    )

Creating: GATEWAY_basictickdb


## Create Realtime Database (RDB) Cluster
The RDB subscribes to the tickerplant and captures the real-time data the tickerplant publishes (as published by the feedhandler).

Because the RDB cluster depends on the TP cluster, check that the TP is running before creating the RDB.

### APIs used
[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) 

####  Notes
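- **databases:** must include the database; in the call below only databaseName is set, and the dataview name is passed to the RDB code through the dbView command-line argument   
  - RDB will update the dataview of the database as part of end-of-day processing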
- **savedownStorageConfiguration:** defines storage used   
  - End of day data is first saved to this location before updating the database 

In [22]:
# TP must be running before creating the RDBs
wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=TP_CLUSTER_NAME, show_wait=True)
print("TP is running")

Cluster: TP_basictickdb status is CREATING, total wait 0:00:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:00:30, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:01:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:01:30, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:02:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:02:30, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:03:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:03:30, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:04:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:04:30, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:05:00, waiting 30 sec ...
Cluster: TP_basictickdb status is CREATING, total wait 0:05:30, waiting 30 sec ...
Clus

In [23]:
# does cluster already exist?
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=RDB_CLUSTER_NAME)

if resp is not None:
    print(f"Cluster: {RDB_CLUSTER_NAME} already exists")
else:
    print(f"Creating: {RDB_CLUSTER_NAME}")

    resp = client.create_kx_cluster(
        environmentId=ENV_ID, 
        clusterName=RDB_CLUSTER_NAME,
        clusterType='RDB',
        releaseLabel = '1.0',
        executionRole=EXECUTION_ROLE,
        databases=[{ 'databaseName': DB_NAME }], #, 'dataviewName': DBVIEW_NAME }],
        scalingGroupConfiguration={
            'memoryReservation': 6, # minimum
            'nodeCount': 1,
            'scalingGroupName': SCALING_GROUP_NAME,
        },
        savedownStorageConfiguration ={ 'volumeName': VOLUME_NAME },
        clusterDescription="Created with create_all notebook",
        code=CODE_CONFIG,
        initializationScript=RDB_INIT_SCRIPT,
        commandLineArguments=RDB_CMD_ARGS,
        azMode=AZ_MODE,
        availabilityZoneId=AZ_ID,
        vpcConfiguration=VPC_CONFIG
    )

Creating: RDB_basictickdb


## Create Realtime Subscriber (RTS)
The RTS is similar to the RDB: it subscribes to the tickerplant to capture real-time data and perform calculations on it, such as maintaining a table of last trade prices.

### APIs used
[create_kx_cluster](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/finspace/client/create_kx_cluster.html) 

#### Notes
- Connects to the TP cluster, subscribes for data, and publishes its calculations   
  - No databases, tickerplantLogConfiguration, or savedownStorageConfiguration needed
- Execution role required: the role is used when connecting to the TP cluster   
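rtsmkdb.q is not shown here, but the "last trade price" calculation mentioned above reduces to keeping, per symbol, the price of the most recent trade. A Python sketch, assuming trades arrive in batches of (sym, time, price) tuples (an assumed shape, not the actual tickerplant message format):

```python
def update_last_price(last: dict, trades: list) -> dict:
    """Apply a batch of (sym, time, price) trades to a last-price table.

    `last` maps sym -> (time, price) and is updated in place; a trade older
    than the one already stored for its sym is ignored.
    """
    for sym, t, price in trades:
        prev = last.get(sym)
        if prev is None or t >= prev[0]:
            last[sym] = (t, price)
    return last


last_price = {}
update_last_price(last_price, [("AAPL", 1, 100.0), ("MSFT", 2, 50.0), ("AAPL", 3, 101.5)])
update_last_price(last_price, [("AAPL", 2, 99.0)])  # older than the stored AAPL trade
print(last_price)
```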


In [24]:
# does cluster already exist?
resp = get_kx_cluster(client, environmentId=ENV_ID, clusterName=RTS_CLUSTER_NAME)

if resp is not None:
    print(f"Cluster: {RTS_CLUSTER_NAME} already exists")
else:
    print(f"Creating: {RTS_CLUSTER_NAME}")

    resp = client.create_kx_cluster(
        environmentId=ENV_ID, 
        clusterName=RTS_CLUSTER_NAME,
        clusterType='RDB',
        releaseLabel = '1.0',
        executionRole=EXECUTION_ROLE,
        scalingGroupConfiguration={
            'memoryReservation': 6, # minimum
            'nodeCount': 1,
            'scalingGroupName': SCALING_GROUP_NAME,
        },
        clusterDescription="Created with create_all notebook",
        code=CODE_CONFIG,
        initializationScript=RTS_INIT_SCRIPT,
        commandLineArguments=RTS_CMD_ARGS,
        azMode=AZ_MODE,
        availabilityZoneId=AZ_ID,
        vpcConfiguration=VPC_CONFIG
    )

Creating: RTS_basictickdb


# List All Clusters
Before listing the clusters, make sure all of them are in the RUNNING state.

In [25]:
# Wait for all clusters to be in the RUNNING state
for c in all_clusters.values():
    wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=c, show_wait=True)

print("** ALL CLUSTERS DONE **")

Cluster: TP_basictickdb status is now RUNNING, total wait 0:00:00
Cluster: RDB_basictickdb status is PENDING, total wait 0:00:00, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:00:30, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:01:00, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:01:30, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:02:00, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:02:30, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:03:00, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:03:30, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:04:00, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:04:30, waiting 30 sec ...
Cluster: RDB_basictickdb status is CREATING, total wait 0:05:00, waiting 30 sec ...
Cluster: RD
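`wait_for_cluster_status` comes from the notebook's `managed_kx` module, and its implementation is not shown here. Judging from the log above, it polls each cluster on a fixed 30-second interval. Below is a rough sketch of that loop, assuming it is built on `get_kx_cluster`. The function name `wait_for_status`, the timeout, and the set of failure states treated as fatal are assumptions for illustration.

```python
import datetime
import time

# Cluster states treated as fatal while waiting (assumption; adjust as needed).
FAILED_STATES = {"CREATE_FAILED", "DELETE_FAILED", "DELETED"}

def wait_for_status(client, environment_id, cluster_name, target="RUNNING",
                    poll_seconds=30, timeout_seconds=1800, sleep=time.sleep):
    """Poll get_kx_cluster until `cluster_name` reports `target` (hypothetical helper).

    Returns the seconds waited, counted as the sum of requested sleeps rather
    than wall-clock time. Raises RuntimeError if the cluster enters a failed
    state and TimeoutError once `timeout_seconds` is reached.
    """
    waited = 0
    while True:
        status = client.get_kx_cluster(
            environmentId=environment_id, clusterName=cluster_name)["status"]
        elapsed = datetime.timedelta(seconds=waited)  # prints as H:MM:SS
        if status == target:
            print(f"Cluster: {cluster_name} status is now {status}, total wait {elapsed}")
            return waited
        if status in FAILED_STATES:
            raise RuntimeError(f"Cluster {cluster_name} entered {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Cluster {cluster_name} still {status} after {elapsed}")
        print(f"Cluster: {cluster_name} status is {status}, total wait {elapsed}, "
              f"waiting {poll_seconds} sec ...")
        sleep(poll_seconds)
        waited += poll_seconds
```

The `sleep` parameter is injectable so the loop can be exercised without real delays. In the notebook, the equivalent call would be `wait_for_status(client, ENV_ID, name)` for each `name` in `all_clusters.values()`.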

In [26]:
cdf = get_clusters(client, environmentId=ENV_ID)

if cdf is not None:
    # filter for clusters in this application
    cdf = cdf[cdf['clusterName'].isin(all_clusters.values())]

display(cdf)

| | clusterName | status | clusterType | capacityConfiguration | commandLineArguments | clusterDescription | lastModifiedTimestamp | createdTimestamp | databaseName | cacheConfigurations |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | GATEWAY_basictickdb | RUNNING | GATEWAY | | [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'rdb_name', 'value': 'RDB_basictickdb'}, {'key': 'hdb_name', 'value': 'HDB_basictickdb'}] | Created with create_all notebook | 2024-11-26 15:43:44.171000+00:00 | 2024-11-26 15:31:43.844000+00:00 | | |
| 1 | HDB_basictickdb | RUNNING | HDB | | [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}] | Created with create_all notebook | 2024-11-26 15:43:45.385000+00:00 | 2024-11-26 15:31:41.602000+00:00 | basictickdb | |
| 3 | RDB_basictickdb | RUNNING | RDB | | [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}, {'key': 'procName', 'value': 'RDB_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'hdbProc', 'value': 'HDB_basictickdb'}, {'key': 'dbView', 'value': 'basictickdb_DBVIEW'}, {'key': 'AWS_ZIP_DEFAULT', 'value': '17,2,6'}] | Created with create_all notebook | 2024-11-26 15:48:56.069000+00:00 | 2024-11-26 15:38:50.796000+00:00 | basictickdb | [] |
| 4 | RTS_basictickdb | RUNNING | RDB | | [{'key': 's', 'value': '2'}, {'key': 'g', 'value': '1'}, {'key': 'tp', 'value': 'TP_basictickdb'}] | Created with create_all notebook | 2024-11-26 15:49:56.778000+00:00 | 2024-11-26 15:38:53.241000+00:00 | | |
| 5 | TP_basictickdb | RUNNING | TICKERPLANT | | [{'key': 'procName', 'value': 'TP_basictickdb'}, {'key': 'volumeName', 'value': 'RDB_TP_SHARED'}, {'key': 'g', 'value': '1'}] | Created with create_all notebook | 2024-11-26 15:38:46.921000+00:00 | 2024-11-26 15:25:02.489000+00:00 | | |
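`get_clusters` (also from `managed_kx`) returns a pandas DataFrame, which the cell above then filters down to this application's clusters. Below is a dependency-free sketch of the same lookup. It assumes the helper is built on the boto3 `finspace` `list_kx_clusters` call, which returns results under `kxClusterSummaries` and pages through them with `nextToken`. `list_app_clusters` is a hypothetical name.

```python
def list_app_clusters(client, environment_id, names):
    """Return summaries for the clusters named in `names`, sorted by name.

    `client` is assumed to be a boto3 "finspace" client (hypothetical helper).
    Follows nextToken until every page of list_kx_clusters has been read.
    """
    wanted = set(names)
    summaries = []
    token = None
    while True:
        kwargs = {"environmentId": environment_id}
        if token:
            kwargs["nextToken"] = token
        resp = client.list_kx_clusters(**kwargs)
        # Keep only the clusters that belong to this application.
        summaries.extend(s for s in resp.get("kxClusterSummaries", [])
                         if s.get("clusterName") in wanted)
        token = resp.get("nextToken")
        if not token:
            return sorted(summaries, key=lambda s: s["clusterName"])
```

In this notebook the call would be `list_app_clusters(client, ENV_ID, all_clusters.values())`.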


# Start FeedHandler
With all clusters running, start a feedhandler to send data to the running tickerplant (TP).


## feedhandler_pykx.py
A PyKX-based feedhandler is included in this example. To run the feedhandler outside a notebook, use the following command-line interface:

```
$ python feedhandler_pykx.py -h
usage: feedhandler_pykx.py [-h] [-profile PROFILE] -env ENV -username USERNAME [-tick TICK] -tp_name TP_NAME [-debug]

options:
  -h, --help            show this help message and exit
  -profile PROFILE, -pr PROFILE
                        Profile to use for access
  -env ENV, -e ENV      environment ID
  -username USERNAME, -u USERNAME
                        kdb Username
  -tick TICK, -t TICK   Timer ticks (milliseconds)
  -tp_name TP_NAME, -tp TP_NAME
                        Tickerplant Cluster Name
  -debug, -d            Debugging output
```
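The command-line interface above can be reproduced with `argparse`, which accepts single-dash multi-letter options such as `-env`. The sketch below is reconstructed only from the usage text. The option names and help strings come from it, while `type=int` for `-tick` and the absence of defaults are assumptions, because the script's source isn't shown here.

```python
import argparse

def build_parser():
    """Parser mirroring the usage text of feedhandler_pykx.py (reconstruction)."""
    p = argparse.ArgumentParser(prog="feedhandler_pykx.py")
    # dest is taken from the first option string, e.g. "-tp_name" -> tp_name.
    p.add_argument("-profile", "-pr", help="Profile to use for access")
    p.add_argument("-env", "-e", required=True, help="environment ID")
    p.add_argument("-username", "-u", required=True, help="kdb Username")
    p.add_argument("-tick", "-t", type=int, help="Timer ticks (milliseconds)")  # int is assumed
    p.add_argument("-tp_name", "-tp", required=True, help="Tickerplant Cluster Name")
    p.add_argument("-debug", "-d", action="store_true", help="Debugging output")
    return p
```

Because `-env`, `-username`, and `-tp_name` are marked required, `argparse` exits with a usage error if any of them is missing. This matches how the usage line above shows them without square brackets.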

### Environment Variables Used
Variables from env.py passed to the Python feedhandler:   
- ENV_ID   
- KDB_USERNAME   

Variables from basictick_setup.py passed to the Python feedhandler:   
- TP_CLUSTER_NAME   
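Building the feedhandler command as an argument list, rather than formatting one string, keeps each value separately quoted. Below is a minimal sketch. `feedhandler_argv` is a hypothetical helper, and the argument order simply mirrors the example run below.

```python
import shlex

def feedhandler_argv(env_id, kdb_username, tp_cluster_name, tick_ms=None, debug=True):
    """Build the feedhandler command line as a list of arguments (hypothetical helper)."""
    argv = ["python", "feedhandler_pykx.py",
            "-env", env_id,
            "-username", kdb_username,
            "-tp_name", tp_cluster_name]
    if tick_ms is not None:
        argv += ["-tick", str(tick_ms)]  # timer tick in milliseconds
    if debug:
        argv.append("-debug")
    return argv
```

`shlex.join(feedhandler_argv(ENV_ID, KDB_USERNAME, TP_CLUSTER_NAME, FH_TICK))` then produces a shell-safe string to paste into a terminal. Values containing spaces or quotes are quoted automatically.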

## Example Run
```
$ nohup python feedhandler_pykx.py -env jlcenjvtkgzrdek2qqv7ic -username bob -tp_name TP_basictickdb -tick 10000 -debug &> feedhandler.out &
```

## Stopping Feedhandler
```
$ pkill -f "python feedhandler_pykx.py"
```

In [27]:
cmd = f"nohup python feedhandler_pykx.py -env {ENV_ID} -username {KDB_USERNAME} -tp_name {TP_CLUSTER_NAME} -tick {FH_TICK} -debug &> feedhandler.out &"

print("To run the feedhandler, from a terminal run this:")
print(cmd)
#os.system(cmd) # this should work but doesn't

print()
print("To kill the feedhandler, from a terminal run this:")
print('pkill -f "python feedhandler_pykx.py"')

To run the feedhandler, from a terminal run this:
nohup python feedhandler_pykx.py -env jlcenjvtkgzrdek2qqv7ic -username sagemaker -tp_name TP_basictickdb -tick 10000 -debug &> feedhandler.out &

To kill the feedhandler, from a terminal run this:
pkill -f "python feedhandler_pykx.py"
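The commented-out `os.system(cmd)` call runs the command through `/bin/sh`. One plausible reason it misbehaves is that `&>` is a bash extension: POSIX `sh` parses `cmd &> file &` as `cmd &` followed by a separate `> file`, so the output never reaches feedhandler.out. This is an assumption, since the notebook doesn't say what fails. The sketch below avoids the shell entirely. It starts the process with `subprocess.Popen` in its own session, which gives roughly the detachment of `nohup ... &`, and stops it with `terminate()` instead of `pkill -f`. The helper names are hypothetical.

```python
import subprocess
import sys

def start_feedhandler(args, log_path="feedhandler.out", python=sys.executable):
    """Run `python <args>` in the background with stdout+stderr sent to log_path."""
    log = open(log_path, "ab")
    try:
        proc = subprocess.Popen(
            [python, *args],
            stdout=log, stderr=subprocess.STDOUT,   # like "> file 2>&1"
            stdin=subprocess.DEVNULL,
            start_new_session=True,                 # detach from the kernel's session (POSIX)
        )
    finally:
        log.close()  # the child keeps its own copy of the file descriptor
    return proc

def stop_feedhandler(proc, timeout=10):
    """Send SIGTERM, escalating to SIGKILL if the process does not exit in time."""
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    return proc.returncode
```

For example, run `proc = start_feedhandler(["feedhandler_pykx.py", "-env", ENV_ID, "-username", KDB_USERNAME, "-tp_name", TP_CLUSTER_NAME, "-tick", str(FH_TICK), "-debug"])` and later `stop_feedhandler(proc)`. The `proc` handle is lost if the kernel restarts, in which case `pkill -f` remains the way to stop it.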


In [62]:
# Echo the system command to run

cmd=f"nohup python feedhandler_pykx.py -env {ENV_ID} -username {KDB_USERNAME} -tp_name {TP_CLUSTER_NAME} -tick {FH_TICK} -debug > feedhandler.out 2>&1 &"

print("Execute the command below in a terminal")
print()
print(cmd)

Execute the command below in a terminal

nohup python feedhandler_pykx.py -env jlcenjvtkgzrdek2qqv7ic -username sagemaker -tp_name TP_basictickdb -tick 10000 -debug > feedhandler.out 2>&1 &


# All Processes Running
This completes the creation of this application's resources; the application is ready to use.

## Next Steps
Try the [pykx_query_all](pykx_query_all.ipynb) notebook to query the contents of all the application clusters. You can see an example of measuring the latency of communications between clusters in the [pykx_sub_calc](pykx_sub_calc.ipynb) notebook. The [manual_eod](manual_eod.ipynb) notebook demonstrates how to connect to the RDB and remotely call its end of day (EOD) function, which adds the day's data collected in the RDB to the managed database as a new changeset.

See the [debugging](debugging.ipynb) notebook for an example of creating/debugging functions on a remote cluster.

## Cleaning Up
To end the application and destroy all the resources created here, use the [delete_all](delete_all.ipynb) notebook.

In [63]:
print( f"Last Run: {datetime.datetime.now()}" )

Last Run: 2024-11-26 17:27:26.977428
