in bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala [119:197]
/**
 * Generates the synthetic BigPetStore data set as an RDD of transactions.
 *
 * Stores and customers are generated on the driver. The customers are then
 * distributed with `sc.parallelize`, and each partition simulates purchases for
 * its customers using its own RNG. That RNG is seeded with the next driver seed
 * XOR'ed with the partition index. Stores and product categories are shipped to
 * the executors as broadcast variables.
 *
 * The returned RDD is cached and has already been counted once. Later actions
 * by the caller (e.g. writing it out) therefore reuse the computed partitions
 * instead of re-running the whole simulation. Any partitions evicted from
 * memory are recomputed by Spark.
 *
 * @param sc the Spark context used to broadcast shared data and create RDDs
 * @return all simulated transactions whose time is strictly greater than
 *         `BURNIN_TIME` and strictly less than the simulation length
 */
def generateData(sc: SparkContext): RDD[Transaction] = {
  val inputData = new DataLoader().loadData()
  val seedFactory = new SeedFactory(seed)

  println("Generating stores...")
  // Kept as a java.util.ArrayList because it is handed to the Java generators below.
  val stores: ArrayList[Store] = new ArrayList()
  val storeGenerator = new StoreGenerator(inputData, seedFactory)
  (1 to nStores).foreach(_ => stores.add(storeGenerator.generate()))
  println("Done.")

  println("Generating customers...")
  val custGen = new CustGen(inputData, stores, seedFactory)
  // Prepending gives a newest-first list. That order decides which partition each
  // customer lands in, and therefore which per-partition RNG stream it draws from.
  // It is kept as-is so a given seed keeps producing the same output.
  val customers: List[Customer] =
    (1 to nCustomers).foldLeft(List.empty[Customer])((acc, _) => custGen.generate() :: acc)
  println("...Done generating customers.")

  println("Broadcasting stores and products...")
  val storesBC = sc.broadcast(stores)
  val productBC = sc.broadcast(inputData.getProductCategories())
  val customerRDD = sc.parallelize(customers)
  // Copy driver-side values into locals so the closure below serializes plain values.
  val simLen = simulationLength
  // Drawn after customer generation: the driver seed factory is shared with CustGen.
  val nextSeed = seedFactory.getNextSeed()
  println("...Done broadcasting stores and products.")

  println("Defining transaction DAG...")
  val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) =>
    // One RNG per partition, derived from the driver seed and the partition index,
    // shared (in iteration order) by every customer of that partition.
    val partitionSeeds = new SeedFactory(nextSeed ^ index)
    custIter.flatMap { customer =>
      val products = productBC.value
      // Each customer gets a fresh purchasing profile.
      val profile = new PurchasingProfileGenerator(products, partitionSeeds).generate()
      val transGen =
        new TransactionGenerator(customer, profile, storesBC.value, products, partitionSeeds)
      // Generate transactions until one reaches the end of the simulation window.
      // Keep only those after the burn-in period, newest first.
      var transactions: List[Transaction] = List()
      var transaction = transGen.generate()
      while (transaction.getDateTime() < simLen) {
        if (transaction.getDateTime() > BURNIN_TIME) {
          transactions = transaction :: transactions
        }
        transaction = transGen.generate()
      }
      transactions
    }
  }
  println("...Done defining transaction DAG.")

  println("Generating transactions...")
  // count() is the action that actually runs the simulation. Persist first so the
  // computed partitions are kept for the caller's subsequent actions; without this
  // the whole DAG would be recomputed by the next action.
  transactionRDD.cache()
  val nTrans = transactionRDD.count()
  println(s"... Done Generating $nTrans transactions.")

  transactionRDD
}