# lambda_handler()
# Defined in lambda-functions/fdLambdaStreamProducer.py

def lambda_handler(event, context):
    """Generate 35 synthetic transaction events and publish them to a Kafka topic.

    Reads the target topic name and MSK cluster ARN from the environment
    (``InputKafkaTopic``, ``mskClusterArn``), resolves the TLS bootstrap
    brokers via the MSK API, then samples rows from the bundled
    ``ref_data.csv`` reference file and enriches each one with randomized
    transaction fields (amount, currency, event id, user agent, category).
    Each record is sent synchronously (one every 1.5 s) and its delivery
    metadata is printed to CloudWatch Logs.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda runtime context (unused).
    """
    # Get the input Kafka topic and the TLS bootstrap brokers for the cluster
    input_kafka_topic = os.environ["InputKafkaTopic"]
    cluster_arn = os.environ["mskClusterArn"]
    response = msk.get_bootstrap_brokers(ClusterArn=cluster_arn)

    # Initialize the producer; values are serialized as UTF-8 JSON
    producer = KafkaProducer(security_protocol="SSL",
                             bootstrap_servers=response["BootstrapBrokerStringTls"],
                             value_serializer=lambda x: json.dumps(x).encode("utf-8"),
                             retry_backoff_ms=500,
                             request_timeout_ms=20000)

    # Read reference data (bundled with the deployment package) used to
    # generate realistic-looking synthetic transactions
    ref_data_path = os.environ['LAMBDA_TASK_ROOT'] + "/ref_data.csv"
    with open(ref_data_path) as csv_file:
        lines = [line for line in csv.reader(csv_file)]

    # Randomly select 35 rows (with replacement)
    chosen_rows = random.choices(lines, k=35)

    # Closed value sets for the randomized fields
    currencies = ['usd', 'gbp', 'egp', 'cad', 'eur']
    product_categories = ['entertainment', 'food_dining', 'gas_transport',
                          'grocery_net', 'grocery_pos', 'health_fitness',
                          'home', 'kids_pets', 'misc_net', 'misc_pos',
                          'personal_care', 'travel', 'shopping_net',
                          'shopping_pos']

    try:
        # Iterate over the chosen rows. For each row generate a transaction
        # event: fixed attributes come from the CSV columns, while amount,
        # currency, event id, user agent and product category are randomized
        # (via random/faker) and the timestamp is captured with datetime.
        for row in chosen_rows:
            data = {}
            data['transaction_amt'] = random.randint(5, 2000)
            data['ip_address'] = row[10]
            data['email_address'] = row[11]
            data['transaction_currency'] = random.choice(currencies)
            data['event_id'] = faker.uuid4()
            data['entity_id'] = row[0]
            data['event_time'] = datetime.now().isoformat(sep='T')
            data['billing_longitude'] = row[8]
            data['billing_state'] = row[5]
            data['user_agent'] = faker.chrome()
            data['billing_street'] = row[3]
            data['billing_city'] = row[4]
            data['card_bin'] = row[1]
            data['customer_name'] = row[2]
            data['product_category'] = random.choice(product_categories)
            data['customer_job'] = row[9]
            data['phone'] = row[12]
            data['billing_latitude'] = row[7]
            data['billing_zip'] = row[6]
            print(data)
            try:
                # Send the generated transaction to the Kafka topic and block
                # until delivery is confirmed (get() waits for the broker ack,
                # so a separate flush() is unnecessary).
                future = producer.send(input_kafka_topic, value=data)
                record_metadata = future.get(timeout=10)
                # Log delivery metadata to CloudWatch
                print("sent event with TS {} to Kafka! topic {} partition {} offset {}".format(
                    data['event_time'], record_metadata.topic,
                    record_metadata.partition, record_metadata.offset))
            except Exception as e:
                # Log and continue with the next record. NOTE: the original
                # called e.with_traceback() with no arguments, which itself
                # raises TypeError and masks the real Kafka error.
                print(repr(e))
            # Sleep 1.5 seconds before sending the next record
            sleep(1.5)
    finally:
        # Release the broker connections even if generation fails mid-loop
        producer.close()