## 4、Spark Streaming对接Kafka

### 4.1 对接数据的两种方式

在前面的案例中，我们监听了来自网络端口的数据，实现了WordCount，但是在实际开发中并不是这样。我们更多的是接收来自高级数据源的数据，例如Kafka。

下面我们来介绍如何利用Spark Streaming对接Kafka

![](pics/ss13.png)

以下两种方式都是为了数据可靠性：

- Receiver-based Approach：由Receiver来对接数据，Receiver接收到数据后会将日志预先写入到hdfs上（WAL），同时也会将数据做副本传输到其他的Worker节点。在读取数据的过程中，Receiver是从Zookeeper中获取数据的偏移信息。
- Direct Approach（No Receivers）：没有Receiver接收信息，由Spark Streaming直接对接Kafka的broker，获取数据和数据的偏移信息。

上述两种方式中，Direct Approach方式更加可靠，不需要Spark Streaming自己去保证维护数据的可靠性，而是由善于处理这类工作的Kafka来做。

**对应代码**

- KafkaUtils.createStream(ssc,zkQuorum,"spark-streaming-consumer",{topic:1})
- KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":'localhost:9092'})

**Direct API的好处**

- **简化的并行**：在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中，**Kafka中的partition与RDD中的partition是一一对应**的并行读取Kafka数据，这种映射关系也更利于理解和优化。
- **高效**：在Receiver的方式中，为了达到0数据丢失需要将数据存入Write Ahead Log中，这样在Kafka和日志中就保存了两份数据，浪费！而第二种方式不存在这个问题，只要我们Kafka的数据保留时间足够长，我们都能够从Kafka进行数据恢复。
- **精确一次**：在Receiver的方式中，使用的是Kafka的高阶API接口从Zookeeper中获取offset值，这也是传统的从Kafka中读取数据的方式，但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步，这种方式偶尔会造成数据重复消费。而第二种方式，直接使用了简单的低阶Kafka API，Offsets则利用Spark Streaming的checkpoints进行记录，消除了这种不一致性。

## 4.2 环境安装

### 课程目标：

- 独立实现pycharm对接远程centos服务器
- 独立实现spark-streaming对接kafka的环境配置

**安装步骤：**

- 准备pycharm环境

  - 1，对接到centos服务器，下载环境

    - 1.1 选择Tools -->Deployment-->Configuration

      ![](pics/env1.png)

      ![](pics/env2.png)

      注：选择Type为SFTP，写入主机名，登陆的用户名和密码

      ![](pics/env3.png)

      注：选择Deployment目录为基准的根目录

    - 1.2 选择File-->settings-->Project xxx-->Project Interpreter

      ![](pics/env4.png)

      ![](pics/env5.png)

      注：输入远程连接的主机名，登陆的用户名和密码，进行远程python环境的对接。

- 先安装Zookeeper

  - 1，下载zookeeper-3.4.5-cdh5.7.0，配置ZK_HOME
  - 2，进入conf目录，配置zoo.cfg文件
    - 修改dataDir=/xxx(默认为系统tmp目录下，重启后数据会丢失)
  - 3，启动zookeeper
    - zkServer.sh start

- 安装kafka

  - 1，下载Kafka

  - 2，配置KAFKA_HOME

  - 3，config目录下server.properties

    - broker.id=0 在集群中需要设置为唯一的
    - zookeeper.connect=localhost:2181

  - 4，启动kafka

    - bin/kafka-server-start.sh config/server.properties

      看到started，说明启动成功

  - 5，创建topic

    - bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

      replication-factor：副本数量

      partitions：分区数量

      出现Created topic "test"，说明创建成功

  - 6，查看所有topic

    - bin/kafka-topics.sh --list --zookeeper localhost:2181

  - 7，通过生产者发送消息

    - bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  - 8，通过消费者消费消息（此时的消费者已经变成了Spark  Streaming）

    - bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

      -from-beginning：从最开始生产队的数据开始消费

- 配置spark streaming kafka开发环境

  - 1，下载spark streaming集成kafka的jar包

    spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar

  - 2，将jar包放置到spark的jars目录下

  - 3，编辑spark/conf目录下的spark-defaults.conf，添加如下两条配置

    ```properties
    spark.driver.extraClassPath=/xxx/jars/spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar
    spark.executor.extraClassPath=/xxx/jars/spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar
    #driver和executor对应的两个路径一致
    ```

  - 4，测试是否配置成功

    - 启动pyspark

    - 把下面的代码拷贝到pyspark中

      ```python
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils
      from pyspark.sql.session import SparkSession
      
      spark = SparkSession.builder.master("local[2]").getOrCreate()
      sc = spark.sparkContext
      ssc = StreamingContext(sc,3)
      ks = KafkaUtils.createDirectStream(ssc,["topicxx"],{"metadata.broker.list":"localhost:9092"})
      ```

      如果没有配置成功，会抛出ClassNotFoundException

