Spark - PySpark SQL: Parsing JSON Array Data with PySpark

 


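Data sample. Each line has three `|`-separated fields; the third is a JSON array of `name`/`score` objects, with the scores written as quoted strings: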

```
12341234123412342|asefr-3423|[{"name":"","score":"65"},{"name":"airlow","score":"70"},{"name":"flume","score":"55"},{"name":"python","score":"33"},{"name":"scala","score":"44"},{"name":"java","score":"70"},{"name":"hdfs","score":"66"},{"name":"hbase","score":"77"},{"name":"qq","score":"70"},{"name":"sun","score":"88"},{"name":"mysql","score":"96"},{"name":"php","score":"88"},{"name":"hive","score":"97"},{"name":"oozie","score":"45"},{"name":"meizu","score":"70"},{"name":"hw","score":"32"},{"name":"sql","score":"75"},{"name":"r","score":"64"},{"name":"mr","score":"83"},{"name":"kafka","score":"64"},{"name":"mo","score":"75"},{"name":"apple","score":"70"},{"name":"jquery","score":"86"},{"name":"js","score":"95"},{"name":"pig","score":"70"}]
```
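As a quick check of that layout, the sketch below splits one record using only Python's standard library. It is a minimal illustration, not part of the original job: the record is truncated to its first three array elements, `record_id` and `code` are hypothetical names for the first two fields, and the array is decoded with `json.loads`. Note that the scores come back as strings, because they are quoted in the data.

```python
import json

# Hypothetical example: the sample record truncated to its first three
# array elements for brevity
record = ('12341234123412342|asefr-3423|'
          '[{"name":"","score":"65"},{"name":"airlow","score":"70"},'
          '{"name":"flume","score":"55"}]')

# maxsplit=2 splits only on the first two "|", so the JSON part stays intact.
# record_id and code are illustrative names for the first two fields.
record_id, code, payload = record.split("|", 2)
items = json.loads(payload)

print(record_id)                          # 12341234123412342
print(len(items))                         # 3
print(items[1])                           # {'name': 'airlow', 'score': '70'}
print(type(items[1]["score"]).__name__)   # str: scores are quoted in the data
```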

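The job: take the third field of each line, split the JSON array into one object per element, load the objects as a DataFrame, and keep the entries with a score above 70: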

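```python
# -*- coding: utf-8 -*-
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType


if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    fileName = sys.argv[1]
    lines = sc.textFile(fileName)

    def parse_line(line):
        # Each record is "id|code|[json array]"; keep the third field (the JSON array)
        fields = line.split("|")
        keyword = fields[2]
        return keyword

    def parse_(keyword):
        # Drop the array brackets and turn the "},{" separators into "}|{"
        # so the array can be split into one JSON object per element
        return keyword.replace("[", "").replace("]", "").replace("},{", "}|{")

    keywordRDD = lines.map(parse_line)
    # print(keywordRDD.take(1))
    # print("---------------")

    jsonlistRDD = keywordRDD.map(parse_)
    # print(jsonlistRDD.take(1))

    # One JSON object string per element, e.g. {"name":"java","score":"70"}
    jsonRDD = jsonlistRDD.flatMap(lambda jsonlist: jsonlist.split("|"))

    # "score" is a quoted string in the data ("65"), so read it as StringType;
    # an IntegerType field does not parse JSON strings and would come back null
    schema = StructType([StructField("name", StringType()), StructField("score", StringType())])
    df = sqlContext.read.schema(schema).json(jsonRDD)
    # df.printSchema()
    # df.show()

    # "scores" is an illustrative view name; the original name was lost
    df.createOrReplaceTempView("scores")
    df_result = sqlContext.sql(
        "SELECT name, CAST(score AS INT) AS score FROM scores WHERE CAST(score AS INT) > 70")
    # Output format assumed to be JSON; the original writer method name was lost
    df_result.coalesce(1).write.json(sys.argv[2])

    sc.stop()
```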

Submitting the job, with the input file and the output directory as the two arguments:

spark-submit .\demo2.py "C:\\Users\\txdyl\\Desktop\\test.txt" "c:\\users\\txdyl\\Desktop\\output"

Results
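The job writes the entries whose score is above 70. For the sample line, those rows can be cross-checked without a cluster using the plain-Python sketch below. It swaps in a different parsing technique from the job's string replacement: each array is decoded with `json.loads`, which does not depend on the exact `},{` spacing and is unaffected by `|` inside names, and the quoted scores are converted to `int` before the same `score > 70` predicate is applied. `explode_record` is a hypothetical helper name, not part of the original post.

```python
import json

# The sample line from the data section above
SAMPLE = '12341234123412342|asefr-3423|[{"name":"","score":"65"},{"name":"airlow","score":"70"},{"name":"flume","score":"55"},{"name":"python","score":"33"},{"name":"scala","score":"44"},{"name":"java","score":"70"},{"name":"hdfs","score":"66"},{"name":"hbase","score":"77"},{"name":"qq","score":"70"},{"name":"sun","score":"88"},{"name":"mysql","score":"96"},{"name":"php","score":"88"},{"name":"hive","score":"97"},{"name":"oozie","score":"45"},{"name":"meizu","score":"70"},{"name":"hw","score":"32"},{"name":"sql","score":"75"},{"name":"r","score":"64"},{"name":"mr","score":"83"},{"name":"kafka","score":"64"},{"name":"mo","score":"75"},{"name":"apple","score":"70"},{"name":"jquery","score":"86"},{"name":"js","score":"95"},{"name":"pig","score":"70"}]'


def explode_record(line):
    """Hypothetical helper: turn one "a|b|[json array]" line into (name, score) tuples.

    The array is decoded with json.loads rather than by rewriting "},{" to "}|{".
    Scores arrive as JSON strings ("65"), so they are converted to int here.
    """
    payload = line.split("|", 2)[2]
    return [(item["name"], int(item["score"])) for item in json.loads(payload)]


rows = explode_record(SAMPLE)
# Same predicate as the SQL query: score > 70
high = [(name, score) for name, score in rows if score > 70]

print(len(rows))   # 25
print(len(high))   # 10
print(high[0])     # ('hbase', 77)
```

Inside the job, `lines.flatMap(explode_record)` would yield an RDD of `(name, score)` tuples, which `sqlContext.createDataFrame(rdd, ["name", "score"])` can turn into a DataFrame with a numeric `score` column, so no cast is needed in the query.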
