def generateData()

in bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala [119:197]


  /**
   * Builds the stores and customers locally, then defines and runs the Spark
   * job that expands every customer into the transactions that customer makes.
   *
   * Stores and product categories reach the tasks through broadcast variables;
   * the customers are distributed with `parallelize`. Every partition creates
   * its own SeedFactory from a single seed (drawn from the main seed factory
   * after the customers were generated) XOR-ed with the partition index.
   *
   * NOTE(review): the RDD is counted here but never cached or persisted in this
   * method, so a later action on the returned RDD presumably regenerates the
   * transactions -- confirm that this is intended.
   *
   * @param sc the SparkContext used to broadcast and parallelize the inputs
   * @return the RDD of all transactions that lie after the burn-in time and
   *         before the end of the simulation
   */
  def generateData(sc: SparkContext): RDD[Transaction] = {
    val inputData = new DataLoader().loadData()
    val seedFactory = new SeedFactory(seed)

    println("Generating stores...")
    // Kept as a java.util.ArrayList: it is handed as-is to CustGen and to the
    // TransactionGenerator (through the broadcast).
    val stores: ArrayList[Store] = new ArrayList()
    val storeGenerator = new StoreGenerator(inputData, seedFactory)
    (1 to nStores).foreach { _ => stores.add(storeGenerator.generate()) }
    println("Done.")

    println("Generating customers...")
    val custGen = new CustGen(inputData, stores, seedFactory)
    // Prepending leaves the most recently generated customer at the head.
    val customers: List[Customer] =
      (1 to nCustomers).foldLeft(List.empty[Customer]) { (generated, _) =>
        custGen.generate() :: generated
      }
    println("...Done generating customers.")

    println("Broadcasting stores and products...")
    val storesBC = sc.broadcast(stores)
    val productBC = sc.broadcast(inputData.getProductCategories())
    val customerRDD = sc.parallelize(customers)
    // Local copies of the values the task closure below reads
    // (presumably so the closure does not have to capture the enclosing object).
    val simLen = simulationLength
    val nextSeed = seedFactory.getNextSeed()
    println("...Done broadcasting stores and products.")

    println("Defining transaction DAG...")

    // One task per partition: each customer of the partition is expanded into
    // the list of transactions generated for that customer.
    val transactionRDD = customerRDD.mapPartitionsWithIndex { (partitionIndex, partitionCustomers) =>
      // A separate seed factory per partition, derived from the partition index.
      val partitionSeeds = new SeedFactory(nextSeed ^ partitionIndex)

      partitionCustomers.flatMap { customer =>
        val products = productBC.value
        // Every customer gets a freshly generated purchasing profile.
        val profile = new PurchasingProfileGenerator(products, partitionSeeds).generate()
        val transGen =
          new TransactionGenerator(customer, profile, storesBC.value, products, partitionSeeds)

        // Keep generating until a transaction reaches the end of the simulation
        // (that one is discarded); transactions at or before BURNIN_TIME are
        // dropped. Prepending leaves the latest kept transaction at the head.
        Iterator.continually(transGen.generate())
          .takeWhile(_.getDateTime() < simLen)
          .filter(_.getDateTime() > BURNIN_TIME)
          .foldLeft(List.empty[Transaction]) { (kept, transaction) => transaction :: kept }
      }
    }

    println("...Done defining transaction DAG.")

    println("Generating transactions...")

    // count() is an action, so this is where the job defined above actually runs.
    val nTrans = transactionRDD.count()
    println(s"... Done Generating $nTrans transactions.")

    // The distributed collection of pet store transactions: each element is one
    // visit of a customer to a store. The caller can go on to write it out.
    transactionRDD
  }