### 4.3 案例实现

需求：利用Spark Streaming不断处理来自Kafka生产者生产的数据，并统计出现的单词数量

- 1，编写producer.py，用于生产数据

  ```python
  from kafka import KafkaProducer
  import time
  
  #创建KafkaProducer，连接broker
  producer = KafkaProducer(bootstrap_servers='localhost:9092')
  
  #每隔一段时间发送一端字符串数据到broker
  def send_data():
      for i in range(60):
          producer.send('topic_name',b"hello,kafka,spark,streaming,kafka")
          time.sleep(0.1)
  send_data()
  ```

- 2，编辑Spark Streaming代码，统计单词出现的数量

  ```python
  from pyspark.streaming import StreamingContext
  from pyspark.streaming.kafka import KafkaUtils
  from pyspark.sql.session import SparkSession
  
  import os
  
  #注：在pycharm上运行时，会提示SPARK_HOME找不到，需要配置SPARK_HOME环境变量
  os.environ["SPARK_HOME"]="/root/bigdata/spark-2.3.0-bin-2.6.0-cdh5.7.0"
  #由于系统中存在两种不同版本的python，需要配置PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON指定运行为python3
  os.environ["PYSPARK_PYTHON"]="/usr/local/python3/bin/python3"
  os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/local/python3/bin/python3"
  
  topic="topic_name"
  
  spark = SparkSession.builder.master("local[2]").getOrCreate()
  sc = spark.sparkContext
  ssc = StreamingContext(sc,3)
  
  #创建direct连接，指定要连接的topic和broker地址
  ks = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":"localhost:9092"})
  #(None,内容)
  ks.pprint()
  
  #以下代码每操作一次，就打印输出一次
  lines = ks.map(lambda x:x[1])
  lines.pprint()
  
  words = lines.flatMap(lambda line:line.split(","))
  words.pprint()
  
  pairs = words.map(lambda word:(word,1))
  pairs.pprint()
  
  counts = pairs.reduceByKey(lambda x,y:x+y)
  counts.pprint()
  
  ssc.start()
  #等待计算结束
  ssc.awaitTermination()
  ```

- 3，开启Spark Streaming消费数据，将产生的日志结果存储到日志中

  spark-submit xxx.py>a.log

- 4，开启producer.py，生产数据

  python3 producer.py

- 5，通过浏览器观察运算过程

  http://node-teach:4040

- 6，分析生成的日志内容

  ```
  -------------------------------------------
  Time: 2018-12-11 01:31:21
  -------------------------------------------
  (None, 'hello,kafka,spark,streaming,kafka')
  (None, 'hello,kafka,spark,streaming,kafka')
  (None, 'hello,kafka,spark,streaming,kafka')
  (None, 'hello,kafka,spark,streaming,kafka')
  
  -------------------------------------------
  Time: 2018-12-11 01:02:33
  -------------------------------------------
  hello,kafka,spark,streaming,kafka
  hello,kafka,spark,streaming,kafka
  
  -------------------------------------------
  Time: 2018-12-11 01:02:33
  -------------------------------------------
  hello
  kafka
  spark
  streaming
  kafka
  hello
  kafka
  spark
  streaming
  kafka
  
  -------------------------------------------
  Time: 2018-12-11 01:02:33
  -------------------------------------------
  ('hello', 1)
  ('kafka', 1)
  ('spark', 1)
  ('streaming', 1)
  ('kafka', 1)
  ('hello', 1)
  ('kafka', 1)
  ('spark', 1)
  ('streaming', 1)
  ('kafka', 1)
  
  -------------------------------------------
  Time: 2018-12-11 01:02:33
  -------------------------------------------
  ('streaming', 2)
  ('hello', 2)
  ('kafka', 4)
  ('spark', 2)
  
  -------------------------------------------
  Time: 2018-12-11 01:02:36
  -------------------------------------------
  
  -------------------------------------------
  Time: 2018-12-11 01:02:36
  -------------------------------------------
  ```

