Spark- SparkStreaming可更新状态的实例

 

Producer

复制代码
package zx.zx.sparkkafka


import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

/**
 * Created by 166 on 2017/9/6.
 */
object Producer {
  val topic="myWordCount1"
  val buffer: StringBuilder = new StringBuilder
  val message: Array[String] = Array("hadoop","scala","","kafka","java","storm","redis","hello","world")
  def getMessage:String={
    buffer.clear()
    for(info<-0 to 10)
      {
        if(info!=10) buffer.append(message(Random.nextInt(message.length)).concat(" ")) else buffer.append(message(Random.nextInt(message.length)))
      }
    buffer.toString()
  }

  def main(args: Array[String]) {

    //properties用户保存一下配置信息的
    val properties= new Properties
    //添加配置信息:metadata.broker.list指定kafka的Borker的地址和端口,可以是多个Borker的地址
    properties.put("metadata.broker.list","192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092")
    //数据写入到kafka中的使用序列化方式
    properties.put("serializer.class","kafka.serializer.StringEncoder")
    val producer= new Producer[String,String](new ProducerConfig(properties))
    for (i<-0 until Integer.MAX_VALUE){
      Thread.sleep(500)
      val message: KeyedMessage[String, String] = KeyedMessage[String,String](topic,"",null,getMessage)
      producer.send(message)
    }
  }
}
复制代码

SparkStreamingDemo

注意必须设置checkpoint

复制代码
package zx.zx.sparkkafka

import org.apache.log4j.{Level, Logger}
import org.apache..{HashPartitioner, SparkConf}
import org.apache..streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache..streaming.kafka.KafkaUtils
import org.apache..streaming.{Seconds, StreamingContext}

/**
 * Created by 166 on 2017/9/6.
 */


object SparkStreamingDemo {

  /**
   * Iterator[(String, Seq[Int], Option[Int])]
   * 第一个:key,单词
   * 第二个:当前批次该单词出现的次数
   * 第三个:初始值或者以前累加过的值
   */
  val updataFunc=(iter:Iterator[(String, Seq[Int], Option[Int])])=>{
      iter.map(t=>(t._1,t._2.sum+t._3.getOrElse(0)))
  }
  def main(args: Array[String]) {

    Logger.getLogger("org.apache.").setLevel(Level.OFF)
    //创建SparkConf并设置AppName
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
    //创建StreamingContext
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
    //设置检查点-----如果想要更新历史状态(累加),要设置checkpoint
//checkpoint必须设置,一般来说设置中HDFS
    ssc.checkpoint("C:\\Users\\166\\Desktop\\Data\\ck")


    //接受命令行中的参数
    //从kafka中拉取数据
    val zkQuorum="srv01:2181,srv02:2181,srv03:2181"
    val groupId="g1"//groupID=UUID.randomUUID().toString

    //当话题很多时就使用这个要切分---topics={t1,t2,t3}
    //val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val topic = Map("myWordCount1"->3)
    val topicAndLine: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topic)
    //(key,message)--->map(_._2)===>message
    val lines: DStream[String] = topicAndLine.map(_._2) //该数据可能是多行的
    //一行一行地取出来,切分数据
    //redis  scala hadoop hello scala java java hadoop scala world
    //(redis,1),(,1)
    val words: DStream[(String, Int)] = lines.map(_.split(" ")).flatMap(x=>x).map((_,1))//一行一行地取出来,切分数据
    //统计单词数量
    val result: DStream[(String, Int)] = words.updateStateByKey(updataFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
    //将结果打印到控制台
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
复制代码

 

 

人已赞赏
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
有新消息 消息中心
搜索