kinesisresponder/management/kinesis_responder_basecommand.py (42 lines of code) (raw):
# coding: utf-8
from django.core.management.base import BaseCommand
from boto import kinesis, sts
from pprint import pprint
from time import sleep
import logging
import sys
logger = logging.getLogger(__name__)
class KinesisResponderBaseCommand(BaseCommand):
"""
Base class for a Django command to run the responder. Subclass this and:
- set stream_name, role_name and session_name attributes
- override startup_thread to provide an instance of your responder
"""
args = ''
help = 'runs the test kinesis responder'
stream_name = 'stream name to connect to'
role_name = 'ARN of role to use'
session_name = 'session_name'
def startup_thread(self, conn, shardinfo):
"""
Override this method to start up a processing thread. This is called once for every shard in the stream
:param conn: kinesis connection object
:param shardinfo: dictionary of information about the shard, returned from describe_stream
:return: a KinesisResponser subclass instance that will handle the messages for this shard
"""
raise RuntimeError("startup_thread must be implemented in your subclass!")
def handle(self, *args, **options):
if 'aws_access_key_id' in options and 'aws_secret_access_key' in options:
sts_conn = sts.connect_to_region('eu-west-1',
aws_access_key_id=options['aws_access_key_id'],
aws_secret_access_key=options['aws_secret_access_key'])
else:
sts_conn = sts.connect_to_region('eu-west-1')
credentials = sts_conn.assume_role(self.role_name, self.session_name)
conn = kinesis.connect_to_region('eu-west-1', aws_access_key_id=credentials.credentials.access_key,
aws_secret_access_key=credentials.credentials.secret_key,
security_token=credentials.credentials.session_token)
streaminfo = conn.describe_stream(self.stream_name)
threadlist = [self.startup_thread(credentials.credentials, shardinfo) for shardinfo in streaminfo['StreamDescription']['Shards']]
logger.info("Stream {0} has {1} shards".format(self.stream_name,len(threadlist)))
for t in threadlist:
t.daemon = True
t.start()
print("Started up and processing. Hit CTRL-C to stop.", flush=True)
#simplest way to allow ctrl-C when dealing with threads
try:
while True:
sleep(60)
for t in threadlist:
if not t.is_alive():
logger.error("A processing thread failed, exiting responder")
sys.exit(255)
except KeyboardInterrupt:
print("CTRL-C caught, cleaning up", flush=True)