# 3、JSON数据的处理

## 3.1 介绍 

**JSON数据**

- Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

  Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

- This conversion can be done using SparkSession.read.json on a JSON file

  读取一个JSON文件可以用SparkSession.read.json方法

**从JSON到DataFrame**

- 指定DataFrame的schema

  1，通过反射自动推断，适合静态数据

  2，程序指定，适合程序运行中动态生成的数据

**加载json数据**

```python
#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')

#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')
```

**嵌套结构的JSON**

- 重要的方法

  1，get_json_object

  2，get_json

  3，explode

## 3.2 实践

### 3.1 静态json数据的读取和操作

**无嵌套结构的json数据**

```python
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================
#                无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]
```

**从json字符串数组得到DataFrame**

```python
# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd，再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame()，见后面例子

jsonRDD = sc.parallelize(jsonString)   # stringJSONRDD
jsonDF =  spark.read.json(jsonRDD)  # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()
```

**直接从文件生成DataFrame**

```python
# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容，才能被spark-sql正确读取，否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame，创建一个临时的表(相当于mysql数据库中的一个表)，这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)
```

### 3.2 动态json数据的读取和操作

**指定DataFrame的Schema**

3.1节中的例子为通过反射自动推断schema，适合静态数据

下面我们来讲解如何进行程序指定schema

**没有嵌套结构的json**

```python
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

jsonRDD = sc.parallelize(jsonString)

from pyspark.sql.types import *

#定义结构类型
#StructType：schema的整体结构，表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
  .add("id", StringType(),True) \
  .add("city", StringType()) \
  .add("pop" , LongType()) \
  .add("state",StringType())

jsonSchema = StructType() \
  .add("id", LongType(),True) \
  .add("city", StringType()) \
  .add("pop" , DoubleType()) \
  .add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()
```

**带有嵌套结构的json**

```python
from pyspark.sql.types import *
jsonSchema = StructType([
    StructField("id", StringType(), True),
    StructField("city", StringType(), True),
    StructField("loc" , ArrayType(DoubleType())),
    StructField("pop", LongType(), True),
    StructField("state", StringType(), True)
])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)
```

### 3.3 复杂结构的处理

```python
eventArray = [(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),

 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),

 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }""")]

from pyspark.sql.functions import *
from pyspark.sql.types import *

eventJSONDF = spark.createDataFrame(eventArray,('key','json'))

'''
get_json_object：
spark 从1.6开始提供了get_json_object方法，可以把一个json字符串解析成json对象，进而提取其中的字段数据，但这个方法每次提取一个字段，
如果有多个字段要提取，就需要写多个get_json_object方法，如下：
'''
eventDF = eventJSONDF \
    .select(eventJSONDF.key,
            get_json_object(eventJSONDF.json, '$.device_id').alias("device_id"),
            get_json_object(eventJSONDF.json, '$.device_type').alias("device_type"),
            get_json_object(eventJSONDF.json, '$.ip').alias("ip")
            )

eventDF.select(["device_id","ip"]).show()
eventDF.filter(eventDF.key>1).show()

'''
from_json可以看作是get_json_object的孪生兄弟，它是利用Schema来提取信息的。
使用from_json，我们就避免多次写get_json_object了，而是构造了一个json对象作为DataFrame中的一列，还可以把整个json对象当作一个整体，然后用*这种方式使用。
这个方法在2.0.2中没有，从2.1开始出现
'''

jsonSchema = StructType() \
    .add("device_type", StringType()) \
    .add("device_id", LongType()) \
    .add("ip", StringType()) \
    .add("cca3",StringType()) \
    .add("cn", StringType()) \
    .add("temp", LongType()) \
    .add("signal", LongType()) \
    .add("battery_level", LongType()) \
    .add("c02_level", LongType()) \
    .add("timestamp", TimestampType())


eventJSONDF = spark.createDataFrame(eventArray,('key','json'))
eventDF = eventJSONDF \
    .select(eventJSONDF.key,
            from_json(eventJSONDF.json,jsonSchema).alias('device')
            )
eventDF.select("device.*").filter("device.temp > 10 and device.signal > 15")
```

