大数据导论思维第8章大数据批处理课件.pptx

上传人:牧羊曲112 文档编号:2147239 上传时间:2023-01-19 格式:PPTX 页数:81 大小:5.43MB
返回 下载 相关 举报
大数据导论思维第8章大数据批处理课件.pptx_第1页
第1页 / 共81页
大数据导论思维第8章大数据批处理课件.pptx_第2页
第2页 / 共81页
大数据导论思维第8章大数据批处理课件.pptx_第3页
第3页 / 共81页
大数据导论思维第8章大数据批处理课件.pptx_第4页
第4页 / 共81页
大数据导论思维第8章大数据批处理课件.pptx_第5页
第5页 / 共81页
点击查看更多>>
资源描述

《大数据导论思维第8章大数据批处理课件.pptx》由会员分享,可在线阅读,更多相关《大数据导论思维第8章大数据批处理课件.pptx(81页珍藏版)》请在三一办公上搜索。

1、,大数据导论,第八章,PART 01 MapReduce概述,PART 02 Hadoop MapReduce架构,CONTENTS,目录,PART 03 Hadoop MapReduce的工作流程,PART 04 实例分析:单词计数,CONTENTS,目录,PART 05 Hadoop MapReduce 的工作机制,PART 06 Hadoop MapReduce的主要特点,PART 07 Hadoop MapReduce编程实战,PART 08 习题,PART 01 MapReduce概述,批处理模式,批处理模式是一种进行大规模数据处理的最早的模式。批处理主要操作大规模静态数据集,并在整

2、体数据处理完毕后返回结果。批处理模式中使用的数据集通常符合下列特征:,有界:批处理数据集代表数据的有限集合;持久:数据通常始终存储在某种类型的持久存储位置中;大量:批处理操作通常是处理超大规模数据集的唯一方法。,需要处理大量数据的任务通常最适合用批处理操作进行处理。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。,批处理模式,为了提高处理效率,对大规模数据集进行批处理需要借助分布式并行程序。分布式并行程序运行在大量计算机组成的集群上,从而可以同时利用多台计算机并发完成同一个数据处理任务,提高了处理效率。同时,可以通过增加新的计算机扩充集群的计算能力。,Goog

3、le公司最先实现了分布式并行处理模式MapReduce,并于2004年以论文的方式对外公布了其工作原理。Hadoop MapReduce是它的开源实现。Google的MapReduce运行在Google的分布式文件系统GFS上;Hadoop MapReduce运行在分布式文件系统HDFS上。,批处理模式,MapReduce简释,思考:如何知道相当厚的一摞牌中有多少张红桃。,MapReduce简释,MapReduce方法则是:(1)把这摞牌分配给在座的所有玩家;(2)让每个玩家数自己手中的牌中有几张是红桃,然后把这个数目汇报给你;(3)你把所有玩家告诉你的数字加起来,得到最后的结论。,最直观的方

4、式就是你通过一张张检查这些牌,并且数出有多少张是红桃。这种方法的缺陷是速度太慢,特别是当牌的数量特别高的情况下,获取结果的时间会很长。,显而易见,MapReduce方法通过让所有玩家同时并行检查牌来找出一摞牌中有多少红桃,可以大大加快得多答案的速度。,MapReduce简释,MapReduce方法使用了拆分的思想,合并了两种经典函数:映射(Map):对集合中的每个元素进行同一个操作。如果想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于映射(Map)。化简(Reduce):遍历集合中的元素来返回一个综合的结果。如果想找出表单里所有数字的总和,那么输出表单里一列数字的

5、总和这个任务就属于化简(Reduce)。,MapReduce简释,站在MapReduce角度,重新审视前面分散纸牌找出红桃总数的例子玩家代表计算机,因为他们同时工作,所以他们是个集群。通过把牌分给多个玩家并且让他们各自数数,就是在并行执行运算,通过告诉每个人去数数,实际上就是对一项检查每张牌的任务进行了映射。还要注意的情况就是牌分配的是否均匀。,MapReduce算法的机制要远比数牌复杂得多,但是主体思想是一致的,通过分散计算来分析大量数据。,MapReduce基本思想,使用MapReduce处理大数据的基本思想包括三个层面。对大数据采取分而治之的思想构建抽象模型:Map和Reduce函数上升

