def lambda_handler()

in lambdas/stream_station_review_to_s3/index.py [0:0]


def lambda_handler(event, context):
    record_count = 0
    for record in event['Records']:
      try:
        # Get the primary key of the station
        station_id = record['dynamodb']['Keys']['station_id']['N']
        if record['eventName'] != 'REMOVE':
          new_image = record['dynamodb']['NewImage']

          new_dt = datetime.strptime(new_image['create_date']['S'], '%Y-%m-%d %H:%M:%S.%f')
          new_image['create_date'] = {'N': int(new_dt.timestamp())}

          sentiment = comprehend_client.detect_sentiment(
            Text=new_image['review']['S']
            ,LanguageCode='en'
          )

          new_image['sentiment'] = {'S': sentiment['Sentiment']}
          new_image['sentiment_mixed'] = {'N': sentiment['SentimentScore']['Mixed']}
          new_image['sentiment_neutral'] = {'N': sentiment['SentimentScore']['Neutral']}
          new_image['sentiment_positive'] = {'N': sentiment['SentimentScore']['Positive']}
          new_image['sentiment_negative'] = {'N': sentiment['SentimentScore']['Negative']}

          deserializer = TypeDeserializer()
          payload = json.dumps({k: deserializer.deserialize(v) for k, v in new_image.items()}, cls=DecimalEncoder)
      
          response = client.put_record(
            DeliveryStreamName = FIREHOSE_STREAM_NAME,
            Record = { 'Data': payload + '\r\n' }
          )

          print('[DEBUG] Processed station_id: {}. Request response: {}'.format(station_id, payload))
          record_count += 1
      except Exception:
        pass # JSON encoder to Decimal sometimes throws exception with inexact rounding. Skip these. Fix later.

    print("[INFO] Processed {} records.".format(str(record_count)))