in spark-on-eks/deployment/app_code/job/msk_consumer.py [0:0]
from pyspark.sql.functions import split, current_timestamp


def parse_data_from_kafka_message(sdf, schema):
    # The Kafka 'value' column arrives as a single CSV string per record.
    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"
    col = split(sdf['value'], ',')  # split attributes into a nested array in one column
    # now expand col into multiple top-level columns, cast to the schema's types
    for idx, field in enumerate(schema):
        sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
        if field.name == "timestamp":
            # replace the timestamp attribute with the processing-time timestamp
            sdf = sdf.withColumn(field.name, current_timestamp())
    return sdf.select([field.name for field in schema])
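

# A minimal usage sketch, not part of the original file: the bootstrap server,
# topic name, and CSV column schema below are hypothetical assumptions; only
# parse_data_from_kafka_message comes from this module.
if __name__ == "__main__":
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

    spark = SparkSession.builder.appName("msk_consumer_example").getOrCreate()

    # Hypothetical schema for CSV records shaped like "id,price,timestamp".
    schema = StructType([
        StructField("id", StringType()),
        StructField("price", DoubleType()),
        StructField("timestamp", TimestampType()),
    ])

    # Read from Kafka; the value column is binary, so cast it to a string
    # before handing it to the parser.
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical endpoint
           .option("subscribe", "example-topic")               # hypothetical topic
           .load()
           .selectExpr("CAST(value AS STRING) AS value"))

    parsed = parse_data_from_kafka_message(raw, schema)
    query = parsed.writeStream.format("console").start()
    query.awaitTermination()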