6、到构架:并行自动化并隐藏低层细节,MapReduce基本思想,大数据处理思想:分而治之,1,并行计算的第一个重要问题是:如何划分计算任务或者计算数据,以便对划分的子任务或数据块同时进行计算。一个大数据若可以分为具有同样计算过程的数据块,并且这些数据块之间不存在数据依赖关系,则提高处理速度的最好办法就是并行计算但是,一些计算问题前后数据项之间存在很强的依赖关系,无法进行划分,只能串行计算。,MapReduce基本思想,大数据处理思想:分而治之,1,例如:假设有一个巨大的2维数据,大的无法同时放进一个计算机的内存。现在需要求每个元素的开立方。思考:因为对每个元素的处理是相同的,并且数据元素间不存在

7、数据依赖关系,可以考虑不同的划分方法将其划分为子数组,由一组计算机并行处理。,MapReduce基本思想,构建抽象模型:Map和Reduce函数,2,Map函数对一组数据元素进行某种重复式的处理,Reduce函数对Map的中间结果进行某种进一步的结果整理。MapReduce借鉴了函数式程序设计语言Lisp中的思想,定义了Map和Reduce两个抽象的编程接口,为程序员提供了一个清晰的操作接口抽象描述,由用户去编程实现。,MapReduce基本思想,构建抽象模型:Map和Reduce函数,2,Map:(k1;v1)(k2;v2)输入:键值对(k1;v1)表示的数据。处理:文档数据记录(如文本文件

8、中的行,或数据表格中的行)将以“键值对”形式传入Map函数;Map函数将处理这些键值对,并以另一种键值对形式输出处理的一组键值对中间结果(k2;v2)。输出:键值对(k2;v2)表示的一组中间数据。,MapReduce基本思想,构建抽象模型:Map和Reduce函数,2,Reduce:(k2;v2)(k3;v3)输入:由Map输出的一组键值对(k2;v2)将被进行合并处理将同样主键下的不同数值合并到一个列表v2中,故Reduce的输入为(k2;v2)。处理:对传入的中间结果列表数据进行某种整理或进一步的处理,并产生最终的某种形式的结果输出(k3;v3)。输出:最终输出结果(k3;v3)。,Ma

9、pReduce基本思想,构建抽象模型:Map和Reduce函数,2,各个Map函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果输出。各个Reduce也各自并行计算,各自负责处理不同的中间结果数据集合。进行Reduce处理之前,必须等到所有的Map函数做完。因此,在进入Reduce前需要有一个同步障;,MapReduce基本思想,上升到构架:并行自动化并隐藏低层细节,3,MapReduce提供一个统一的计算框架,完成计算任务的划分和调度,数据的分布存储和划分,处理数据与计算任务的同步,结果数据的收集整理,系统通信、负载平衡、计算性能优化处理等,处理系统节点出错检测和失效恢复。Map

10、Reduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节,仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。,MapReduce基本思想,上升到构架:并行自动化并隐藏低层细节,3,MapReduce计算架构提供的主要功能包括如下几点:任务调度数据/程序互定位出错处理分布式数据存储与文件管理Combiner和Partitioner,Map和Reduce函数,MapReduce将复杂的、运行在大规模集群上的并行计算过程高度地抽象为两个简单的函数:Map和Reduce。,Map函数的任务就是把每一个输入的键值对映射成一个或一批新的键值对。Redu

11、ce函数的任务是将输入的一系列具有相同键的键值对以某种方式组合起来,然后输出处理后的键值对。,Map操作就是对一些独立元素组成的概念上的列表的每一个元素进行指定的操作。Reduce操作指的是对一个列表的元素进行适当的合并。,广义角度,狭义角度,Map和Reduce函数,思路:把18个员工的表分成3个模块,每个模块包括6个员工,由一个Map负责处理,这样就可以比顺序处理的效率提高2倍。而在每一个Map函数中,对每个员工薪资处理操作都是完全相同的,增加10%。,员工薪资列表中,每个员工的薪资都增加10%,采用Map函数该怎么完成?,Map和Reduce函数,思路:通过让列表中的元素跟自己的相邻的元

12、素相加的方式把列表减半,如此递归运算直到列表只剩下一个元素,然后用这个元素除以人数,就得到了平均薪资。,如果想知道员工的平均薪资是多少,该如何定义reduce函数?,Map和Reduce函数,Map函数和Reduce函数都是以作为输入,按一定的映射规则转换成另一个或一批进行输出。,PART 02 Hadoop MapReduce架构,Hadoop MapReduce架构,首先以MapReduce 1.0介绍MapReduce的核心概念,然后再在此基础上介绍MapReduce 2.0。,Hadoop MapReduce架构由一个单独的Master JobTracker 和每个集群节点一个Slav

