2torial-stock-market/ingest.py (52 lines of code) (raw):
#-------------------------------------------
# Elasticsearch Imports
#-------------------------------------------
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
#-------------------------------------------
# Alpaca Imports
#-------------------------------------------
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.common import URL
#-------------------------------------------
# General Imports
#-------------------------------------------
from functools import partial
import pandas as pd
import asyncio
import json
#-------------------------------------------
# Elastic Global Variables
#-------------------------------------------
CLOUD_USERNAME = "elastic"
CLOUD_PASSWORD = ""
CLOUD_ID = ""
#-------------------------------------------
# ALPACA Global Variables
#-------------------------------------------
API_KEY = ""
API_SECRET = ""
ALPACA_ENDPOINT = "https://data.sandbox.alpaca.markets"
#-------------------------------------------
# Define Index Mappings
#-------------------------------------------
TRADE_MAPPING = '{"properties":{"conditions":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"exchange":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"id":{"type":"long"},"price":{"type":"float"},"size":{"type":"long"},"symbol":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"tape":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"timestamp":{"type":"date"}}}'
QUOTE_MAPPING = '{"properties":{"ask_exchange":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"ask_price":{"type":"float"},"ask_size":{"type":"long"},"bid_exchange":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"bid_price":{"type":"float"},"bid_size":{"type":"long"},"conditions":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"symbol":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"tape":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"timestamp":{"type":"date"}}}'
#-------------------------------------------
# Parse Alpaca Data
#-------------------------------------------
def parse_data(data):
line = ''.join(str(data).split())
line = line[6:]
line = line[:-1]
line = line.replace("'",'"')
clean_data = json.loads(line)
clean_data['timestamp'] = pd.Timestamp(clean_data['timestamp'])
print('.', end='', flush=True)
return clean_data
#-------------------------------------------
# Async Elastic Functions
#-------------------------------------------
async def create_index(es, index_name, mapping):
if not await es.indices.exists(index=index_name):
await es.indices.create(index=index_name,mappings=json.loads(mapping))
async def ingest_data_callback(t, es, index_name):
parsed_data = parse_data(t)
await es.index(index=index_name, document=parsed_data)
#-------------------------------------------
# Main Function
#-------------------------------------------
async def main():
# Connect to Elasticsearch
es = AsyncElasticsearch(
cloud_id=CLOUD_ID,
basic_auth=(CLOUD_USERNAME, CLOUD_PASSWORD)
)
# Create Elasticsearch indices if they don't already exist
await create_index(es, "trade-index", TRADE_MAPPING)
await create_index(es, "quote-index", QUOTE_MAPPING)
# Connect to Alpaca
stream = Stream(API_KEY,
API_SECRET,
base_url=URL(ALPACA_ENDPOINT),
data_feed='iex') # <- replace with 'sip' if you have Alpaca PRO subscription
# Create partial functions in order to pass arguments into our callback functions
# See this article for more info: https://www.geeksforgeeks.org/partial-functions-python/
partial_ingest_trade_data_callback = partial(ingest_data_callback, es=es, index_name="trade-index")
partial_ingest_quote_data_callback = partial(ingest_data_callback, es=es, index_name="quote-index")
# Subscribe to Alpaca websocket streams using market symbols
#-- (Every time we receive a document from Alpaca, our
# callback functions will parse and ingest to Elasticsearch)
stream.subscribe_trades(partial_ingest_trade_data_callback, "ESTC")
stream.subscribe_quotes(partial_ingest_quote_data_callback, "ESTC")
# Start the data stream from Alpaca
await stream._run_forever()
#-------------------------------------------
# Run Main Function
#-------------------------------------------
try:
asyncio.run(main())
except KeyboardInterrupt:
print('keyboard interrupt, bye')
pass