hadoop[1].源码阅读总结.doc

上传人:文库蛋蛋多 文档编号:2393127 上传时间:2023-02-17 格式:DOC 页数:11 大小:526KB
返回 下载 相关 举报
hadoop[1].源码阅读总结.doc_第1页
第1页 / 共11页
hadoop[1].源码阅读总结.doc_第2页
第2页 / 共11页
hadoop[1].源码阅读总结.doc_第3页
第3页 / 共11页
hadoop[1].源码阅读总结.doc_第4页
第4页 / 共11页
hadoop[1].源码阅读总结.doc_第5页
第5页 / 共11页
点击查看更多>>
资源描述

《hadoop[1].源码阅读总结.doc》由会员分享,可在线阅读,更多相关《hadoop[1].源码阅读总结.doc(11页珍藏版)》请在三一办公上搜索。

1、CommonIPC/RPC Server大概流程基于NIO,Listener关注OP_ACCEPT事件,当有客户端连接过来,Accept后,从readers中选取一个Reader将客户端Channel注册到Reader中的NIO selector,并新建一个Connection对象关联客户端Channel,Reader关注OP_READ事件.客户端建立连接后,首先发送的是ConnnectionHeader包含协议名,用户组信息,验证方法,Connection会根据以上信息进行校验.之后将是先读取4位的长度代表这次请求的数据的长度,然后一直等待事件触发读取够长度,将读取的数据 解码为调用id和p

2、aram,新建一个Call对象(关联Connection)放入call队列中,handlers中的Handler会将Call中callQuene中取走,Call中的param实际为Invocation对象,包含调用方法名,参数名,参数类型,由这些信息使用Java反射API调用Server的instance对象,获取返回值,组织返回数据,写入Call的response属性中,马上调用responder的doRespond方法,将Call加入到Connection的responseQuene中,如果responseQuene长度等于1做一次NIO写操作,如果不能一次性能够将数据写完,将客户端cha

3、nnel注册到responder关注写事件OP_WRITE,下一次继续写,如果长度不为1证明该channel已经注册到了responder了直接加入队列,由responder线程后续处理.NOTE:客户端关闭流后出发一次读操作,返回为-1,Server关闭连接CurCall与获取客户端IPHandler获取一个Call后,会将Server的curCall(ThreadLocal类型)设置为当前的Call,调用Instance方法实际是在Handler线程中,在Instance的方法内就可以使用Server提供的方法来获取客户端IPPing在上述过程中如果读取的4位长度为-1代表是客户端PING

4、操作清理空闲连接如果一定时间ipc.client.connection.maxidletime没有读取到数据并且当前连接现有Call数目为0,则视为空闲连接,Listener会在每次接受完新连接之后进行一次清理,最多清理ipc.client.kill.max个连接,如果出现OutOfMemoryError则强制清理全部空闲连接DataNode,TaskTractor设置的心跳时间需小于空闲时间.清理长时间未发送到客户端的响应注册到responder的Call如果长时间没有发送到客户端,每隔一段时间会清理掉涉及参数ipc.server.handler.queue.sizecallQuene队列大

5、小,随集群增大而增大,ipc.server.max.response.size如果返回的结果序列化后大小大于这个值,重置缓冲区ByteArrayOutputStream释放内存.ipc.server.read.threadpool.sizereaders个数ipc.server.tcpnodelayTCP优化参数Nagles algorithmhandlerCounthandler个数,由构造参数指定,在DN,TT中配置socketSendBufferSizesocket设置ipc.server.listen.queue.sizesocket设置socket.bind(address, bac

6、klog)ipc.client.idlethreshold总连接数超过多少后,开始清理空闲连接ipc.client.kill.max一次最多清理多少个空闲连接IPC/RPC ClientClient代理模式,调用RPC.getProxy实际上返回的一个代理对象,当调用方法的时候实际调用的是Invoker, Invoker将协议,调用的方法名,参数,参数类型封装成Invocation对象经过client发送到server,并读取返回流,根据流中的id,判断是服务器返回的是那次调用的结果.Connection线程负责读取Server返回值,在读取的过程中,调用Client的线程会wait直到Con

