Log generation
package zx.Utils
import java.io.{File, FileWriter}
import java.util.Calendar
import org.apache.commons.lang.time.{DateUtils, FastDateFormat}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
 * Created by 166 on 2017/9/6.
 */
case class FlowLog(time: String, ip: String, upFlow: Long, downFlow: Long) extends Serializable {
  override def toString: String = s"$time\t$ip\t$upFlow\t$downFlow"
}
object CreateLog {
  val ip_buffer: StringBuilder = new StringBuilder
  private val fs: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var startTime: String = "2015-01-12 12:12:12"
  val instance: Calendar = Calendar.getInstance
  val ipPool: ArrayBuffer[String] = getIp // pool of 20 random IP addresses

  // generate 20 random IPv4 addresses
  private[this] def getIp: ArrayBuffer[String] = {
    val arrayBuffer: ArrayBuffer[String] = ArrayBuffer()
    ip_buffer.clear()
    for (_ <- 0 until 20) { // 0 until 20 yields exactly 20 addresses (0 to 20 would yield 21)
      ip_buffer.append(Random.nextInt(256)).append(".") // nextInt(256) covers the full octet range 0-255
        .append(Random.nextInt(256)).append(".")
        .append(Random.nextInt(256)).append(".")
        .append(Random.nextInt(256))
      arrayBuffer += ip_buffer.toString()
      ip_buffer.clear()
    }
    arrayBuffer
  }
  // advance startTime by a random number of minutes (0-199) and return the new timestamp
  def getTime: String = {
    instance.setTime(DateUtils.parseDate(startTime, Array("yyyy-MM-dd HH:mm:ss")))
    instance.add(Calendar.MINUTE, Random.nextInt(200))
    val newTime: String = fs.format(instance.getTime)
    startTime = newTime
    newTime
  }

  // random flow value in [0, 800)
  def getFlow: Long = Random.nextInt(800)

  // pick one IP from the pool at random
  def getIP: String = ipPool(Random.nextInt(ipPool.size))

  // append one log line to the file and flush
  def write2file(fw: FileWriter, content: String): String = {
    fw.write(content)
    fw.write(System.lineSeparator())
    fw.flush()
    "SUCCESS"
  }
  def main(args: Array[String]) {
    val file: File = new File("C:\\Users\\166\\Desktop\\Data\\Log", "click_flow.log")
    file.getParentFile.mkdirs() // make sure the target directory exists
    if (file.exists()) file.delete() // start from a fresh file on every run
    val fw: FileWriter = new FileWriter(file)
    for (_ <- 0 to 10000) println(write2file(fw, FlowLog(getTime, getIP, getFlow, getFlow).toString))
    fw.close()
  }
}
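Each line of the generated file follows FlowLog.toString: timestamp, IP, upFlow, and downFlow separated by tabs. Illustrative output (the values are random, so an actual run will differ):

2015-01-12 13:23:12	104.187.33.2	523	87
2015-01-12 15:40:12	6.91.200.143	14	672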
Compute each user's total upstream and downstream traffic
package zx.sparkStream
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Requirement: compute each user's total upstream flow and total downstream flow.
 * Created by rz on 2017/9/6.
 */
case class ClickFlow(remoteUser: String, tupleFlow: (Long, Long))
object SparkOffLine {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF) // silence Spark's own logging
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("SparkOffLine").setMaster("local[*]"))
    val rdd: RDD[String] = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
    // each line is: time \t ip \t upFlow \t downFlow; key every record by the IP (the "user")
    val rdd1: RDD[(String, ClickFlow)] = rdd.map(data => {
      val datas: Array[String] = data.split("\t")
      (datas(1), ClickFlow(datas(1), (datas(2).toLong, datas(3).toLong)))
    })
    // sum the (upFlow, downFlow) pairs per user
    val rdd2: RDD[(String, ClickFlow)] = rdd1.reduceByKey((x, y) => {
      val x_upFlow: Long = x.tupleFlow._1
      val y_upFlow: Long = y.tupleFlow._1
      val x_downFlow: Long = x.tupleFlow._2
      val y_downFlow: Long = y.tupleFlow._2
      ClickFlow(x.remoteUser, (x_upFlow + y_upFlow, x_downFlow + y_downFlow))
    })
    println(rdd2.collect().toBuffer)
    sc.stop()
  }
}
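Since the key of rdd1 already identifies the user, the ClickFlow wrapper is optional. A minimal sketch of the same aggregation over bare (Long, Long) pairs, assuming the same log path and reusing the sc and imports from above:

// same per-user sums without the case class; x and y are (upFlow, downFlow) pairs
val sums: RDD[(String, (Long, Long))] = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
  .map(line => {
    val f = line.split("\t")
    (f(1), (f(2).toLong, f(3).toLong))
  })
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
sums.collect().foreach { case (ip, (up, down)) => println(s"$ip\tup=$up\tdown=$down") }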