13、e TaskTracker共同组成。Master负责调度构成一个作业的所有任务,这些任务分布在不同的Slave上,由Master监控它们的执行,重新执行已经失败的任务。而Slave仅负责执行由Master指派的任务。,Hadoop MapReduce架构,MapReduce1.0的架构由Client(客户端)、JobTracker(作业跟踪器)、TaskTracker(任务跟踪器)、Task(任务)组成。,Client用户编写的MapReduce程序通过Client提交给JobTracker。,Hadoop MapReduce架构,Job TrackerJobTracker主要负责资源监控和作

14、业调度,并且监控所有TaskTracker与作业的健康情况,一旦有失败情况发生,就会将相应的任务分配到其他节点上去执行。Task TrackerTaskTraker会周期性的将本节点资源使用和任务进度汇报给JobTracker,也就是“心跳”方式;与此同时会接受jobTracker发送过来的命令并执行操作。TaskTask分为Map Task和Reduce Task两种,由TaskTracker启动,分别执行Map和Reduce任务。一般来讲,每个节点可以运行多个Map和Reduce任务。,Hadoop MapReduce架构,MapReduce设计的一个核心理念就是“计算向数据靠拢”,而不是

15、传统计算模式的“数据向计算靠拢”。这是因为大量的移动数据需要的网络传输开销太大,同时也大大降低了数据处理的效率。所以,Hadoop MapReduce框架和分布式文件系统HDFS是运行在一组相同的节点上的。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用,从而减少了节点间数据的移动。,Hadoop MapReduce架构,Hadoop的MapReduce与HDFS集群架构,PART 03 Hadoop MapReduce工作流程,Hadoop MapReduce的工作流程,MapReduce就是将输入进行分片,然后交给不同的Map任务进行处理

16、,然后由Reduce任务合并成最终的解。,实际的处理过程可以理解为InputMapSortCombinePartitionReduceOutput,Hadoop MapReduce的工作流程,在Map阶段,框架调用Map函数对输入的每一个对进行处理,也就是完成map(k1,v1)-list(k2,v2)的映射操作。,在Input阶段,就是根据数据的存储位置,把数据分成多个分片(Split)在多个节点上并行处理。,在Sort阶段,当Map任务结束以后,会生成许多形式的中间结果,框架会对这些中间结果按照键进行排序。,在Combine阶段,框架对于在Sort阶段排序之后有相同键的中间结果进行合并。,

17、Hadoop MapReduce的工作流程,在Partition阶段,框架将Combine后的中间结果按照键的取值的范围划分为R份,分别发给R个运行Reduce任务的节点,并行执行。,在Reduce阶段,每个Reduce任务对Map函数处理的结果按照用户定义的Reduce函数进行汇总计算得到最后结果。,在Output阶段,框架把Reduce处理的结果按照用户指定的输出数据格式写入HDFS。,在MapReduce的整个处理过程中,不同的Map任务之间不会进行任何通信,不同的Reduce任务之间也不会发生任何信息交换。所有的信息交换都是通过MapReduce框架实现的。,PART 04 实例分析:

18、单词计数,设计思路,首先,需要检查单词计数是否可以使用MapReduce进行处理,1,因为在单词计数程序任务中,不同单词之间的出现的次数不存在相关性,相互独立,所以,可以把不同的单词分发给不同的机器进行并行处理。因此,可以采用MapReduce来实现单词计数的统计任务。,其次,确定MapReduce程序的设计思路,2,就是把文件内容分解成许多个单词,然后把所有相同的单词聚集到一起,最后计算出每个单词出现的次数。,设计思路,最后,确定MapReduce程序的执行过程。,3,首先把一个大的文件切分成许多个分片,每个分片将会输入到不同节点上形成不同的Map任务。每个Map任务完成从文件块中解析出所有

19、单词的任务。Map的输入采用方式,用文件的行号作为key,文件的一行作为value。Map的输出以单词作为key,1作为value,即表示该单词出现了1次。Map阶段结束以后,会输出许多形式的中间结果。然后Sort会把这些中间结果进行排序并把同一单词的出现次数合并成一个列表,得到形式。比如,就表明hello单词在5个地方出现过。,设计思路,最后,确定MapReduce程序的执行过程。,3,如果使用Combine,那么Combine会把每个单词的List(value)值进行合并,得到形式。比如,表明hello单词出现过5次。在Partition阶段,把Combine的结果分发给不同的Reduce

