MapReduce分布式计算平台.doc

上传人:小飞机 文档编号:3981264 上传时间:2023-03-30 格式:DOC 页数:25 大小:106.50KB
返回 下载 相关 举报
MapReduce分布式计算平台.doc_第1页
第1页 / 共25页
MapReduce分布式计算平台.doc_第2页
第2页 / 共25页
MapReduce分布式计算平台.doc_第3页
第3页 / 共25页
MapReduce分布式计算平台.doc_第4页
第4页 / 共25页
MapReduce分布式计算平台.doc_第5页
第5页 / 共25页
点击查看更多>>
资源描述

《MapReduce分布式计算平台.doc》由会员分享,可在线阅读,更多相关《MapReduce分布式计算平台.doc(25页珍藏版)》请在三一办公上搜索。

1、MapReduce 分布式计算平台 编程示例文档名称MapReduce 分布式计算平台编程示例项目名称Hadoop分布式系统研究,改进,应用作者李相娜,王守彦,马如悦,朱冠胤文档提交日期2008-4-3版本Version 1.01. MapReduce介绍11.1 编程模式12.2 简单例子12 用户自定义接口32.1 map函数32.2 Reduce函数32.3输入和输出格式42.4 partitioner函数42.5 Combiner函数43 Hadoop MapReduce平台使用53.1 streaming介绍53.2 C语言Map-Reduce程序示例63.2.1计算任务63.2.2

2、 Mapper算法设计73.2.3 Reducer算法设计83.2.4 作业提交命令93.3 shell Map-Reduce程序示例93.3.1计算任务93.3.2 map实现103.3.3 reduce实现113.3.4 作业提交命令114技巧124.1 顺序保证124.2 本地执行124.3 状态信息135 实际经验135.1 spider rubbish-mine全库挖掘项目135.2 Rank组page共现信息计算145.3 PS日志分析计算155.4 用户访问信息展现166 参考资料17附录一181. MapReduce介绍MapReduce是一个用于处理海量数据的分布式计算框架。

3、这个框架解决了诸如数据分布、工作调度、容错、机器间通信等复杂问题,可以使没有并发处理或者分布式系统经验的工程师,也能很轻松地写出简单的、能够应用于成百上千台机器上处理大规模数据的并发程序。1.1 编程模式MapReduce基于“分而治之”的思想,将计算任务抽象成Map和Reduce两个计算过程,其实可以简单理解为“分散运算合并结果”的过程。一个MapReduce任务首先会把输入数据分割成不相关的若干键/值对(key/value)集, 这些键/值对会由多个map任务来并行地处理(一个集合划分对应一个map任务)。MapReduce会对map的输出(一组中间键值对)进行排序, 并将同属于一个键(k

4、ey)的值(value)组合在一起(key/list of values)作为reduce任务的输入,由reduce任务计算出最终结果并输出。基本流程如下:(input) - map - - combine - - reduce - (output)通常计算任务的输入和输出都是存放在文件里的,并且这些文件被存放在MapReduce之下的分布式文件系统(DFS)中,系统会尽量使存储结点同时成为相应的计算节点,减少数据在网络中流动,最大程度地节省带宽消耗。2.2 简单例子下面给出一个词频统计的简单例子来演示一下Map-Reduce的过程:输入两篇文档:doc1: MapReduce is a pr

5、ogramming modeldoc2: MapReduce is easy to useMap任务负责统计每篇文档中出现的单词,Reduce任务负责对每个单词出现的总次数进行统计。如果将每一篇文档作为一个输入划分,计算结果只输出到一个文件中;MapReduce将启动2个map任务和1个reduce任务进行处理。数据转换过程如下:Map 输出:Map 1: (MapReduce, 1), (is, 1), (a, 1), (programming, 1), (model, 1). Map 2: (MapReduce, 1), (is, 1), (easy, 1), (to, 1), (use,

6、 1).Reduce输入:(MapReduce, 1,1), (is, 1,1), (a, 1), (programming, 1), (model, 1), (easy, 1), (to, 1), (use, 1).Reduce输出:(MapReduce, 2), (is, 2), (a, 1), (programming, 1), (model, 1), (easy, 1), (to, 1), (use, 1).还有一些有趣的例子,也可以简单地通过MapReduce计算模型来展示:分布式Grep:如果map函数检查输入行,满足条件的时候,map函数就把本行输出。reduce函数就是一个直通

