in projects/ContextRouting/Lambda/Python/ctrProcessor.py [0:0]
def lambda_handler(event, context):
# Create the response container and the tracker container
response = {}
tracker_item = {}
# Setup stats for logging
ctrs_skipped = 0
ctrs_processed = 0
# Process the stream event(s)
for record in event['Records']:
# Decode the data and parse the json
try:
payload = base64.b64decode(record['kinesis']['data'])
payload_data = json.loads(payload)
print(payload_data)
# Log a decode failure and break out of loop
except:
response.update({'fail_code':'decode data fail'})
response.update({'result':'fail'})
break
# Extract the tracker flag, if it exists
try:
active_task_tracker = payload_data['Attributes']['active_task_tracker']
# Log missing tracker and continue
except:
print('No tracker')
ctrs_skipped = ctrs_skipped + 1
continue
# Establish DynamoDB client and table
try:
# Define DB connection and table using the environment variables
dynamodb_client = boto3.resource('dynamodb')
dynamodb_table = dynamodb_client.Table(os.environ['tracker_table'])
# Update the response on fail
except:
response.update({'fail_code':'dynamo init fail'})
response.update({'result':'fail'})
# Check if tracker was disabled. If so, do a quick update to Dynamo to delete the tracker.
if active_task_tracker == '0':
# Increment ctrs_skipped
delete_tracker = dynamodb_table.delete_item(
Key={'caller_id':payload_data['CustomerEndpoint']['Address']}
)
continue
else:
# Pull the attributes and add them to the tracker item
try:
ctr_attributes = payload_data['Attributes']
for k,v in ctr_attributes.items():
tracker_item.update({k : v})
# Log a attribute extraction fail and stop processing
except:
response.update({'fail_code':'Attribute extraction fail'})
response.update({'result':'fail'})
continue
# Update the tracker item with the customer phone number
tracker_item.update({'caller_id':payload_data['CustomerEndpoint']['Address']})
# Define the TTL for the tracker using the environment variable
currEpochTime = round(time.time())
item_ttl = currEpochTime+int(os.environ['tracker_ttl'])
tracker_item.update({'item_ttl':item_ttl})
# Write the tracker item to DynamoDB
try:
tracker_write = dynamodb_table.put_item(
Item = tracker_item
)
response.update({'tracking': 'true'})
# Log a fail to the response
except:
response.update({'tracking': 'false'})
# retuen the response back for this item
return response
# Update the response with stats
response.update({'ctrs_processed':ctrs_processed})
response.update({'ctrs_skipped':ctrs_skipped})
# Print the response for logging & return
print(response)
return response