20、任务。Reduce任务接收到所有分配给自己的中间结果以后,就开始执行汇总计算工作,计算得到每个单词出现的次数并把结果输出到HDFS中。,处理过程,下面通过一个实例对单词计数进行更详细的讲解。,(1)首先,将文件拆分成多个分片。由于测试用的文件较小,只有4行,所以把该文件拆分成2个分片,每个分片包含2行内容。这一步由MapReduce框架自动完成,其中key的值为单词在文本中的偏移量。,处理过程,(2)将分割好的对交给用户定义的Map方法进行处理,生成新的对,处理过程,(3)得到Map方法输出的对后,框架会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到

21、Map的最终输出结果。,处理过程,(4)Reduce先对从Map端接收的数据进行排序,再交由用户自定义的Reduce方法进行处理,得到新的对,并作为WordCount的输出结果。,PART 05 Hadoop MapReduce 工作机制,Hadoop MapReduce作业执行流程,Hadoop MapReduce作业的执行流程中涉及到4个独立的实体,客户端(Client):编写MapReduce代码,配置作业,提交作业;JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;TaskTracker:保持与JobTracker的通信,在分配数据片段上执行

22、Map或Reduce任务;HDFS:保存作业的数据、配置信息等,以及保存作业结果。,Hadoop apReduce作业执行流程,Hadoop MapReduce作业执行流程,提交作业,客户端通过run job方法启动作业提交过程。客户端通过JobTracker的getNewJobId()请求一个新的作业ID。客户端检查作业的输出说明,计算作业的输入分片等,如果有问题就抛出异常;如果正常,就将运行作业所需的资源(比如作业Jar文件,配置文件,计算所得的输入分片等)复制到一个以作业ID命名的目录中。通过调用JobTracker的submitJob()方法告知作业准备执行。,初始化作业,JobTra

23、cker接收到对其submitJob()方法的调用后,就会把这个调用放入一个内部队列中,交由作业调度器进行调度;初始化主要是创建一个表示正在运行作业的对象,以便跟踪任务的状态和进程。,Hadoop MapReduce作业执行流程,为了创建任务运行列表,作业调度器首先从HDFS中获取JobClient已计算好的输入分片信息。然后为每个分片创建一个MapTask,并且创建ReduceTask。,分配任务,TaskTracker定期通过“心跳”与JobTracker进行通信,主要是告知JobTracker自身是否还存活,以及是否已经准备好运行新的任务等;JobTracker接收到心跳信息,如果有待分

24、配的任务,就会为TaskTracker分配一个任务,并将分配信息封装在心跳通信的返回值中返回给TaskTracker。,Hadoop MapReduce作业执行流程,执行任务,TaskTracker分配到一个任务后,通过从HDFS把作业的Jar文件复制到TaskTracker所在的文件系统,同时TaskTracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘;TaskTracker为任务新建一个本地工作目录,并把Jar文件中的内容解压到这个文件夹中。TaskTracker启动一个新的JVM来运行每个任务(包括Map任务和Reduce任务),这样Client的MapReduce就不会影

25、响TaskTracker守护进程;任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。,Hadoop MapReduce作业执行流程,进程和状态的更新,一个作业和它的每个任务都有一个状态信息,包括作业或任务的运行状态,Map和Reduce的进度,计数器值,状态消息或描述(可以由用户代码来设置)。这些消息通过一定的时间间隔由Child JVM向TaskTracker,然后再向JobTracker汇聚。,作业的完成,当JobTracker接收到这次作业的最后一个task已经完成时,它会将job的状态改为“successful”。当JobClient获取到作业的状态时,就知道该作业已经成功完成,

26、然后JobClient打印信息告知用户作业已成功结束,最后从RunJob()方法返回。,Hadoop MapReduce的Shuffle和Sort阶段,Shuffle阶段是指从Map的输出开始,包括系统执行排序以及传送Map输出到Reduce作为输入的过程。Sort阶段是指对Map端输出的Key进行排序的过程。Shuffle阶段可以分为Map端的Shuffle和Reduce端的Shuffle。,Hadoop MapReduce的Shuffle和Sort阶段,Hadoop MapReduce的Shuffle和Sort阶段,Map端的Shuffle,每个输入分片会让一个Map任务来处理,默认情况下

