PluginsAndFeatures/azure-toolkit-for-eclipse/com.microsoft.azuretools.hdinsight/resources/hdinsight/templates/scala/LogQuery.scala (49 lines of code) (raw):

/**
 * Copyright (c) Microsoft Corporation
 *
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files
 * (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge,
 * publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
 * ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH
 * THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

//package org.apache.spark.examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
 * Executes a roll up-style query against Apache logs.
 *
 * Log lines are grouped by (client IP, user, request line). For each group, the number of
 * requests and the total number of response bytes are printed. Lines that do not match the
 * expected format, or whose user field is `"-"`, are grouped under a `(null, null, null)` key.
 *
 * Usage: LogQuery [logFile]
 *   - with an argument, log lines are read from that path via `SparkContext.textFile`;
 *   - without arguments, the bundled sample lines in [[exampleApacheLogs]] are used.
 *
 * This is adapted from Apache Spark GitHub: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
 */
object LogQuery {

  /**
   * Sample log lines used when no input file is given.
   *
   * Each literal is wrapped across several source lines for readability. `stripMargin` removes
   * the `|` margins, and the line breaks are then dropped so each element is one log line.
   * `linesIterator` is used rather than `lines` because on JDK 11+ `"...".lines` resolves to
   * `java.lang.String.lines()`, which returns a Java Stream that has no `mkString`.
   */
  val exampleApacheLogs: List[String] = List(
    """10.10.10.10 - "FRED" [18/Jan/2013:17:56:07 +1100] "GET http://images.com/2013/Generic.jpg
      | HTTP/1.1" 304 315 "http://referall.com/" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
      | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
      | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
      | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
      | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.linesIterator.mkString,
    """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
      | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
      | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
      | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
      | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
      | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.linesIterator.mkString
  )

  /**
   * Runs the roll-up query and prints one `key<TAB>stats` line per group to stdout.
   *
   * @param args optional; if present, `args(0)` is the path of the log file to analyse.
   */
  def main(args: Array[String]): Unit = {
    // Fall back to a local master only when none was supplied (for example by spark-submit or
    // an IDE run configuration). An unconditional setMaster would override the cluster master
    // and silently run the job in local mode even when it is submitted to a cluster.
    val sparkConf = new SparkConf()
      .setAppName("Log Query")
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(sparkConf)

    try {
      val dataSet = args.headOption match {
        case Some(logFile) => sc.textFile(logFile)
        case None => sc.parallelize(exampleApacheLogs)
      }

      // scalastyle:off
      val apacheLogRegex =
        """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
      // scalastyle:on

      /**
       * Tracks the total query count and number of aggregate bytes for a particular group.
       * Both counters are Long because byte totals over a real log can exceed Int.MaxValue.
       */
      class Stats(val count: Long, val numBytes: Long) extends Serializable {
        def merge(other: Stats): Stats = new Stats(count + other.count, numBytes + other.numBytes)
        override def toString: String = "bytes=%s\tn=%s".format(numBytes, count)
      }

      // Group key shared by unparseable lines and anonymous requests.
      val unknownKey: (String, String, String) = (null, null, null)

      // The regex allows "-" in the bytes field (no body sent); count it as 0 instead of
      // failing the whole job with a NumberFormatException.
      def parseBytes(field: String): Long = scala.util.Try(field.toLong).getOrElse(0L)

      /** Parses a log line once, returning its (ip, user, query) group key and its Stats. */
      def extractKeyAndStats(line: String): ((String, String, String), Stats) =
        apacheLogRegex.findFirstIn(line) match {
          case Some(apacheLogRegex(ip, _, user, _, query, _, bytes, _, _)) =>
            // The log format quotes the user field, so an anonymous user appears as "-" in quotes.
            val key = if (user != "\"-\"") (ip, user, query) else unknownKey
            (key, new Stats(1L, parseBytes(bytes)))
          case _ => (unknownKey, new Stats(1L, 0L))
        }

      dataSet
        .map(line => extractKeyAndStats(line))
        .reduceByKey((a, b) => a.merge(b))
        .collect()
        .foreach { case (key, stats) => println("%s\t%s".format(key, stats)) }
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}