# 3、JSON数据的处理

## 3.1 介绍 

**JSON数据**

- Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

  Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

- This conversion can be done using SparkSession.read.json on a JSON file

  读取一个JSON文件可以用SparkSession.read.json方法

**从JSON到DataFrame**

- 指定DataFrame的schema

  1，通过反射自动推断，适合静态数据

  2，程序指定，适合程序运行中动态生成的数据

**加载json数据**

```python
#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')

#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')
```

**嵌套结构的JSON**

- 重要的方法

  1，get_json_object

  2，get_json

  3，explode

## 3.2 实践

### 3.1 静态json数据的读取和操作

**无嵌套结构的json数据**

```python
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================
#                无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]
```

**从json字符串数组得到DataFrame**

```python
# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd，再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame()，见后面例子

jsonRDD = sc.parallelize(jsonString)   # stringJSONRDD
jsonDF =  spark.read.json(jsonRDD)  # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()
```

**直接从文件生成DataFrame**

```python
# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容，才能被spark-sql正确读取，否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame，创建一个临时的表(相当于mysql数据库中的一个表)，这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)
```

### 3.2 动态json数据的读取和操作

**指定DataFrame的Schema**

3.1节中的例子为通过反射自动推断schema，适合静态数据

下面我们来讲解如何进行程序指定schema

**没有嵌套结构的json**

```python
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

jsonRDD = sc.parallelize(jsonString)

from pyspark.sql.types import *

#定义结构类型
#StructType：schema的整体结构，表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
  .add("id", StringType(),True) \
  .add("city", StringType()) \
  .add("pop" , LongType()) \
  .add("state",StringType())

jsonSchema = StructType() \
  .add("id", LongType(),True) \
  .add("city", StringType()) \
  .add("pop" , DoubleType()) \
  .add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()
```

**带有嵌套结构的json**

```python
from pyspark.sql.types import *
jsonSchema = StructType([
    StructField("id", StringType(), True),
    StructField("city", StringType(), True),
    StructField("loc" , ArrayType(DoubleType())),
    StructField("pop", LongType(), True),
    StructField("state", StringType(), True)
])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)
```