7、函数,简单的把中间数据输出就可以了。URL访问频率统计: map函数处理webpag请求和应答(URL,1)的log。Reduce函数把所有相同的URL的值合并,并且输出一个成对的(URL,总个数)。逆向Web-Link 图:map函数输出所有包含指向target URL的source网页,用(target,source)这样的结构对输出。Reduce函数局和所有关联相同target URL的source列表,并且输出一个(target,list(source)这样的结构。主机关键向量指标(Term-Vector per Hosts):关键词向量指标简言之就是在一个文档或者一组文档中的重点次出

8、现的频率,用(word,frequency)表达。map函数计算每一个输入文档(主机名字是从文档的URL取出的)的关键词向量,然后输出(hostname,关键词向量(Term-Vector))。reduce函数处理所有相同host的文档关键词向量。去掉不常用的关键词,并且输出最终的(hostname,关键词向量)对。逆序索引:map函数分析每一个文档,并且产生一个序列(word,documentID)组。reduce函数处理指定word的所有的序列组,并且对相关的document ID进行排序,输出一个(word,list(document ID)组。所有的输出组,组成一个简单的逆序索引。通过

9、这种方法可以很容易保持关键词在文档库中的位置。分布式排序: map函数从每条记录中抽取关键字,并且产生(key,record)对。reduce函数原样输出所有的关键字对。可见,使用MapReduce框架,RD可以只关心根据怎样将问题进行分解,哪些操作是Map操作,哪些操作是Reduce操作,很大程度上简化了整个编程模型。2 用户自定义接口2.1 map函数需要由用户提供,用于处理输入的键值对,并且产生一组中间的(intermediate)键值对。MapReduce把所有具有相同中间key 的中间value聚合在一起,然后把它们提供给reduce函数.2.2 Reduce函数同样也需要由用户提供

10、,它处理中间键值对、以及这个中间键值相关的值集合,合并这些值,最后形成一个相对较小的值集合。通常一个单次Reduce执行会产生0个或者1个输出值。提供给Reduce函数的中间值是通过一个迭代器(iterator)来实现的,这就让我们可以处理超过内存容量的值列表。2.3输入和输出格式用户可以定义输入数据和输出结果的格式。输入格式(InputFormat)定义了输入数据的划分方法,并实现输入键值对到相应的map任务中。输出格式(OutputFormat)定义了reduce输出数据的格式和位置。2.4 partitioner函数用户可以提供一个partitioner函数,用来根据Reduce任务数对

11、map输出的中间结果进行划分。一般默认使用Hash划分的方法(例如hash(key)mod R),就可以得到分散均匀的分区。不过,在某些情况下,对key用其它的函数进行分区可能更有用。比如,某些情况下key是URL,那么我们希望所有对单个host的入口URL都保存在相同的输出文件。为了支持类似的情况,用户可以提供一个特定的分区函数,比如使用hash(hostname(urlkey)mod R作为分区函数,让指向同一个hostname的URL分配到相同的输出文件中。2.5 Combiner函数用户可以提供一个Combiner函数,先将map输出的中间结果在本地进行合并,然后再通过网络发送给red

12、uce任务。例如:在单词统计的计算中,Combiner函数可以将map的输出 Map output: (“hi”, 1), (“Owen”, 1), (“bye”,1), (“Owen”,1)合并为 Combiner output: (“Owen”, 2), (“bye”, 1), (“hi”, 1)作为最终的map任务输出。Combiner函数在每一个map任务的机器上执行。通常这个combiner函数的代码和reduce的代码实现上都是一样的。reduce函数和combiner函数唯一的不同就是MapReduce对于这两个函数的输出处理上不同。对于reduce函数的输出是直接写到最终的输出

13、文件。对于combiner函数来说,输出是写到中间文件,并且会被发送到reduce任务中去。3 Hadoop MapReduce平台使用3.1 streaming介绍Hadoop MapReduce是对MapReduce框架的一个java开源实现,并且可以运行非java语言编写的用户程序。它提供了一个非常有用的工具 - streaming,可以用STDIN (标准输入)和STDOUT (标准输出)与用户编写的Map和Reduce进行数据的交换。任何能够使用STDIN和STDOUT的编程语言都可以用来编写MapReduce程序,比如我们用Python的sys.stdin和sys.stdout,或

14、者是C中的stdin和stdout。 streaming的使用方法:Usage: $HADOOP_HOME/bin/hadoop -config dir jar $HADOOP_HOME/hadoop-streaming.jar options常用的命令选项:-input 指定输入文件的存放路径-output 指定结果文件的输出位置-mapper 指定map可执行程序、或实现类名称-reducer 指定reduce可执行程序、或实现类名称例如:$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input /user/m

15、e/samples/cachefile/input.txt -mapper xargs cat -reducer cat -output /user/me/samples/cachefile/out -jobconf mapred.map.tasks=1 -jobconf mapred.reduce.tasks=1 -jobconf mapred.job.name=Experiment3.2 C语言Map-Reduce程序示例3.2.1计算任务计算Apache日志中各个Refer站点出现的次数,按降序排列输出。日志格式如下: 221.4.220.234 - - 23/Mar/2007:00:0

16、0:00 +0800 GET / HTTP/1.1 200 14 mod_gzip: 0pct. 其站点名为 3.2.2 Mapper算法设计mapper读取标准输入,将输入保存在局部临时变量m中,从m字符串中取”http:/后只包含由0-9, a-z, A-Z和.号以及-号和_号字符组成的字符串,本程序认为上述子串为合法的url。 将url部分取出来以后存储到glib提供的哈希表中,待输入完成以后,通过遍历该哈希表,输出归并完成以后的url部分以及该url总共出现的频度。mapper的输入:222.245.11.184 - - 28/Mar/2007:00:00:00 +0800 GET /

17、i?z=0&cl=2&ct=201326592&sn=&lm=-1&cm=1&sc=0&bu=&rn=21&tn=baiduimage&word=%B3%C2%BB%DB%C1%D5&pn=162 HTTP/1.1 200 40562 mod_gzip: 0pct. BAIDUID=D2B56887CA8714B51143E38F04880822; iCast_Rotator_1_2=1175011181843; icast_44326_682=1以上为一条合法的输入记录, mapper将以”http:/后的i为合法url为开始, 到/结束。因此以上记录的url部分为, 将该条记录增加到哈希表

18、中。Mapper输出: 1 5 1-每行两列,列之间用tab分开。补充说明: Mapper完成标准输入部分的归并操作,但不对记录排序。url部分目前只匹配http:/打头的,对含有大写字母如HttP:/不做处理, 对于https:/不支持。url部分区分大小写。 如和WWW.BAIDU.COM将作为不同的url统计。3.2.3 Reducer算法设计Reducer读取标准输入,将输入的字段归并,然后输出和输入类似的结果。Reducer以t分开每行输入的两列,然后以第一列为关键字去和上一次输入的关键字比较,如果两者匹配,则将该输入列t分隔符后的数值部分累加到上一次的数值结果中。 如果不匹配,则作

19、为新的纪录重新开始,并将上一记录的结果标准输出。输出部分每行两列,第一列为key, 第二列为value部分,两列以t分割。在匹配过程中参考了内核进程调度O(1)算法,通过交换指针,减少了字符串拷贝与重置的过程,提高效率。Reducer输入: 2 3 5每行两列,列间以t分开。Reducer输出: 5 /对该条记录进行了归并 5每行两列,列间以t分开字符串匹配部分说明:考虑了两种特殊情况,举例如下:2www.kernel.org 10www.kernel.org 20第一种是 后没有数字; 此时输出为0第二种是和www.kernel.org 10 这两列之间为一空行(该行以回车结束),此时忽略该

20、行。因此上述输入的输出为:20www.kernel.org303.2.4 作业提交命令 ./hadoop jar ./contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/access_log_mapreduce/mapper_example -reducer /home/hadoop/access_log_mapreduce/reducer_example -input /mry/access_log/ -output /hdfs/output/ -jobconf mapred.job.name=”access_log_mapred

21、uce_test”其中: -mapper给出了map程序,-reducer给出了reduce程序,-input给出了数据源路径,-output给出了数据结果存储路径,-jobconf mapred.job.name 给出了作业名称。【程序见附录一】3.3 shell Map-Reduce程序示例3.3.1计算任务同上。3.3.2 map实现streaming接口会读取每一个切割块的日志行,然后以行开头所在的文件偏移为key值,以整行的内容为value值,通过管道送入脚本程序,脚本程序使用awk来统计各个站点的出现次数,最后输出map结果为以站点url为key值,url在这个切割块中的出现次数为

22、value值。比如:经过streaming接口后,通过管道送入map脚本程序的输入为:109 221.4.220.234 - - 23/Mar/2007:00:00:00 +0800 GET / HTTP/1.1 200 14 mod_gzip: 0pct. 568 221.4.220.224 - - 23/Mar/2007:00:00:02 +0800 GET / HTTP/1.1 200 14 mod_gzip: 0pct. 722 221.4.220.214 - - 23/Mar/2007:00:00:05 +0800 GET / HTTP/1.1 200 14 mod_gzip: 0p

23、ct. map脚本程序的输出是: 1 2脚本代码为:#!/usr/bin/awk -f urlstr = $13; if(index(urlstr,http:/) = 2 )urlstr = substr(urlstr, 9, length(urlstr)-2); elseurlstr = substr(urlstr, 2, length(urlstr)-2); theindex=index(urlstr, /); if(theindex != 0)urlstr = substr(urlstr,1,theindex-1); sortarrayurlstr+; END for(i in sort

24、array) print i, sortarrayi;3.3.3 reduce实现streaming接口会将已经聚合好的(reduce前的排序和聚合)key/value值送入reduce脚本。reduce脚本首先将所有key相同(即为同一个url)的value进行累加,最后输出key/value对。输出为次数/url。比如输入为: 1 2 3 2 2输出则为: 6 4脚本代码为:#!/usr/bin/awk -fBEGIN num = 0; urlstr = if (NR = 1) urlstr = $1; num = $2; else if (urlstr = $1) num += $2;

25、else print urlstr, num; urlstr = $1; num = $2; END print urlstr, num; ./hadoop jar ./contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/access_log_mapreduce/mapper.awk -reducer /home/hadoop/access_log_mapreduce/reducer. awk -input /mry/access_log/ -output /hdfs/output/ -jobconf mapred.job.name

26、=”access_log_mapreduce_test”3.3.4 作业提交命令4技巧4.1 顺序保证如果确保在给定的分区中,中间键/值对的处理顺序是根据key增量处理的,便可以很容易生成有序的输出文件。这对于输出文件格式需要支持对key的随机存取时很有用,并且对输出数据再进行排序也变得很容易。 4.2 本地执行因为实际计算是分布在系统中执行的,通常是在好几千台计算机上进行,并且是由master机器进行动态调度的任务;所以在把用户程序提交给MapReduce平台之前,可以先在本地机器上按MapReduce操作顺序将程序执行一遍,确保程序都调试通过。可以使用以下类似命令:cat input |

27、mapper | sort | reducer output4.3 状态信息Hadoop Namenode(Master)内部有一个HTTP服务器,并且可以输出状态报告。状态页面提供了计算的进度报告,比如有多少任务已经完成,有多少任务正在处理,输入的字节数,中间数据的字节数,输出的字节数,处理百分比,等等。用户可以根据这些数据来预测计算需要大约执行多长时间,是否需要为这个计算增加额外的计算资源。这些页面也可以用来分析为何计算执行的会比预期的慢。此外,状态页面也显示了哪些worker失效了,以及他们失效的时候上面运行的map和reduce任务。这些信息对于调试用户代码中的bug很有帮助。5 实际

28、经验5.1 spider rubbish-mine全库挖掘项目过程:从全库中抽词,获得网页模式,根据模式相似度去除垃圾页面。困难:数据量巨大,全部文本数据480TB,压缩后120TB,单机抽词速度很慢。抽词后结果35TB,需要对35TB数据排序,把相同模式的网页汇聚在一起,普通机群无法完成,编程困难比较大。平台使用解决方法:使用50台机器搭建机群,使用250个核并行抽词并局部排序,抽词后对结果使用分布式排序,送给垃圾页面处理程序处理。项目实施困难:1 网页格式和平台默认格式不一致;2 实验机群异构性很强,存储能力不平衡;3 实验机群计算能力不平衡;4 编码问题;5 数据导入比较慢;6 平台容量

29、有限,不足以存下抽词结果;7 存储能力极不平衡时,数据导入停滞。处理能力:1 使用不压缩方式,200个核每天处理48TB不压缩数据;2 使用压缩方式,250个核每天处理15TB压缩数据,数据压缩比为4:1。迁移成本:完成程序从单机迁移到平台上并测试耗时一星期,由于平台实现了分布式排序,大大简化了程序编写。收益:原计划一个月计算完成的项目迁移到250个核的机群上,经过多次改进和调整,目前只需8-9天。5.2 Rank组page共现信息计算过程:对网页进行抽词,对抽词结果两两组合,统计所有网页中按各种组合出现的总次数。困难:数据量接近1TB,抽词组成词对后数据量放大5倍,抽词组合速度很慢。自己构建

30、机群处理,程序实现周期比较长,节点失效后不能自动处理,无法实现计算资源和存储计算的平衡,节点数超过一定数量后难于维护;中间结果巨大,如果使用少量机器,网络压力很大。平台使用解决方法:使用50台机器并行进行多轮抽词,多抽词结果做两次reduce计算,完成最终结果。项目实施中的困难:1 中间结果巨大,但是平台网络异构比较严重,为了保障其他业务不受影响,只能把任务切割的尽量小;2 在本次计算中,根据资源情况,参数配置并没有达到最优,未能发挥平台最佳性能;3 编码问题。迁移成本:迁移到分布式平台的成本适中,提取出核心代码就能够在分布式平台上使用,源程序的大部分代码为数据分发功能,使用分布式平台大大减少

31、了代码编写量。大概使用一天时间完成代码迁移和单元测试。收益:NLP搭建了7台机器的小分布式系统,该系统使用2周时间处理了3000W网页,计划至少处理1亿以上网页,与预期相差太多,只好使用分布式计算平台,使用分布式平台的最大收益是计算时间的大量较少,本次计算了2亿网页使用了18小时,能够满足时间要求。5.3 PS日志分析计算需求:竞争对手流量来源分析,用户主动输入url分析,大网站用户访问路径挖掘输入数据量在200GB-400GB之间,属于分布式平台上的典型应用。迁移成本:基本没有迁移成本,平台可以实现分布式排序,简化了程序编写。收益:之前使用单机进行数据分析,一个分析需要耗时2小时左右,使用分

32、布式平台耗时在5分钟以内。使用分布式平台提供的大容量存储能力可以保存1个月以上的日志数据,用来做深度分析。5.4 用户访问信息展现需求:把用户一天的访问信息导入数据平台,pm可以根据关键字迅速找到访问数据,用于分析用户行为。困难:数据量巨大,单机平台(数据库)效率低,之前使用RD自己办些的索引系统实现,实现周期比较长,可扩展性比较差。平台使用解决办法:使用在分布式存储之上构建的分布式索引系统,相当于分布式数据库,在数据倒入时,自动建索引,查询时也使用分布式查询。迁移成本:编写程序把数据倒入即可,不需要考虑索引和效率问题。项目实施困难:目前的分布式索引系统还不太稳定,只适用于线下系统。收益:降低

33、了系统实现难度和维护成本,增加了可扩展性。6 参考资料1. Google MapReduce paper2. Hadoop Map-Reduce Tutorial3. Hadoop Streaming附录一mapper_example.c/* * On platform 32: gcc -I/usr/include/glib-2.0 -I/usr/lib/glib-2.0/include -lglib-2.0 -o mapper mapper_example.c -O3 * On platform 64: gcc -I/usr/include/glib-2.0 -I/usr/lib64/gli

34、b-2.0/include -lglib-2.0 -o mapper mapper_example.c -O3 */ #include #include #include #include #define HTTP_HEADER_LEN 9#define HTTP_HEADER http:/#define MAX_LEN 8192#define MAX_KEY_LEN 256#define INIT 1#define HASH_FOUND 1#define HASH_NOT_FOUND 0#define FREE(ptr) if(ptr)!=NULL) free(ptr);(ptr) = NU

35、LL;static GHashTable* g_url_hash = NULL;/* * run for each hash pair, called by g_hash_table_foreach() */inline void hash_table_iterator(gpointer key, gpointer value, gpointer user_data) printf(%st%un, key, *(long*)value);FREE(key);FREE(value); /* * function: update the keys value in hash, if key exi

36、st, update value, if key not exist, insert into hash. * return: 1 if key exist in hash, 0 for not exist. */inline int hash_table_update(GHashTable *hash, const char *key, long value)char *pkey= NULL;long *pvalue = NULL;int length = 0;int res = -1;pvalue = g_hash_table_lookup(hash, key);if(pvalue = N

37、ULL)res = HASH_NOT_FOUND;length = strlen(key);pkey = (char*)malloc(sizeof(char)*(length+1);memset(void*)pkey, 0, length+1);memcpy(pkey, key, length);pvalue = (long*)malloc(sizeof(long);*pvalue = value;g_hash_table_insert(hash, pkey, pvalue); else *(long*)pvalue += value;res = HASH_FOUND;return res;/

38、* * compute next value for sting s. */void getnext(const char *s, int next) int i,j; i=0;j=-1;next0=-1; while(si) if(j=-1|si=sj) +i;+j;nexti=j; else j=nextj; /* * return the offset in string m, if string s was founded in m. start from 0, -1 means not found. */inline int kmp(const char *m, const char

39、 *s, const int next) int i,j; i=0;j=0; while(mi) if(j=-1|mi=sj) +i;+j; if(sj=0) return (i-j); else j=nextj; return -1;/* * Function: using kmp algorithm to math string s in string m, store the matched string in key. * Return: NULL for not matched, or key pointer. */inline char* getkey(const char *m,

40、 const char *s, const int next, char *key)int urlStart = 0, urlEnd = 0, kmpres = 0;kmpres = kmp(m, s, next);if(kmpres = -1)return NULL;urlStart = urlEnd = kmpres + strlen(s);while(murlEnd&(murlEnd=a&murlEnd=0&murlEnd=A&murlEnd=Z)|(murlEnd=-)|(murlEnd=_)+urlEnd; if(urlEnd!=urlStart)memset(void*)key,

41、0, sizeof(key);memcpy(key, &murlStart, urlEnd-urlStart);return key;return NULL;int main(int argc, char *argv)char mMAX_LEN,sHTTP_HEADER_LEN;char keystrMAX_KEY_LEN;int nextHTTP_HEADER_LEN; memset(void*)m, 0, sizeof(m);memset(void*)keystr, 0, sizeof(keystr);/ init s string (kmp algorithm)memset(void*)s, 0, sizeof(s); snprintf(s, sizeof(s), %s, HTTP_HEADER);/ co

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号