def apply()

in wayang-benchmark/code/main/scala/org/apache/wayang/apps/tpch/queries/Query3Database.scala [64:168]


  def apply(configuration: Configuration,
            jdbcPlatform: JdbcPlatformTemplate,
            createTableSource: (String, Seq[String]) => JdbcTableSource,
            segment: String = "BUILDING",
            date: String = "1995-03-15")
           (implicit experiment: Experiment) = {

    val wayangCtx = new WayangContext(configuration)
    plugins.foreach(wayangCtx.register)
    val planBuilder = new PlanBuilder(wayangCtx)
      .withJobName(s"TPC-H (${this.getClass.getSimpleName})")
      .withUdfJarsOf(classOf[Query3Database])
      .withExperiment(experiment)

    val schema = configuration.getOptionalStringProperty("wayang.apps.tpch.schema").orElse(null)
    def withSchema(table: String) = schema match {
      case null => table
      case str: String => s"$str.$table"
    }

    experiment.getSubject.addConfiguration("jdbcUrl", configuration.getStringProperty(jdbcPlatform.jdbcUrlProperty))
    if (schema != null) experiment.getSubject.addConfiguration("schema", schema)
    experiment.getSubject.addConfiguration("segment", segment)
    experiment.getSubject.addConfiguration("date", date)

    // Read, filter, and project the customer data.
    val _segment = segment
    val customerKeys = planBuilder
      .readTable(createTableSource(withSchema("CUSTOMER"), Customer.fields))
      .withName("Load CUSTOMER table")

      .filter(_.getString(6) == _segment, sqlUdf = s"c_mktsegment LIKE '$segment%'", selectivity = .25)
      .withName("Filter customers")

      .projectRecords(Seq("c_custkey"))
      .withName("Project customers")

      .map(_.getLong(0))
      .withName("Extract customer ID")

    // Read, filter, and project the order data.
    val _date = CsvUtils.parseDate(date)
    val orders = planBuilder
      .load(createTableSource(withSchema("ORDERS"), Order.fields))
      .withName("Load ORDERS table")

      .filter(t => CsvUtils.parseDate(t.getString(4)) > _date, sqlUdf = s"o_orderdate < date('$date')")
      .withName("Filter orders")

      .projectRecords(Seq("o_orderkey", "o_custkey", "o_orderdate", "o_shippriority"))
      .withName("Project orders")

      .map(order => (order.getLong(0), // orderKey
        order.getLong(1), // custKey
        CsvUtils.parseDate(order.getString(2)), // orderDate
        order.getInt(3)) // shipPriority
      )
      .withName("Unpack orders")

    // Read, filter, and project the line item data.
    val lineItems = planBuilder
      .readTable(createTableSource(withSchema("LINEITEM"), LineItem.fields))
      .withName("Load LINEITEM table")

      .filter(t => CsvUtils.parseDate(t.getString(10)) > _date, sqlUdf = s"l_shipDate > date('$date')")
      .withName("Filter line items")

      .projectRecords(Seq("l_orderkey", "l_extendedprice", "l_discount"))
      .withName("Project line items")

      .map(li => (
        li.getLong(0), //li.orderKey,
        li.getDouble(1) * (1 - li.getDouble(2)) //li.extendedPrice * (1 - li.discount)
      ))
      .withName("Extract line item data")

    // Join and aggregate the different datasets.
    customerKeys
      .join[(Long, Long, Int, Int), Long](identity, orders, _._2)
      .withName("Join customers with orders")

      .map(_.field1) // (orderKey, custKey, orderDate, shipPriority)
      .withName("Project customer-order join product")

      .join[(Long, Double), Long](_._1, lineItems, _._1)
      .withName("Join CO with line items")

      .map(coli => Query3Result(
        orderKey = coli.field1._1,
        revenue = coli.field1._2,
        orderDate = coli.field0._3,
        shipPriority = coli.field0._4
      ))
      .withName("Project CO-line-item join product")

      .reduceByKey(
        t => (t.orderKey, t.orderDate, t.shipPriority),
        (t1, t2) => {
          t1.revenue += t2.revenue;
          t2
        }
      )
      .withName("Aggregate revenue")
      .collect()
  }