## 八 埋点日志处理

### 8.1 埋点日志介绍

埋点就是在应用中特定的流程收集一些信息，用来跟踪应用使用的状况，后续用来进一步优化产品或是提供运营的数据支撑。比如用于作为用于实现个性化推荐的数据支撑。

埋点方式主流的无非两种方式：

- 自行研发：在研发的产品中注入代码进行统计，并搭建起相应的后台查询和处理
- 第三方平台：第三方统计工具，如友盟、百度移动等

埋点日志主要分为类型：

- 曝光日志：商品被展示到页面被称为曝光，曝光日志也就是指商品一旦被展示出来，则记录一条曝光日志
  - 曝光时间
  - 曝光场景
  - 用户唯一标识
  - 商品ID
  - 商品类别ID
- 点击流日志：用户浏览、收藏、加购物车、购买、评论、搜索等行为记录日志
  - 被曝光时间：对应曝光日志的曝光时间(浏览)
  - 被曝光场景：对应曝光日志的曝光场景(浏览)
  - 用户唯一标识
  - 行为时间
  - 行为类型
  - 商品ID
  - 商品类别ID
  - 停留时长(浏览)
  - 评分(评论)
  - 搜索词(搜索)

### 8.2 埋点日志意义

用户行为偏好分析

- 利用点击流日志分析个体/群体用户的行为特征，预测出用户行为的偏好

统计指标分析

- 点击率：顾名思义被点击的概率，计算公式通常是：点击次数/曝光次数。如某商品共曝光或展示了100次，曝光后总共被点击了10次，那么点击率则是10%
- 跳出率：用户访问一个页面后，之后没有再也没有其他操作，称为跳出，计算公式常用的是：访问一次就退出的访问量/总的访问量
  - 整体(整个板块/应用)跳出率
  - 单页面的跳出率。如某页面共计有100个用户访问，但其中有10个用户访问当前页面后就再也没有其他访问了，那么当前页面的跳出率是10%
- 转化率：电商中的转化率计算：商品订单成交量/商品访问量。如某商品累计访问量是100个，最终提交订单的只有5个，那么该商品转化率就是5%

注意：跳出率和转化率中的访问量指的是独立用户访问量。独立用户访问量：首先独立用户访问量并不等价于来访问的总用户个数。比如某用户A在1月1日访问了商品1，1月2日又访问了商品1，那么这里商品1的访问量应算作2次，而不是1次。

### 8.3 埋点日志处理

- 埋点日志格式化

``` python
def get_logger(logger_name, path, level):
    # 创建logger
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)

    # 创建formatter
    fmt = "%(asctime)s: %(message)s"
    datefmt = "%Y/%m/%d %H:%M:%S"
    formatter = logging.Formatter(fmt, datefmt)
    
    # 创建handler
    handler = logging.FileHandler(path)
    handler.setLevel(level)
    
    # 添加 handler 和 formatter 到 logger
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    return logger
```

- 处理曝光日志

``` python
import time
import logging

exposure_logger = get_logger("exposure", "/root/bigdata/meiduo/meiduo_mall/logs/exposure.log", logging.DEBUG)

# 曝光日志
exposure_timesteamp = time.time()
exposure_loc = "detail"
uid = 1
sku_id = 1
cate_id = 1
```

- 创建日志

```python
exposure_logger.info("exposure_timesteamp<%d> exposure_loc<%s> uid<%d> sku_id<%d> cate_id<%d>"%(exposure_timesteamp, exposure_loc, uid, sku_id, cate_id))
```

- 查看日志

``` shell
cat /root/meiduoSourceCode/logs/exposure.log
2018/12/30 03:19:41: exposure_timesteamp<1543519181> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/30 13:18:13: exposure_timesteamp<1543555093> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/30 13:18:13: exposure_timesteamp<1543555093> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/31 01:47:59: exposure_timesteamp<1543600079> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/31 01:49:20: exposure_timesteamp<1543600079> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/31 01:52:53: exposure_timesteamp<1543600373> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>
2018/12/31 01:57:47: exposure_timesteamp<1543600666> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>

```