27、,以HDFS的一个块的大小(默认为64M)为一个分片。Map函数开始产生输出时,并不是简单地把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先写到内存中的一个缓冲区,并做一些预排序,以提升效率;,每个Map任务都有一个用来写入输出数据的循环内存缓冲区(默认大小为100MB),当缓冲区中的数据量达到一个特定阈值时(默认是80%)系统将会启动一个后台线程把缓冲区中的内容写到磁盘(即Spill阶段)。在写磁盘过程中,Map输出继续被写到缓冲区,但如果在此期间缓冲区被填满,那么Map就会阻塞直到写磁盘过程完成;,Hadoop MapReduce的Shuffle和Sort

28、阶段,Map端的Shuffle,在写磁盘前,线程首先根据数据最终要传递到的Reduce任务把数据划分成相应的分区(Partition)。在每个分区中,后台线程按Key进行排序(快速排序),如果有一个Combiner,便会在排序后的输出上运行;,一旦内存缓冲区达到溢出写的阈值,就会创建一个溢出写文件,因此在Map任务完成其最后一个输出记录后,便会有多个溢出写文件。在MapTask完成前,溢出写文件被合并成一个索引文件和数据文件(多路归并排序)(Sort阶段);,Hadoop MapReduce的Shuffle和Sort阶段,Map端的Shuffle,溢出写文件归并完毕后,Map将删除所有的临时溢

29、出写文件,并告知TaskTracker任务已完成,只要其中一个Map任务完成,ReduceTask就开始复制它的输出(Copy阶段);,Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上,它是运行Reduce任务的TaskTracker所需要的输入数据。,Hadoop MapReduce的Shuffle和Sort阶段,Reduce端的Shuffle,Copy阶段:Reduce进程启动一些数据复制线程,请求Map任务所在的TaskTracker以获取输出文件;,Merge阶段:将Map端复制过来的数据先放入内存缓冲区中,Merge有3种形式,分别是内存到内存,内存到磁盘,磁

30、盘到磁盘。默认情况下第一种形式不启用,第二种Merge方式一直在运行(Spill阶段)直到结束,然后启用第三种磁盘到磁盘的Merge方式生成最终的文件;,Reduce阶段:最终文件可能存在于磁盘,也可能存在于内存中,但是默认情况下是位于磁盘中的。当Reduce的输入文件已定,整个Shuffle就结束了,然后就是Reduce执行,把结果放到HDFS中。,PART 06 Hadoop MapReduce的主要特点,Hadoop MapReduce的主要特点,MapReduce设计上具有以下主要的技术特点:,向“外”横向扩展,而非向“上”纵向扩展失效被认为是常态把处理向数据迁移顺序处理数据、避免随机

31、访问数据为应用开发者隐藏系统层细节平滑无缝的可扩展性,PART 07 Hadoop MapReduce编程实战,任务准备,首先,在本地创建3个文件,文件file001、file002和file003。,再使用HDFS命令创建一个input文件目录:,hadoop fs-mkdir input,再使用HDFS命令创建一个input文件目录:,hadoop fs-put file001 inputhadoop fs-put file002 inputhadoop fs-put file003 input,编写Map程序,Hadoop MapReduce框架已经在类Mapper中实现了Map任务的基

32、本功能。为了实现Map任务,开发者只需要继承类Mapper,并实现该类的map函数。Map的输入是形式,其中,key是输入文件中一行的行号,value是该行号对应的一行内容。所以,map函数的输入类型为。Map函数的输出也是形式,其中,key是单词,value为该单词出现的次数。所以,map函数的输出类型为。,编写Map程序,public static class CoreMapper extends Mapper private static final IntWritable one=new IntWritable(1);private static Text label=new Text