7、nection获取到返回值.读取时候如果超时(ipc.ping.interval)就发送一次ping,如果没有出现IOException就继续读取,Conneciton可以根据标识(地址,用户组信息,协议)共用.IPC/RPC AuthingHDFSName协议ClientProtocol:客户端调用协议,涉及文件操作,DFS管理,升级(DFSAdmin)DatanodeProtocol:DN与NN通讯协议,注册,BlockReport,心跳,升级NamenodeProtocol: SN和NN通讯协议,通知NN使用新的fsimage和editRefreshAuthorizationPolicy

8、Protocol, RefreshUserMappingsProtocol FSNamesystem数据结构LightWeightGSetGset 类似Set但提供get方法,主要用于存储BlockInfo,根据Block找到BlockInfo其中一个实现LightWeightGSet,利用数组存储元素,用链表解决冲突问题,类似HashMap但是没有ReHash操作BlocksMap初始化LightWeightGSet时候,会根据JVM内存将数组的大小初始为最大能占用的内存数(4194304 -Xmx1024M)加上高效的hash映射算法, LightWeightGSet在BlockInfo数

9、量比较小的时候get性能逼近数组.BlockInfo继承Block,没有重写hashCode和equals方法,在Block中equals方法只要求传入的对象是Block实例并且blockId相等,就认为两个对象相等,故存储BlockInfo时候分配的在数组中的Index和Get时候由Block的hashCode定位是一致的.BlockMapsBlockMaps负责管理Block以及Block相关的元数据Block 有3个long型的属性blockId(随机生成)numBytes(块大小),generationStampBlockInfo继承Block添加了2个属性,实现了用户LightWei

10、ghtGSet的LinkedElement接口inode:引用文件Inodetriplets:3Xreplication的数组,即replication 个组,每组有3个元素,第一个指向DatanodeDescriptor,代表在这个DN上有一个Block,第二个和第三个分别代表DN上的上一个blockInfo和下一个blockInfoDatanodeDescriptor有一个属性blockList指向一个BlockInfo,因为每个BlockInfo中的triplets中有一组记录着对应的DN上的上一个,下一个BlockInfo,所以从这个角度来看BlockInfo是一个双向链表.新建文件打

11、开输入流后,写入,会在namenode中分配BlockInfo,当Block写入到分配的DN后,DN在发送心跳时候会将新接受到的块报告给NN,此时NN在将triplets可用的组关联到DN(DD).(例子前提假设:新建的集群没有文件,操作是在DN1上,此时很大可能性每次分配块的时候都会首选本地DN1,bkl_* *实际为随机数)namenode中分配BlockInfo 并加入Gset中,blk_1,0-64M,此时DN1的blockList为nullDN1向NN报告接收了新的Block blk_1 ,NN从blocksMap中根据Block blk_1找到BlockInfo blockInfo

12、1 将triplets的可用组(=null)的第一位关联到DN1(DatanodeDescriptor1),将DN1的blockList指向blockInfo1此时blockList指向的是blockInfo1NN分配blockInfo2,DN1向NN报告接受到了信的Block blk_2,NN找到blockInfo2后1,将triplets的可用组(=null)的第一位关联到DN1(DatanodeDescriptor1)2,将第三位指向blockList即blockInfo1,2,将blockInfo1的对应DN1的组的第二位指向blockInfo24,将DN1的blockList指向bl