- 点击日志

```python
import time
import logging

click_trace_logger = get_logger("click_trace", "/root/bigdata/meiduo/meiduo_mall/logs/click_trace.log", logging.DEBUG)
```

```python
# 点击流日志
exposure_timesteamp = exposure_timesteamp
exposure_loc = exposure_loc
timesteamp = time.time()
behavior="pv" # pv fav cart buy share 
uid = 1
sku_id = 1
cate_id = 1
# 假设某点击流日志记录格式如下：
click_trace_logger.info("exposure_timesteamp<%d> exposure_loc<%s> timesteamp<%d> behavior<%s> uid<%d> sku_id<%d> cate_id<%d>"%(exposure_timesteamp, exposure_loc, timesteamp, behavior, uid, sku_id, cate_id))
```



### 8.4 Flume采集日志

``` python
import re
s = '2018/12/01 02:35:13: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602913> behavior<pv> uid<1> sku_id<1> cate_id<1>'

match = re.search("\
exposure_timesteamp<(?P<exposure_timesteamp>.*?)> \
exposure_loc<(?P<exposure_loc>.*?)> \
timesteamp<(?P<timesteamp>.*?)> \
behavior<(?P<behavior>.*?)> \
uid<(?P<uid>.*?)> \
sku_id<(?P<sku_id>.*?)> \
cate_id<(?P<cate_id>.*?)>, s)

result = []
if match:
    result.append(("exposure_timesteamp", match.group("exposure_timesteamp")))
    result.append(("exposure_loc", match.group("exposure_loc")))
    result.append(("timesteamp", match.group("timesteamp")))
    result.append(("behavior", match.group("behavior")))
    result.append(("uid", match.group("uid")))
    result.append(("sku_id", match.group("sku_id")))
    result.append(("cate_id", match.group("cate_id")))
result
```

``` shell
flume-ng agent -f /root/bigdata/apache-flume-1.6.0-cdh5.7.0-bin/conf/exposure_log_hdfs.conf -n a1
```



### 8.5 实时点击流日志处理

- 编辑：`/root/bigdata/flume/conf/click_trace_log_hdfs.properties`

```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/bigdata/meiduo/meiduo_mall/logs/click_trace.log
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = t1
a1.sources.r1.interceptors.t1.type = timestamp

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node-teach1:8020/project2-meiduo-rs/logs/click-trace/%y-%m-%d
as.sinks.k1.hdfs.userLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = click-trace-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k2.channel = c1
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = meiduo_click_trace
a1.sinks.k2.kafka.bootstrap.servers = localhost:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
a1.sinks.k2.kafka.producer.compression.type = snappy
```

- 启动flume对点击流日志进行采集，分别发送到kafka和hdfs：

```
flume-ng agent -f /root/bigdata/apache-flume-1.6.0-cdh5.7.0-bin/conf/click_trace_log_hdfs.conf -n a1
```

- 启动zookeeper

  ```shell
  bin/zkServer.sh start
  ```

- 启动Kafka(如果还未启动的话)：`cd /root/bigdata/kafka_2.11-1.1.0 && bin/kafka-server-start.sh config/server.properties`

- 通过spark处理实时点击流日志数据

``` python
# 注意：初次安装并运行时，由于使用了kafka，所以会自动下载一系列的依赖jar包，会耗费一定时间

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

# 第2个参数表示 程序运行间隔时间
ssc = StreamingContext(sc, 1)

kafkaParams = {"metadata.broker.list": "192.168.199.126:9092"}
dstream = KafkaUtils.createDirectStream(ssc, ["topic2"], kafkaParams)
```

- 通过spark Streaming 处理kafka消息

