import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

object PeopleDataStatistics2 {
  private val schemaString = "id,gender,height"

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: PeopleDataStatistics2 filePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: People Data Statistics 2")
    val sc = new SparkContext(conf)
    val peopleDataRDD = sc.textFile(args(0))
    val sqlCtx = new SQLContext(sc)
    // This is used to implicitly convert an RDD to a DataFrame.
    import sqlCtx.implicits._

    // All fields are kept as strings here; the numeric comparisons in the
    // SQL below rely on Spark SQL's implicit casts.
    val schemaArray = schemaString.split(",")
    val schema = StructType(schemaArray.map(fieldName =>
      StructField(fieldName, StringType, true)))
    val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map(
      eachRow => Row(eachRow(0), eachRow(1), eachRow(2)))
    val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
    peopleDF.registerTempTable("people")

    // Get the men whose height is more than 180
    val higherMale180 = sqlCtx.sql(
      "select id, gender, height from people where height > 180 and gender = 'M'")
    println("Men whose height is more than 180: " + higherMale180.count())
    println("<Display #1>")

    // Get the women whose height is more than 170
    val higherFemale170 = sqlCtx.sql(
      "select id, gender, height from people where height > 170 and gender = 'F'")
    println("Women whose height is more than 170: " + higherFemale170.count())
    println("<Display #2>")

    // Group the people by gender and count each group
    peopleDF.groupBy(peopleDF("gender")).count().show()
    println("People count grouped by gender")
    println("<Display #3>")

    // Men whose height is more than 210
    peopleDF.filter(peopleDF("gender").equalTo("M")).filter(
      peopleDF("height") > 210).show(50)
    println("Men whose height is more than 210")
    println("<Display #4>")

    // Sort the people by height in descending order and show the top 50
    peopleDF.sort($"height".desc).take(50).foreach {
      row => println(row(0) + "," + row(1) + "," + row(2))
    }
    println("Sorted the people by height in descending order, showing the top 50")
    println("<Display #5>")

    // The average height for men
    peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg")).show()
    println("The average height for men")
    println("<Display #6>")

    // The max height for women
    peopleDF.filter(peopleDF("gender").equalTo("F")).agg("height" -> "max").show()
    println("The max height for women:")
    println("<Display #7>")
    //......
    println("All the statistics actions are finished on structured People data.")
  }
}
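PeopleDataStatistics2 expects a plain text file with one person per line and three space-separated fields: an ID, a gender flag ("M" or "F"), and a height. A generator for this file is not shown here, so the following is a minimal sketch in the same style as the UserDataGenerator below; the object name, output path, record count, and height range are all assumptions:

import java.io.FileWriter
import scala.util.Random

// Hypothetical generator for the people data file consumed by PeopleDataStatistics2.
object PeopleDataGenerator {
  def main(args: Array[String]): Unit = {
    val writer = new FileWriter("C:\\LOCAL_DISK_D\\sample_people_data.txt", true)
    val rand = new Random()
    try {
      for (i <- 1 to 10000000) {
        val gender = if (rand.nextBoolean()) "M" else "F"
        // Assumed height range: 100 - 219 cm
        val height = rand.nextInt(120) + 100
        writer.write(i + " " + gender + " " + height)
        writer.write(System.getProperty("line.separator"))
      }
      writer.flush()
    } finally {
      writer.close()
    }
    println("People Data File generated successfully.")
  }
}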
import java.io.{File, FileWriter}
import scala.util.Random

object UserDataGenerator {
  private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_user_data.txt"
  private val ROLE_ID_ARRAY = Array[String]("ROLE001", "ROLE002", "ROLE003", "ROLE004", "ROLE005")
  private val REGION_ID_ARRAY = Array[String]("REG001", "REG002", "REG003", "REG004", "REG005")
  private val MAX_USER_AGE = 60
  // How many records to generate
  private val MAX_RECORDS = 10000000

  def main(args: Array[String]): Unit = {
    generateDataFile(FILE_PATH, MAX_RECORDS)
  }
  private def generateDataFile(filePath: String, recordNum: Int): Unit = {
    var writer: FileWriter = null
    try {
      writer = new FileWriter(filePath, true)
      val rand = new Random()
      for (i <- 1 to recordNum) {
        // Generate the gender of the user
        var gender = getRandomGender
        var age = rand.nextInt(MAX_USER_AGE)
        if (age < 10) {
          age = age + 10
        }
        // Generate the registering date for the user
        var year = rand.nextInt(16) + 2000
        var month = rand.nextInt(12) + 1
        // To avoid checking whether it is a valid day for a specific month,
        // we always generate a day which is no more than 28
        var day = rand.nextInt(28) + 1
        var registerDate = year + "-" + month + "-" + day
        // Generate the role of the user
        var roleIndex: Int = rand.nextInt(ROLE_ID_ARRAY.length)
        var role = ROLE_ID_ARRAY(roleIndex)
        // Generate the region where the user lives
        var regionIndex: Int = rand.nextInt(REGION_ID_ARRAY.length)
        var region = REGION_ID_ARRAY(regionIndex)
        // Field order matches the User case class used by the statistics job
        writer.write(i + " " + gender + " " + age + " " + registerDate +
          " " + role + " " + region)
        writer.write(System.getProperty("line.separator"))
      }
      writer.flush()
    } catch {
      case e: Exception => println("Error occurred:" + e)
    } finally {
      if (writer != null) writer.close()
    }
    println("User Data File generated successfully.")
  }

  // Reconstructed helper (assumed): the original listing references
  // getRandomGender but its definition was cut off; a 50/50 choice
  // between "M" and "F" is assumed here.
  private def getRandomGender: String =
    if (Random.nextBoolean()) "M" else "F"
}
import java.io.{File, FileWriter}
import scala.util.Random

object ConsumingDataGenerator {
  private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_consuming_data.txt"
  // How many records to generate
  private val MAX_RECORDS = 100000000
  // We suppose there are only 10 kinds of products in the consuming data
  private val PRODUCT_ID_ARRAY = Array[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  // We suppose the price of the most expensive product will not exceed 2000 RMB
  private val MAX_PRICE = 2000
  // We suppose the price of the cheapest product will not be lower than 10 RMB
  private val MIN_PRICE = 10
  // The number of users, which should be the same as in the UserDataGenerator object
  private val USERS_NUM = 10000000
  def main(args: Array[String]): Unit = {
    generateDataFile(FILE_PATH, MAX_RECORDS)
  }
  private def generateDataFile(filePath: String, recordNum: Int): Unit = {
    var writer: FileWriter = null
    try {
      writer = new FileWriter(filePath, true)
      val rand = new Random()
      for (i <- 1 to recordNum) {
        // Generate the buying date
        var year = rand.nextInt(16) + 2000
        var month = rand.nextInt(12) + 1
        // To avoid checking whether it is a valid day for a specific month,
        // we always generate a day which is no more than 28
        var day = rand.nextInt(28) + 1
        var recordDate = year + "-" + month + "-" + day
        // Generate the product ID
        var index: Int = rand.nextInt(PRODUCT_ID_ARRAY.length)
        var productID = PRODUCT_ID_ARRAY(index)
        // Generate the product price
        var price: Int = rand.nextInt(MAX_PRICE)
        if (price == 0) {
          price = MIN_PRICE
        }
        // Which user buys this product
        val userID = rand.nextInt(USERS_NUM) + 1
        writer.write(i + " " + recordDate + " " + productID + " " +
          price + " " + userID)
        writer.write(System.getProperty("line.separator"))
      }
      writer.flush()
    } catch {
      case e: Exception => println("Error occurred:" + e)
    } finally {
      if (writer != null) writer.close()
    }
    println("Consuming Data File generated successfully.")
  }
}
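One caveat worth noting: both generators open their output file with new FileWriter(filePath, true), i.e. in append mode, so rerunning a generator adds records on top of those from an earlier run. If a fresh file is wanted each time, deleting the old one first is enough. Below is a minimal sketch of a helper that could be added to either object; the name deleteIfExists is my own, and java.io.File is already imported by both listings:

  // Hypothetical helper: delete a previous output file so a rerun starts
  // from an empty file instead of appending to the old records.
  private def deleteIfExists(filePath: String): Unit = {
    val f = new File(filePath)
    if (f.exists()) f.delete()
  }

  // Called inside main, before generateDataFile:
  // deleteIfExists(FILE_PATH)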
// Define a case class for the user data
case class User(userID: String, gender: String, age: Int,
  registerDate: String, role: String, region: String)

// Define a case class for the consuming data
case class Order(orderID: String, orderDate: String, productID: Int,
  price: Int, userID: String)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object UserConsumingDataStatistics {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: UserConsumingDataStatistics userDataFilePath consumingDataFilePath")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: User Consuming Data Statistics")
    // The Kryo serializer is faster than the default Java serializer
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ctx = new SparkContext(conf)
    val sqlCtx = new SQLContext(ctx)
    import sqlCtx.implicits._

    // Convert the user data RDD to a DataFrame and register it as a temp table
    val userDF = ctx.textFile(args(0)).map(_.split(" ")).map(
      u => User(u(0), u(1), u(2).toInt, u(3), u(4), u(5))).toDF()
    userDF.registerTempTable("user")
    // Convert the consuming data RDD to a DataFrame and register it as a temp table
    val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order(
      o(0), o(1), o(2).toInt, o(3).toInt, o(4))).toDF()
    orderDF.registerTempTable("orders")
    // Caching the DataFrames in memory in serialized form should make the program run much faster
    userDF.persist(StorageLevel.MEMORY_ONLY_SER)
    orderDF.persist(StorageLevel.MEMORY_ONLY_SER)
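    // (Optional addition, not in the original listing: once Kryo is enabled,
    // the application's classes can also be registered with it so that their
    // fully-qualified names are not written into every serialized record.
    // SparkConf.registerKryoClasses has existed since Spark 1.2; the call
    // would belong right after the spark.serializer setting above, before
    // the SparkContext is created, e.g.:
    //   conf.registerKryoClasses(Array(classOf[User], classOf[Order]))
    // )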
    // The number of distinct people who placed orders in the year 2015.
    // distinct() is needed here; without it we would count 2015 orders
    // rather than people, since every order matches one user in the join.
    val count = orderDF.filter(orderDF("orderDate").contains("2015")).join(
      userDF, orderDF("userID").equalTo(userDF("userID")))
      .select(userDF("userID")).distinct().count()
    println("The number of people who have orders in the year 2015: " + count)

    // Total orders produced in the year 2014
    val countOfOrders2014 = sqlCtx.sql(
      "SELECT * FROM orders where orderDate like '2014%'").count()
    println("Total orders produced in the year 2014: " + countOfOrders2014)

    // Overview of the orders produced by the user with ID 1
    sqlCtx.sql("SELECT o.orderID, o.productID, o.price, u.userID " +
      "FROM orders o, user u where u.userID = 1 and u.userID = o.userID").show()
    println("Orders produced by user with ID 1 shown.")

    // Calculate the max, min and average prices of the orders produced by the user with ID 10
    val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice, " +
      "min(o.price) as minPrice, avg(o.price) as avgPrice, u.userID " +
      "FROM orders o, user u where u.userID = 10 and u.userID = o.userID " +
      "group by u.userID")
    println("Order statistics result for user with ID 10:")
    orderStatsForUser10.collect().map(order =>
      "Minimum Price=" + order.getAs("minPrice") +
        ";Maximum Price=" + order.getAs("maxPrice") +
        ";Average Price=" + order.getAs("avgPrice")
    ).foreach(result => println(result))
  }
}
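For readers who prefer the DataFrame API over embedded SQL, the per-user price statistics above can also be written without a SQL string. Here is a sketch of the equivalent aggregation, assuming Spark 1.3+ where org.apache.spark.sql.functions is available; statsForUser10 is my own name, and the string literal "10" is used because userID is a String field in the Order case class:

import org.apache.spark.sql.functions.{avg, max, min}

// Equivalent of the SQL aggregation above, written with the DataFrame API.
val statsForUser10 = orderDF.filter(orderDF("userID").equalTo("10"))
  .agg(max("price").as("maxPrice"),
       min("price").as("minPrice"),
       avg("price").as("avgPrice"))
statsForUser10.show()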