def write_to_kinesis()

in next_steps/kinesis_stream_connector/data_generator/src/synth_live_data_csv.py [0:0]


def write_to_kinesis(synthdata, stream_name):
    # generate KDS partition key randomly
    part_key = str(math.floor(random.random()*(10000000000)))
    records = []
    # structure records for KDS
    for row in synthdata:
        records.append({"PartitionKey": part_key, "Data": row + "\n"})

    # check if stream exists before writing records to it.
    try:
        logger.info('Checking if stream '+stream_name+' exists')
        resp = kinesis_client.describe_stream(StreamName=stream_name)
    except ClientError:
        logger.error('Error locating stream '+stream_name, exc_info=1)
        raise
    else:
        pass

    # write records to stream using batch put
    try:
        logger.info('Writing records to '+stream_name+' with partition key '+part_key)
        response = kinesis_client.put_records(StreamName=stream_name, Records=records)
    except ClientError:
        logger.error('Error writing records to '+stream_name, exc_info=1)
        raise 
    else:
        logger.debug('## Records written to Kinesis Data stream:')
        logger.debug(records)
        return (response)