## 3.4 利用MRJob编写和运行MapReduce代码

**mrjob 简介**

- mrjob是编写在Hadoop上运行的Python程序的最简单方法。如果使用mrjob，您将能够在本地测试代码，而无需安装Hadoop或在自己选择的集群上运行它。

  - 在一个类中保存一个作业的所有MapReduce代码

  - 在运行时很容易上传和安装代码和数据依赖

  - 用一行代码切换输入和输出格式

  - 自动下载和解析Python跟踪的错误日志

  - 在Python代码之前或之后放置命令行过滤器

  如果你不想成为Hadoop专家，但是需要MapReduce的计算能力，mrjob可能正是适合你的。

**mrjob 安装**

- 使用pip安装

  `pip install mrjob`

**运行模式**

- 内嵌( -r inline)
- 本地 (-r local)
- Hadoop (-r hadoop)
- Amazon EMR (-r emr)

**mrjob实现WordCount**

```python
from mrjob.job import MRJob
#定义一个类继承MRJob
class MRWordCounter(MRJob):
	#定义两个方法：mapper和reducer
    def mapper(self, key, line):
        for word in line.split():
            yield word, 1
            
    def reducer(self, word, occurrences):
        yield word, sum(occurrences)
        
if __name__ == '__main__':
    MRWordCounter.run()
```

运行MapReduce

1、内嵌(-r inline)方式

特点是调试方便，启动单一进程模拟任务执行状态和结果，默认(-r inline)可以省略，输出文件使用 > output-file 或-o output-file，比如下面两种运行方式是等价的

python word_count.py -r inline input.txt > output.txt
python word_count.py input.txt > output.txt

2、本地(-r local)方式

用于本地模拟Hadoop调试，与内嵌(inline)方式的区别是启动了多进程执行每一个任务。如：

python word_count.py -r local input.txt > output1.txt

3、Hadoop(-r hadoop)方式

用于hadoop环境，支持Hadoop运行调度控制参数，如：

1)指定Hadoop任务调度优先级(VERY_HIGH|HIGH),如：--jobconf mapreduce.job.priority=VERY_HIGH。

2)Map及Reduce任务个数限制，如：--jobconf mapreduce.map.tasks=2  --jobconf mapreduce.reduce.tasks=5

python word_count.py -r hadoop hdfs:///test.txt -o  hdfs:///output

**mrjob 实现 topN统计**

统计数据中出现次数最多的前n个数据

```python
import sys
from mrjob.job import MRJob,MRStep
import heapq

class TopNWords(MRJob):
    def mapper(self, _, line):
        if line.strip() != "":
            for word in line.strip().split():
                yield word,1

    #介于mapper和reducer之间，用于临时的将mapper输出的数据进行统计
    def combiner(self, word, counts):
        yield word,sum(counts)

    def reducer_sum(self, word, counts):
        yield None,(sum(counts),word)

    #利用heapq将数据进行排序，将最大的2个取出
    def top_n_reducer(self,_,word_cnts):
        for cnt,word in heapq.nlargest(2,word_cnts):
            yield word,cnt
    
	#实现steps方法用于指定自定义的mapper，comnbiner和reducer方法
    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer_sum),
            MRStep(reducer=self.top_n_reducer)
        ]

def main():
    TopNWords.run()

if __name__=='__main__':
    main()
```
