Input-to-output conversion utility class
package com.rz.mobile_tag.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/**
 * Utility for converting access-log records (input ==> output).
 */
object AccessConvertUtil {

  // Output schema definition
  val structType = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
   * Convert one input line into the output format.
   * @param log a single input log record
   */
  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t", -1)

      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // Strip the domain prefix, leaving e.g. "code/1852"
      val domain = "http://www.rz.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = ""
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      // The fields of this Row must match structType above, in order
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case _: Exception =>
        // Return a Row whose arity matches structType so that
        // malformed lines do not break DataFrame creation
        Row("", "", 0L, 0L, "", "", "", "")
    }
  }
}
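As a quick sanity check, parseLog can be exercised on a single record; the sample line below is made up for illustration, but it follows the expected tab-separated layout (time, url, traffic, ip):

// Hypothetical sample line in the expected tab-separated layout
val sample = "2017-05-11 14:09:14\thttp://www.rz.com/code/1852\t2345\t117.35.88.11"
val row = AccessConvertUtil.parseLog(sample)
// Expected: [http://www.rz.com/code/1852,code,1852,2345,117.35.88.11,,2017-05-11 14:09:14,20170511]
println(row)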
Read the data, clean it, and write out the target data
package com.rz.mobile_tag.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Data-cleaning job implemented with Spark.
 */
object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[2]")
      .getOrCreate()

    val accessRDD = spark.sparkContext.textFile(args(0))
    // Debug: inspect the raw input
    // accessRDD.take(10).foreach(println)

    val accessDF = spark.createDataFrame(
      accessRDD.map(log => AccessConvertUtil.parseLog(log)),
      AccessConvertUtil.structType)
    // accessDF.printSchema()
    // accessDF.show(false)

    // Write a single parquet file per day partition, overwriting any previous run
    accessDF.coalesce(1)
      .write.format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save(args(1))

    spark.stop()
  }
}
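After the job finishes, the cleaned data sits under args(1) partitioned by day and can be loaded back for downstream statistics. A minimal sketch, assuming the output path used above (the path and day value below are placeholders):

// Read the cleaned, day-partitioned parquet back (path stands in for args(1))
val cleanDF = spark.read.format("parquet").load("/path/to/clean")
cleanDF.printSchema()
// Filtering on the partition column lets Spark scan only that day's files
cleanDF.filter("day = '20170511'").show(10, false)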