in bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala [64:104]
/**
 * Parses raw comma-separated transaction records into structured ETL tuples.
 *
 * Each input line is expected to contain, in order: store id, store zipcode,
 * store city, store state, customer id, first name, last name, customer
 * zipcode, customer city, customer state, transaction id, transaction date
 * (format "EEE MMM dd kk:mm:ss z yyyy", US locale), and product string.
 *
 * @param rawRecords raw CSV lines, one transaction per line
 * @return an RDD of (store, store location, customer, customer location,
 *         transaction) tuples
 */
def parseRawData(rawRecords: RDD[String]):
RDD[(Store, Location, Customer, Location, TransactionProduct)] = {
// Use mapPartitions so the (expensive, non-thread-safe) SimpleDateFormat is
// built once per partition instead of once per record. Each partition runs
// in a single task thread, so sharing the formatter within it is safe.
rawRecords.mapPartitions { records =>
val df = new SimpleDateFormat("EEE MMM dd kk:mm:ss z yyyy", Locale.US)
records.map { r =>
val cols = r.split(",")
val storeId = cols(0).toInt
val storeZipcode = cols(1)
val storeCity = cols(2)
val storeState = cols(3)
val storeLocation = Location(storeZipcode, storeCity, storeState)
val store = Store(storeId, storeZipcode)
val customerId = cols(4).toInt
val firstName = cols(5)
val lastName = cols(6)
val customerZipcode = cols(7)
val customerCity = cols(8)
val customerState = cols(9)
val customerLocation = Location(customerZipcode, customerCity,
customerState)
val customer = Customer(customerId, firstName, lastName,
customerZipcode)
val txId = cols(10).toInt
val txDate = df.parse(cols(11))
// A fresh Calendar per record: each TransactionProduct keeps its own
// mutable Calendar, so instances must not be shared across records.
val txCal = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
txCal.setTime(txDate)
// Source timestamps carry no millisecond component; zero it so equal
// timestamps compare equal after round-tripping.
txCal.set(Calendar.MILLISECOND, 0)
val txProduct = cols(12)
val transaction = TransactionProduct(customerId, txId,
storeId, txCal, txProduct)
(store, storeLocation, customer, customerLocation, transaction)
}
}
}