def parse_data_from_kafka_message()

in spark-on-eks/deployment/app_code/job/msk_consumer.py


from pyspark.sql.functions import split, current_timestamp

def parse_data_from_kafka_message(sdf, schema):
    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"
    # Split the comma-separated 'value' string into a nested array held in a single column
    col = split(sdf['value'], ',')
    # Expand the array into one top-level column per schema field, cast to its declared type
    for idx, field in enumerate(schema):
        sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
        # Overwrite the parsed timestamp field with the current processing time
        if field.name == "timestamp":
            sdf = sdf.withColumn(field.name, current_timestamp())
    return sdf.select([field.name for field in schema])
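
For context, below is a minimal sketch of how this helper could be wired into a Structured Streaming job reading from an MSK/Kafka topic. The broker address, topic name, and schema fields are hypothetical placeholders rather than values taken from the repository; only parse_data_from_kafka_message itself comes from msk_consumer.py.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("msk-consumer-example").getOrCreate()

# Declared schema of the comma-separated payload (fields are illustrative only);
# the "timestamp" field is replaced with processing time inside the helper.
schema = StructType([
    StructField("id", StringType()),
    StructField("price", DoubleType()),
    StructField("timestamp", TimestampType()),
])

# Read the raw Kafka stream and cast the binary 'value' payload to a string column.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker1:9092")  # hypothetical brokers
       .option("subscribe", "stock-ticks")                  # hypothetical topic
       .load()
       .selectExpr("CAST(value AS STRING) AS value"))

parsed = parse_data_from_kafka_message(raw, schema)

# Write the parsed columns to the console for quick inspection.
query = parsed.writeStream.outputMode("append").format("console").start()
query.awaitTermination()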