def normalizeData()

in bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala [106:177]


  /**
   * Splits denormalized transaction records into normalized entity RDDs.
   *
   * Each input record is a tuple of (store, store location, customer,
   * customer location, transaction). Stores, customers, and locations are
   * de-duplicated with `distinct()`, and store and customer locations are
   * merged into a single location RDD. Each distinct product string is parsed
   * into a [[Product]] and given a numeric id. Every transaction is rewritten
   * to reference its product by that id.
   *
   * Product ids are assigned by sorting the distinct product strings and
   * applying `zipWithIndex()`. Each id is therefore the product's rank in
   * lexicographic order (0 until n), and it is the same every time the
   * lineage is re-evaluated.
   *
   * This matters because `productRDD` and `transactionRDD` each evaluate the
   * id assignment separately. Ids assigned over an unordered shuffle output
   * (e.g. `distinct().zipWithUniqueId()`) can differ between evaluations,
   * which would leave transactions pointing at the wrong product.
   *
   * Note: per Spark's RDD API, `sortBy` (range-partitioner sampling) and
   * `zipWithIndex` (on multi-partition input) launch small Spark jobs when
   * this method is called.
   *
   * @param rawRecords denormalized records as (store, store location,
   *                   customer, customer location, transaction product)
   * @return (locations, stores, customers, products, transactions)
   * @throws IllegalArgumentException when the returned product RDD is
   *         evaluated, if a product attribute segment contains no `=` or a
   *         product has no `category` attribute
   */
  def normalizeData(rawRecords: RDD[(Store, Location, Customer,
    Location, TransactionProduct)]): (RDD[Location], RDD[Store],
      RDD[Customer], RDD[Product], RDD[Transaction]) = {
    // extract stores
    val storeRDD = rawRecords.map {
      case (store, _, _, _, _) =>
        store
    }.distinct()

    // extract store locations
    val storeLocationRDD = rawRecords.map {
      case (_, location, _, _, _) =>
        location
    }.distinct()

    // extract customers
    val customerRDD = rawRecords.map {
      case (_, _, customer, _, _) =>
        customer
    }.distinct()

    // extract customer locations
    val customerLocationRDD = rawRecords.map {
      case (_, _, _, location, _) =>
        location
    }.distinct()

    // Extract the distinct product strings and assign each one a stable id.
    // The output order of distinct() is not guaranteed, so sort first. With a
    // total order, zipWithIndex() assigns each product its global rank, and
    // productRDD and the transaction join below agree on every id.
    val productStringRDD: RDD[(String, Long)] = rawRecords.map {
      case (_, _, _, _, tx) =>
        tx.product
    }
    .distinct()
    .sortBy(productString => productString)
    .zipWithIndex()

    val productRDD = productStringRDD.map {
      case (productString, id) =>
        // products are key-value pairs of the form:
        // key=value;key=value;
        // Split each pair on the first '=' only. Values containing '=' are
        // kept whole, and "key=" yields an empty value instead of an
        // out-of-bounds error.
        val prodKV = productString
          .split(";")
          .filter(_.trim().length > 0)
          .map { pair =>
            pair.split("=", 2) match {
              case Array(key, value) => (key, value)
              case _ =>
                throw new IllegalArgumentException(
                  s"Malformed product attribute '$pair' in product '$productString'")
            }
          }
          .toMap

        val category = prodKV.getOrElse("category",
          throw new IllegalArgumentException(
            s"Product '$productString' has no category attribute"))

        Product(id, category, prodKV)
    }

    // extract transactions, map products to productIds
    val productTransactionRDD = rawRecords.map {
      case (_, _, _, _, tx) =>
        (tx.product, tx)
    }

    // Inner join: every transaction's product string is present in
    // productStringRDD, because both are derived from rawRecords.
    val joinedRDD: RDD[(String, (TransactionProduct, Long))]
      = productTransactionRDD.join(productStringRDD)

    val transactionRDD = joinedRDD.map {
      case (_, (tx, productId)) =>
        Transaction(tx.customerId, tx.transactionId,
          tx.storeId, tx.dateTime, productId)
    }

    // store and customer locations share one normalized location table
    val locationRDD = storeLocationRDD.
      union(customerLocationRDD).
      distinct()

    (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
  }