```python
import re
def map(row):
    match = re.search("\
exposure_timesteamp<(?P<exposure_timesteamp>.*?)> \
exposure_loc<(?P<exposure_loc>.*?)> \
timesteamp<(?P<timesteamp>.*?)> \
behavior<(?P<behavior>.*?)> \
uid<(?P<uid>.*?)> \
sku_id<(?P<sku_id>.*?)> \
cate_id<(?P<cate_id>.*?)>", row[1])

    result = []
    if match:
        result.append(("exposure_timesteamp", match.group("exposure_timesteamp")))
        result.append(("exposure_loc", match.group("exposure_loc")))
        result.append(("timesteamp", match.group("timesteamp")))
        result.append(("behavior", match.group("behavior")))
        result.append(("uid", match.group("uid")))
        result.append(("sku_id", match.group("sku_id")))
        result.append(("cate_id", match.group("cate_id")))
    return result

def foreachRDD(rdd):
    print("foreachRDD", rdd.collect())
    
dstream.map(map).foreachRDD(foreachRDD)
ssc.start()
ssc.awaitTermation()
```

终端显示:

``` shell
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD [[('exposure_timesteamp', '1543603102'), ('exposure_loc', 'detail'), ('timesteamp', '1543603398'), ('behavior', 'pv'), ('uid', '1'), ('sku_id', '1'), ('cate_id', '1'), ('stay_time', '60')]]
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD [[('exposure_timesteamp', '1543603102'), ('exposure_loc', 'detail'), ('timesteamp', '1543603400'), ('behavior', 'pv'), ('uid', '1'), ('sku_id', '1'), ('cate_id', '1'), ('stay_time', '60')], [('exposure_timesteamp', '1543603102'), ('exposure_loc', 'detail'), ('timesteamp', '1543603401'), ('behavior', 'pv'), ('uid', '1'), ('sku_id', '1'), ('cate_id', '1'), ('stay_time', '60')]]
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
foreachRDD []
```

```python
ssc.stop()
```

### 8.6 离线日志处理

- 查看离线日志存储位置

``` shell
!hadoop fs -ls /project2-meiduo-rs/logs/click-trace
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/bigdata/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/bigdata/apache-hive-2.3.4-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-12-30 13:28 /project2-meiduo-rs/logs/click-trace/18-12-30
drwxr-xr-x   - root supergroup          0 2018-12-31 02:51 /project2-meiduo-rs/logs/click-trace/18-12-31

```

- 通过spark读取点击日志

```python
date = "18-12-01"
click_trace = spark.read.csv("hdfs://node-teach1:8020/project2-meiduo-rs/logs/click-trace/%s"%date)
#truncate:show结果中最多显示的字符，默认为true，最多显示20个字符，剩余为省略号，设置为False即可全部显示
click_trace.show(truncate=False)
```

终端显示:

```shell
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018/11/30 13:21:08: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555268> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:21:08: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555268> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:28:37: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555717> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:28:37: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555717> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:28:37: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555717> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:28:37: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555717> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/11/30 13:28:37: exposure_timesteamp<1543555093> exposure_loc<detail> timesteamp<1543555717> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:28: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602868> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:28: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602868> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:29: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602869> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:29: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602869> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:30: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602870> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:30: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602870> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:34:31: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602871> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:17:30: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543601850> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:17:30: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543601850> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:17:54: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543601850> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:17:54: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543601850> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:30:54: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602654> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
|2018/12/01 02:30:54: exposure_timesteamp<1543601846> exposure_loc<detail> timesteamp<1543602654> behavior<pv> uid<1> sku_id<1> cate_id<1> stay_time<60>|
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows
```

- spark读取曝光日志

``` python
date = "18-12-01"
exposure = spark.read.csv("hdfs://node-teach1:8020/project2-meiduo-rs/logs/exposure/%s"%date)
exposure.show()
```

终端显示

