in next_steps/kinesis_stream_connector/data_generator/src/synth_live_data_csv.py [0:0]
def write_to_kinesis(synthdata, stream_name):
    """Write a batch of rows to a Kinesis Data Stream with one put_records call.

    Every row in the batch shares a single, randomly generated partition key.

    Args:
        synthdata: Iterable of rows. Each row is concatenated with "\\n", so
            rows are expected to be strings.
        stream_name: Name of the target Kinesis Data Stream.

    Returns:
        The response returned by ``kinesis_client.put_records``.

    Raises:
        ClientError: Re-raised if describing the stream or writing the
            records fails.
    """
    # generate KDS partition key randomly (one key for the whole batch)
    part_key = str(math.floor(random.random() * 10000000000))

    # structure records for KDS; newline-terminate each row
    records = [
        {"PartitionKey": part_key, "Data": row + "\n"} for row in synthdata
    ]

    # check if stream exists before writing records to it.
    try:
        logger.info('Checking if stream %s exists', stream_name)
        kinesis_client.describe_stream(StreamName=stream_name)
    except ClientError:
        logger.exception('Error locating stream %s', stream_name)
        raise

    # write records to stream using batch put
    # NOTE(review): the AWS PutRecords API documents a limit of 500 records
    # per request; confirm callers never pass larger batches.
    try:
        logger.info(
            'Writing records to %s with partition key %s',
            stream_name,
            part_key,
        )
        response = kinesis_client.put_records(
            StreamName=stream_name, Records=records
        )
    except ClientError:
        logger.exception('Error writing records to %s', stream_name)
        raise

    # put_records does not raise on partial failure; rejected records are
    # only reported through FailedRecordCount in the response.
    failed_count = response.get('FailedRecordCount', 0)
    if failed_count:
        logger.error(
            '%s of %s records failed to write to %s',
            failed_count,
            len(records),
            stream_name,
        )

    logger.debug('## Records written to Kinesis Data stream:')
    logger.debug(records)
    return response