33、();public void map(Object key,Text value,Mapper.Context context)throws IOException,InterruptedException StringTokenizer tokenizer=new StringTokenizer(value.toString();while(tokenizer.hasMoreTokens()label.set(tokenizer.nextToken();context.write(label,one);,编写Map程序,Map任务结束后,三个文件的输出结果如下:,编写Reduce程序,Red

34、uce需要完成的任务就是把输入结果中的数字序列进行求和,从而得到每个单词的出现次数。在执行完map函数之后,会进入Shuffle阶段,在这个阶段中MapReduce框架会自动将Map阶段的输出结果进行排序和分区。,编写Reduce程序,Shuffle阶段执行完毕之后,再分发给相应的Reduce任务去处理。Reduce端还会把同一个key,也就是同一单词的键值对进行合并,形成形式的输出。经过Reduce端Shuffle过程后的结果如下:,编写Reduce程序,Reduce阶段需要对输入数据value中的数字序列进行求和,从而得到每个单词的出现次数。,public static class Cor

35、eReducer extends Reducer private IntWritable count=new IntWritable();public void reduce(Text key,Iterable values,Reducer.Context context)throws IOException,InterruptedException int sum=0;for(IntWritable intWritable:values)sum+=intWritable.get();count.set(sum);context.write(key,count);,编写Reduce程序,当Re

36、duce阶段结束时,就可以得到最终输出的结果,编写main函数,为了使用CoreMapper和CoreReducer类进行真正的数据处理,还需要在main函数中通过Job类设置Hadoop MapReduce程序运行时的环境变量,public static void main(String args)throws Exception Configuration conf=new Configuration();String otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=

37、2)System.err.println(Usage:wordcount);System.exit(2);,编写main函数,Job job=new Job(conf,WordCount);/设置环境参数job.setJarByClass(WordCount.class);/设置程序的类名job.setMapperClass(CoreMapper.class);/添加Mapper类job.setReducerClass(CoreReducer.class);/添加Reducer类job.setOutputKeyClass(Text.class);/设置输出key的类型job.setOutput

38、ValueClass(IntWritable.class);/设置输出value的类型FileInputFormat.addInputPath(job,new Path(otherArgs0);/设置输入文件路径FileOutputFormat.setOutputPath(job,new Path(otherArgs1);/设置输出文件路径 System.exit(job.waitForCompletion(true)?0:1);,运行程序,单词计数(WordCount)的全部代码,import java.io.IOException;import java.util.StringTokeni

39、zer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hado

40、op.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;,运行程序,public class WordCount public static class CoreMapper extends Mapper private final static IntWritable one=new IntWritable(1);private Text wor

41、d=new Text();public void map(Object key,Text value,Mapper.Context context)throws IOException,InterruptedException StringTokenizer itr=new StringTokenizer(value.toString();while(itr.hasMoreTokens()word.set(itr.nextToken();context.write(word,one);,运行程序,public static class CoreReducer extends Reducer p

42、rivate IntWritable result=new IntWritable();public void reduce(Text key,Iterable values,Reducer.Context)throws IOException,InterruptedException int sum=0;for(IntWritable val:values)sum+=val.get();result.set(sum);context.write(key,result);,运行程序,public static void main(String args)throws Exception Con

43、figuration conf=new Configuration();String otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2)System.err.println(Usage:wordcount);System.exit(2);,运行程序,Job job=new Job(conf,WordCount);/设置环境参数job.setJarByClass(WordCount.class);/设置程序的类名job.setMapperClass(CoreMapper.

44、class);/添加Mapper类job.setReducerClass(CoreReducer.class);/添加Reducer类job.setOutputKeyClass(Text.class);/设置输出key的类型job.setOutputValueClass(IntWritable.class);/设置输出value的类型FileInputFormat.addInputPath(job,new Path(otherArgs0);/设置输入文件路径FileOutputFormat.setOutputPath(job,new Path(otherArgs1);/设置输出文件路径 Sys

45、tem.exit(job.waitForCompletion(true)?0:1);,运行程序,在程序开始的地方引用了Hadoop的几个核心组件包,它们实现了Hadoop MapReduce框架。,运行程序,编译WordCount程序需要三个Jar包,为了简便起见,首先把这三个Jar添加到CLASSPATH:,$export CLASSPATH=/usr/local/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar:$CLASSPATH$export CLASSPATH=/usr/local/hadoop/share/hadoop/mapre

46、duce/hadoop-mapreduce-2.7.3.jar:$CLASSPATH$export CLASSPATH=/usr/local/hadoop/share/hadoop/common/lib/common-cli-1.2.jar:$CLASSPATH,假设当前工作目录为“/user/local/Hadoop”,运行程序,然后使用JDK包中的工具对代码进行编译。,$javac WordCount.java,编译之后,在文件目录下可以发现有3个“.class”文件,这是Java的可执行文件。然后将它们打包并命名为wordcount.jar。,$jar cvf wordcount.jar*.class,启动Hadoop系统,就可以运行程序。,$./bin/Hadoop jar wordcount.jar WordCount input output,最后,查看结果。,$./bin/Hadoop fs-cat output/*,习题,习题,习题,谢谢,FOR YOUR LISTENING,Handge CO.LTD.2016.12.09,

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号