13、ockInfo1升级LoadBalance磁盘占用,还是分布策略 可能出现一个DN上两个相同的Block么.MapReduce命令行运行bin/hadoop jarjarFilemainClassargs.设置JVM启动参数,将lib,conf等加入classpath,启动JVM运行RunJarRunJar阶段:1,设定MainClass如果jar设置了Manifest,则作为MainClass否则取第二个参数2,在临时目录(hadoop.tmp.dir)中建立临时目录(File.createTempFile(hadoop-unjar, , tmpDir),并注册钩子JVM退出时候删除.3,将

14、Jar解压到建立的临时目录中4,将目录hadoop-unjar38923742,目录hadoop-unjar38923742/class, 目录hadoop-unjar38923742/lib中的每个文件作为URLClassLoader参数,构造一个classLoader.5,将当前线程的上下文ClassLoader设置为classLoader6,以上5步都是为mainClass启动做准备,最后应用反射启动mainClass,将args作为参数MainClassbin/hadoop jar -libjars testlib.jar -archives test.tgz -files file.

15、txt inputjar argsjob.setXXX均将KV设置到了Conf实例中(传值,例如将-file指定的文件设定到Conf中,在submit中获取,从本地复制到hdfs)在Job.submit方法中会向hdfs写入以下信息目录:hdfs:/$mapreduce.jobtracker.staging.root.dir/$user/.staging/$jobId/hdfs:/tmp/hadoop/mapred/staging/$user/.staging/$jobId/目录下文件:job.split-(Split信息)由writeSplits方法写入job.splitmetainfo-(

16、 Split信息元数据,版本,个数,索引)由writeSplits方法写入job.xml-conf对象job.jar-inputjarfiles/-参数-files 逗号分割,交给(DistributedCache管理)archives/-参数-archives 逗号分割, 交给(DistributedCache管理)libjars/-参数-libjars 逗号分割, 交给(DistributedCache管理)split,splitmetainfo(FileSplit)设计目的job.splitmetainfo中保存有Split的在那几个机器上有副本,JT读取这个文件用,用来分配Task使T

17、ask能够读取本地磁盘文件.job.split保存具体的Split,不保存位置信息,因为TT不需要(hdfs决定)JT调度CapacityTaskScheduler,TTTT启动时候,启动线程mapLauncher(用于启动MapTask),reduceLauncher(用于启动ReduceTask), taskCleanupThread(用于清理Task或者Job),TT 通过心跳从JT获得HeartbeatResponse,包含TaskTrackerAction,具体有5种操作LAUNCH_TASK启动任务,将LaunchTaskAction中包装的Task与Conf对象和TaskLaun

18、cher组合成TaskInProgres,然后添加到mapLauncher或者reduceLauncher中的队列中.TaskLauncher构造参数numSlots代表当前TaskTractor能同时执行多少个Task,由参数mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum设定,slot意思为: 槽,位置 将TaskTractor的资源抽象化,一般情况下一个task占用一个slot,如果有对资源需求大的Task也可以通过参数来控制(调度器CapacityTaskScheduler设置,未

19、开放给User?)TaskLauncher根据剩余空闲的槽位(numFreeSlots)和队列情况,来从队列中取出Task来运行(synchronized, wait, notify).KILL_TASK杀死任务KILL_JOB杀死和Job相关的任务,放入tasksToCleanup队列中REINIT_TRACKER重新启动TTCOMMIT_TASK提交任务(1, speculative execution 2,need commit file?) OutputCommitterREINIT_TRACKER 重启TT, startNewTask 新的JVM(不是TT的JVM,错误处理,GC)执

20、行Child.class,通过main参数argsMap过程MapTask中会根据jobConf记录的hdfs上的job.split文件以及JT分配的splitIndex获取InputSplit,根据jobConf的配置新建Map和InputFormat,由InputFormat获取RecordReader来读取inputSplit,生成原始original_key, original_value交给Mapper.map方法处理生成gen_key,gen_value,根据partitioner生成partition,成对的(gen_key,gen_value, partition)会先放入一个

21、缓冲区,如图,这个缓冲区分为3级索引(排序kvoffset,复制效率)等这个缓冲区到达一定阀值之后,并不是缓冲区慢之后,SplitThread会标记当前前后界,对界内数据进行排序(现根据partition在根据kv),并写入到磁盘文件中(split.x.out)并记录各个partition段的位置,部分存到内存部分存到磁盘,在这个过程中,map仍然继续进行,如果缓冲区满之后,map线程暂时wait,到SplitThread完毕.当输入读取完毕,随之的SplitThread也结束后,磁盘中中间文件为split.1.out - split.n.out ,索引部分存在内存里面,超过1024*1024

22、个,作为索引文件spill.n.out.index(避免内存不够用).然后通过合并排序将分段的文件(split.x.n)合并排序成一个文件file.out,file.out.index记录partition信息.(详细见MergeQueue)这样在Reduce过程中,通过http请求TT其中需要的partition段(参数reduce),TT根据file.out.index记录的索引信息将file.out的partition段,生成http响应.如果有CombinerSortAndSpillkvoffset达到临界点softRecordLimit,例如100个,设定80个为临界点.Kvbuff

23、er达到临界点softBufferLimit,例如100M,当80M为临界点.目的是为了不让map过程停止浪费时间,但由于IO map可能会慢一点(进一步多磁盘负载).io.sort.mb配置的是图中kvoffset,kvindices,kvbuffer占用的空间总大小Mb.上述参数都可以通过conf.setXXX来配置,根据特定job的特点来设定.来减少Spill次数,同时避免内存溢出.Reduce过程JobInProgress初始化mapred.reduce.tasks个ReduceTask 用参数partition区别.然后JT在心跳过程中,将ReduceTask分给TT执行.Reduc

24、eTask有SHUFFLE, SORT, REDUCE三个阶段SHUFFLE这一阶段是ReduceTask初始化阶段,新建了N(参数控制)个下载线程,来获取Map的输出,TaskTracker中有一个线程会不断的从JT中获取在本TT运行的ReudceTask(s)的JOB的Map完成事件. ReduceTask不断从TT中获取Job的Map完成事件,然后将事件中的Map输出位置交给下载线程来获取.下载的时候,从HTTP响应头获取文件的大小,决定是放在内存中还是写入磁盘.在内存中的数据,满足一定条件会在后台将内存中的数据Merger写入硬盘,在硬盘中的数据,满足一定条件(数目超过了2 * ioS

25、ortFactor - 1)会在后台做Merger.所有的Map输出下载完毕,并且后台Merger线程也结束后,进入SORT阶段.SORT这个阶段还是Merger,将内存和硬盘中的数据,做合并排序(ioSortFactor),使能够高效率的输出key ,values.然后进入REDEUCE阶段.REDUCE这一阶段只要是将上述产生的key value通过ReduceContext转化成key values(ValueIterable),传递给reduce,最后通过输出配置,将reduce的输出写入HDFS(part-r-00000,1,2),或者其他.MergeQueueMR 优化1,控制好每

26、个Map的输入和输出,尽量使Map处理存在本地的Block,一个InputSpilt不要太大,最好使产生的输出能控制在io.sort.mb之内,这样能够减少从内存将输出数据写到磁盘的磁盘的个数. 根据任务将io.sort.mb设置合理,尽量能容纳单个Map全部输出.2, 多磁盘负载,MR中大量的临时输出文件会放在这个下mapred.local.dir = /hd1/mr, /hd2/mr, /hd3/mr hd*为挂载的不同磁盘.LocalDirAllocator从以上多个目录中分配每次创建文件的目录,降低IO负载.(其他SSD)HDFS中逗号分隔的目录(dfs.data.dir, dfs.n

27、ame.dir, dfs.name.edits.dir)是为了冗余.3,Map输出压缩,减少网络传输.4,Reduce阶段,将各个map的输出下载到本地,由于各个Map输出可能有大有小,合适的可以放到内存中,( mapred.job.reduce.total.mem.bytes, mapred.job.shuffle.input.buffer.percent),减少N个下载线程写磁盘.5,设置Java运行内存偏大,GC回收算法. UseConcMarkSweepGC.6,频外心跳mapreduce.tasktracker.outofband.heartbeat,会加重JT负担.预想优化Reduce负载较重(收集N个map输出,执行统计工作),可以通过指定高配置机器,网络节离中心交换机近.做Merger的时候k,vvv,k,vv最后源码基于hadoop-0.20.203.0,粗糙整理,不断完善中,正误自辩,如有疑问交流或指正错误,可发邮件nice2mu。

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 建筑/施工/环境 > 项目建议


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号