in bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala [106:177]
  def normalizeData(rawRecords: RDD[(Store, Location, Customer,
    Location, TransactionProduct)]): (RDD[Location], RDD[Store],
      RDD[Customer], RDD[Product], RDD[Transaction]) = {

    // extract stores
    val storeRDD = rawRecords.map {
      case (store, _, _, _, _) => store
    }.distinct()

    // extract store locations
    val storeLocationRDD = rawRecords.map {
      case (_, location, _, _, _) => location
    }.distinct()

    // extract customers
    val customerRDD = rawRecords.map {
      case (_, _, customer, _, _) => customer
    }.distinct()

    // extract customer locations
    val customerLocationRDD = rawRecords.map {
      case (_, _, _, location, _) => location
    }.distinct()
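
    // note: each of the four distinct() calls above incurs a shuffle; the
    // extractions are independent full scans of rawRecords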

    // extract and normalize products
    val productStringRDD = rawRecords.map {
      case (_, _, _, _, tx) => tx.product
    }.distinct().zipWithUniqueId()
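    // zipWithUniqueId() pairs each distinct product string with a unique Long
    // id without launching an extra Spark job (ids are unique, not consecutive)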

    val productRDD = productStringRDD.map {
      case (productString, id) =>
        // products are key-value pairs of the form:
        // key=value;key=value;
        val prodKV = productString
          .split(";")
          .filter(_.trim().nonEmpty)
          .map { pair =>
            val pairString = pair.split("=")
            (pairString(0), pairString(1))
          }
          .toMap
        Product(id, prodKV("category"), prodKV)
    }
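    // e.g. an input string like "category=dog food;brand=Happy Pup;size=30.0;"
    // (illustrative values, not from the generator) parses to
    // Map("category" -> "dog food", "brand" -> "Happy Pup", "size" -> "30.0")
    // and becomes Product(id, "dog food", prodKV)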

    // extract transactions, map products to productIds
    val productTransactionRDD = rawRecords.map {
      case (_, _, _, _, tx) => (tx.product, tx)
    }
    val joinedRDD: RDD[(String, (TransactionProduct, Long))] =
      productTransactionRDD.join(productStringRDD)
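    // the join key is the raw product string, so each TransactionProduct is
    // paired with the Long id of its normalized Product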
    val transactionRDD = joinedRDD.map {
      case (_, (tx, productId)) =>
        Transaction(tx.customerId, tx.transactionId,
          tx.storeId, tx.dateTime, productId)
    }

    // merge store and customer locations into a single deduplicated location set
    val locationRDD = storeLocationRDD
      .union(customerLocationRDD)
      .distinct()

    (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
  }
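
  // Usage sketch (hypothetical caller; parseRawData and its arguments are
  // assumptions for illustration, not part of this excerpt):
  //
  //   val rawRecords: RDD[(Store, Location, Customer, Location, TransactionProduct)] =
  //     parseRawData(sc, inputDir)
  //   val (locations, stores, customers, products, transactions) =
  //     normalizeData(rawRecords)
  //   println(s"normalized ${transactions.count()} transactions over " +
  //     s"${products.count()} products")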