def lambda_handler()

in lambda-functions/kfpLambdaStreamProducer.py [0:0]


def lambda_handler(event, context):
    cluster_arn = os.environ["mskClusterArn"]
    response = msk.get_bootstrap_brokers(
        ClusterArn=cluster_arn
    )
    producer = KafkaProducer(security_protocol="PLAINTEXT",
                             bootstrap_servers=response["BootstrapBrokerString"],
                             value_serializer=lambda x: x.encode("utf-8"))
    for _ in range(1, 100):
        data = json.dumps({
            "sensor_id": str(random.randint(1, 5)),
            "temperature": random.randint(27, 32),
            "event_time": datetime.datetime.now().isoformat()
        })
        producer.send(os.environ["topicName"], value=data)
        time.sleep(1)