``` shell
+-----------------------------------------------------------------------------------------------------+
|_c0                                                                                                  |
+-----------------------------------------------------------------------------------------------------+
|2018/11/30 03:19:41: exposure_timesteamp<1543519181> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/11/30 13:18:13: exposure_timesteamp<1543555093> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/11/30 13:18:13: exposure_timesteamp<1543555093> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/12/01 01:47:59: exposure_timesteamp<1543600079> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/12/01 01:49:20: exposure_timesteamp<1543600079> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/12/01 01:52:53: exposure_timesteamp<1543600373> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
|2018/12/01 01:57:47: exposure_timesteamp<1543600666> exposure_loc<detail> uid<1> sku_id<1> cate_id<1>|
+-----------------------------------------------------------------------------------------------------+
```

- 处理曝光日志

```python
import re
from pyspark.sql import Row

def map(row):
    match = re.search("\
exposure_timesteamp<(?P<exposure_timesteamp>.*?)> \
exposure_loc<(?P<exposure_loc>.*?)> \
timesteamp<(?P<timesteamp>.*?)> \
behavior<(?P<behavior>.*?)> \
uid<(?P<uid>.*?)> \
sku_id<(?P<sku_id>.*?)> \
cate_id<(?P<cate_id>.*?)> \
", row._c0)

    row = Row(exposure_timesteamp=match.group("exposure_timesteamp"),
                exposure_loc=match.group("exposure_loc"),
                timesteamp=match.group("timesteamp"),
                behavior=match.group("behavior"),
                uid=match.group("uid"),
                sku_id=match.group("sku_id"),
                cate_id=match.group("cate_id"),
                )
    
    return row
click_trace.rdd.map(map).toDF().show()
    
```

终端数据显示

``` shell
+--------+-------+------------+-------------------+------+---------+----------+---+
|behavior|cate_id|exposure_loc|exposure_timesteamp|sku_id|stay_time|timesteamp|uid|
+--------+-------+------------+-------------------+------+---------+----------+---+
|      pv|      1|      detail|         1543555093|     1|       60|1543555268|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555268|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555717|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555717|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555717|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555717|  1|
|      pv|      1|      detail|         1543555093|     1|       60|1543555717|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602868|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602868|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602869|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602869|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602870|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602870|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602871|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543601850|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543601850|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543601850|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543601850|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602654|  1|
|      pv|      1|      detail|         1543601846|     1|       60|1543602654|  1|
+--------+-------+------------+-------------------+------+---------+----------+---+
only showing top 20 rows
```

- 处理点击日志

```python
import re
from pyspark.sql import Row

def map(row):
    match = re.search("\
exposure_timesteamp<(?P<exposure_timesteamp>.*?)> \
exposure_loc<(?P<exposure_loc>.*?)> \
uid<(?P<uid>.*?)> \
sku_id<(?P<sku_id>.*?)> \
cate_id<(?P<cate_id>.*?)>", row._c0)

    row = Row(exposure_timesteamp=match.group("exposure_timesteamp"),
                exposure_loc=match.group("exposure_loc"),
                uid=match.group("uid"),
                sku_id=match.group("sku_id"),
                cate_id=match.group("cate_id"))
    
    return row
exposure.rdd.map(map).toDF().show()
```

终端显示

```python
+-------+------------+-------------------+------+---+
|cate_id|exposure_loc|exposure_timesteamp|sku_id|uid|
+-------+------------+-------------------+------+---+
|      1|      detail|         1543519181|     1|  1|
|      1|      detail|         1543555093|     1|  1|
|      1|      detail|         1543555093|     1|  1|
|      1|      detail|         1543600079|     1|  1|
|      1|      detail|         1543600079|     1|  1|
|      1|      detail|         1543600373|     1|  1|
|      1|      detail|         1543600666|     1|  1|
+-------+------------+-------------------+------+---+
```

- 利用点击流日志中行为是"pv"的数据同时`cate_id|exposure_loc|exposure_timesteamp|sku_id|uid`一一对应的数据进行对应，最终就能得出：所有曝光的商品中，哪些商品被用户浏览了，哪些没有被浏览



- 
