Spark- 计算每个学科最受欢迎的老师

 

日志类型

复制代码
测试数据
http://bigdata.myit.com/zhangsan
http://bigdata.myit.com/zhangsan
http://bigdata.myit.com/zhangsan
http://bigdata.myit.com/zhangsan
http://bigdata.myit.com/zhangsan
http://java.myit.com/lisi
http://java.myit.com/lisi
http://java.myit.com/lisi
复制代码

 

复制代码
package mypro

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache..rdd.RDD
import org.apache..{SparkContext, SparkConf}

/**
 * Created by 166 on 2017/9/5.
 */
object FavTeacher {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")//local[*]代表用多个线程跑,2代表用两个线程跑
    val sc = new SparkContext(conf)

    //读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    //整理数据
    val subjectAndTeacher:RDD[(String,String)]=lines.map(line=> {
      val url = new URL(line)
      val host = url.getHost
      val subject = host.substring(0, host.indexOf("."))
      val teacher = url.getPath.substring(1)   //去掉路径前面的"/"
      (subject, teacher)
    })

    //聚合
    val reduce = subjectAndTeacher.map((_,1)).reduceByKey(_+_)
    //println(reduce.collect().toBuffer)

    //按学科分组
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduce.groupBy(_._1._1)//迭代器不能排序,需要将它变成List。

    //二次排序
    val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))//用scala的语法,会把数据全部加载到内存后再做排序,数据量大的时候会有性能问题,内存溢出的问题,不建议这样使用,
    val arr: Array[(String, List[((String, String), Int)])] = result.collect()
    println(arr.toBuffer)
    
  }
}
复制代码

另种角度来实现,过滤多次提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.rz..base
import java.net.URL
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// 过滤多次提交
object GroupFavTeacher2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val topN = args(1).toInt
    val subject = Array("bigdata","javaee","php")
    // 读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    // 整理数据  http://bigdata.myit.cn/laozhang
    val subjectAndTeacher= lines.map(line => {
      val url = new URL(line)
      val host = url.getHost
      val subject = host.substring(0, host.indexOf("."))
      val teacher = url.getPath.substring(1// 去掉前面的/
      ((subject, teacher),1)
    })
    // 聚合
    val reduced = subjectAndTeacher.reduceByKey(_+_)
  <br>  // 缓存到内存,因为多次过滤都是使用同一个rdd,缓存到内存可以提高反复使用的性能<br>  val cache = reduced.cache()<br>
    for (sb <- subject){
      val sorted = cache.filter(_._1._1 == sb).sortBy(_._2,false).take(topN)
      println(sorted.toBuffer)
    }
    sc.stop()
  }
}

使用自定义分区器将每个学科的数据shuffle到独自的分区,在分区内进行排序取topN

复制代码
package com.rz.spark.base

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// 自定义分区器
import scala.collection.mutable
// 过滤多次提交
object GroupFavTeacher3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val topN = args(1).toInt

    val subject = Array("bigdata","javaee","php")
    // 读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    // 整理数据  http://bigdata.myit.cn/laozhang
    val subjectAndTeacher= lines.map(line => {
      val url = new URL(line)
      val host = url.getHost
      val subject = host.substring(0, host.indexOf("."))
      val teacher = url.getPath.substring(1) // 去掉前面的/
      ((subject, teacher),1)
    })

    // 聚合
    val reduced = subjectAndTeacher.reduceByKey(_+_)

    // 计算我们有多少学科
    val sujects: Array[String] = reduced.map(_._1._1).distinct().collect()

    // 自定义一个分区器,并且按照指定的分区器进行分区
    val subjectPartitoner = new SubjectPartitoner(sujects)

    // partitionBy按照指定的分区规则进行分区
    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartitoner)

    // 如果一次拿出一个分区(可以操作一个分区的数据)
    val sorted = partitioned.mapPartitions(it => {
      // 将迭代器转成List,然后排序,再转成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
    })
    val result = sorted.collect()

    println(result.toBuffer)
    sc.stop()
  }

  // 自定义分区器
  class SubjectPartitoner(sbs: Array[String]) extends Partitioner{
    // 相当于主构造器(new 的时候会执行一次)
    // 用于存放规则的一个map
    val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for (sb <- sbs){
      rules.put(sb,i)
      i += 1
    }

    // 返回分区的数量(下一个RDD有多少分区)
    override def numPartitions: Int = sbs.length

    // 根据传入的key计算分区标号
    // Key是一个无组(String, String)
    override def getPartition(key: Any): Int ={
      // 获取学科名称
      val subject = key.asInstanceOf[(String, String)]._1
      // 根据规则计算分区编号
      rules(subject)
    }
  }

}
复制代码

上面的方式会有多次shuffle,reduceByKey聚合数据的时候shuffle一次,使用自定义分区器重新对数据进行分析又shuffle了一次。我们可以尽可能的减少shuffle的过程,我们可以在reduceByKey的时候手动使用自定分区器进行分区,reduceByKey默认使用的是。HashPartitioner。

复制代码
package com.rz.spark.base

import java.net.URL

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// 自定义分区器且减少shuffle
import scala.collection.mutable

// 过滤多次提交
object GroupFavTeacher4 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val topN = args(1).toInt

    val subject = Array("bigdata","javaee","php")
    // 读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    // 整理数据  http://bigdata.myit.cn/laozhang
    val subjectAndTeacher= lines.map(line => {
      val url = new URL(line)
      val host = url.getHost
      val subject = host.substring(0, host.indexOf("."))
      val teacher = url.getPath.substring(1) // 去掉前面的/
      ((subject, teacher),1)
    })

    // 计算我们有多少学科
    val sujects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()

    // 自定义一个分区器,并且按照指定的分区器进行分区
    val subjectPartitoner = new SubjectPartitoner2(sujects)

    // 聚合,聚合是按照指定的分区器进行分区
    // 该RDD一个分区内仅有一个学科的数据
    val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(subjectPartitoner,_+_)


    // 如果一次拿出一个分区(可以操作一个分区的数据)
    val sorted = reduced.mapPartitions(it => {
      // 将迭代器转成List,然后排序,再转成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序
    })

    // 收集数据
    val result = sorted.collect()

    println(result.toBuffer)
    sc.stop()
  }

  // 自定义分区器
  class SubjectPartitoner2(sbs: Array[String]) extends Partitioner{
    // 相当于主构造器(new 的时候会执行一次)
    // 用于存放规则的一个map
    val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for (sb <- sbs){
      rules.put(sb,i)
      i += 1
    }

    // 返回分区的数量(下一个RDD有多少分区)
    override def numPartitions: Int = sbs.length

    // 根据传入的key计算分区标号
    // Key是一个无组(String, String)
    override def getPartition(key: Any): Int ={
      // 获取学科名称
      val subject = key.asInstanceOf[(String, String)]._1
      // 根据规则计算分区编号
      rules(subject)
    }
  }

}
复制代码

 

 

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论