超人学院storm深入浅出.ppt

上传人:小飞机 文档编号:6489331 上传时间:2023-11-05 格式:PPT 页数:33 大小:803KB
返回 下载 相关 举报
超人学院storm深入浅出.ppt_第1页
第1页 / 共33页
超人学院storm深入浅出.ppt_第2页
第2页 / 共33页
超人学院storm深入浅出.ppt_第3页
第3页 / 共33页
超人学院storm深入浅出.ppt_第4页
第4页 / 共33页
超人学院storm深入浅出.ppt_第5页
第5页 / 共33页
点击查看更多>>
资源描述

《超人学院storm深入浅出.ppt》由会员分享,可在线阅读,更多相关《超人学院storm深入浅出.ppt(33页珍藏版)》请在三一办公上搜索。

1、深入浅出S,Storm简介,Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架。Storm能实现高频数据和大规模数据的实时处理。官网资料显示storm的一个节点1秒钟能够处理100万个100字节的消息(IntelE56452.4Ghz的CPU,24GB的内存),HADOOP与STORM比较,数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据);处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOL

2、T);是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始;处理速度:HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;,Storm的设计思想,Storm是对流Stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。Storm将流中元素抽象为Tuple,一个tuple就是一个值列表val

3、ue list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导

4、向其他bolt或者目的地。以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。,流处理过程,Storm开发,在eclipse中建立maven工程增加一个依赖org.a

5、pache.stormstorm-core0.9.3,编写Storm本地程序,核心类LocalCluster、Topology、TopologyBuilder、BaseRichSpout、BaseRichBoltLocalCluster cluster=new LocalCluster();,注意:,spout和bolt的ID不能以_开头,这些是系统保留的spout和bolt不能使用相同的ID集群中topology的名称不能重复,Storm术语解释,Topology 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce JobStream 消息流,是一个没有边界的tupl

6、e序列,这些tuples会被以一种分布式的方式并行地创建和处理Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tupleBolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该

7、如何分配给Bolts们.,Storm集群结构,Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。Storm集群有两种节点:控制(master)节点和工作者(worker)节点。控制节点运行一个称之为”Nimbus”的后台程序,它类似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务

8、和故障监测。每个工作者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。,一个Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;所有状态维持在Zookee

9、per或本地磁盘。这意味着你可以kill-9杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,就像什么也没发生。这种设计使storm极其稳定。这种设计中Master并没有直接和worker通信,而是借助一个中介Zookeeper,这样一来可以分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速回复任何失败的一方。,storm基本体系架构图,storm集群搭建,安装zookeeper集群(要保证集群各个节点的时间保持一致)修改文件conf/storm.yaml,storm配置文件写法,使用两个空格作为一级缩进是 YAML 的约定,

10、不能使用制表符(Tab)来代替以“:”为结束符的字符串,代表了一个键名,“:”后面则是键值。“:”和键值之间必须有至少一个空格。列表的元素前面的“-”必不可少,并且要跟随至少一个空格。也可以使用-value1,value2,value3 表示列表,启动,在nimbus节点执行nohup bin/storm nimbus/dev/null 2&1&启动Nimbus后台程序,并放到后台执行在supervisor节点执行nohup bin/storm supervisor/dev/null 2&1&启动Supervisor后台程序,并放到后台执行;在nimbus节点执行nohup bin/storm

11、 ui/dev/null 2&1&启动UI后台程序,并放到后台执行,启动后可以通过http:/nimbus host:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。在所有从节点执行nohup bin/storm logviewer/dev/null 2&1&启动log后台程序,并放到后台执行,启动后可以通过http:/host:8000观察日志信息。(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上。),启动时注意,1:当我们把当前终端关闭的话,之前在后

12、台启动的进程也会被停掉。所以如果想要避免这种情况就需要在刚才执行的命令前面都加一个nohup命令2:启动Storm后台进程时,需要对conf/storm.yaml配置文件中设置的storm.local.dir目录具有写权限3:当发现启动nimbus或者supervisor报这个错误时(右图1),是因为之前关闭服务时,没有先关闭topology,再启动时,zk中依然保留了上次运行的topology信息,但是本地的一些文件没了,启动就会报错。解决方法,使用zk的zkCleanup.sh清理缓存数据,或者直接清空zk的dataDir目录中的数据4:storm非正常退出(服务器断电),再次启动supe

13、rvisor时可能会引起这个异常(右图2),解决办法,删除属性对应目录中的supervisor和workers两个目录,然后重启supervisor进程即可,实现一键启动和停止集群脚本,详细步骤参考,集群代码写法,代码StormSubmitter.submitTopology(mytopology,conf,topology);提交storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3,向集群提交作业,storm jar*.jar xxxxMainClass,停止作业,先查询作业列表storm list命令行下执行sto

14、rm kill TopologyName在storm ui上点击kill按钮,topology读取本地文件,如果需要在topology中读取磁盘中的文件加载一些配置信息的话,需要保证这个文件存在于集群的所有节点中。,并行度,worker,executor,task解释,1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。executor是

15、1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个(spout或bolt)只生成1个task,executor线程会在每次循环里顺序调用所有task实例)。task是最终运行spout或bolt中代码的执行单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个(spout或bolt)的task数目是固定不变的,但该(spout或bolt)使用的ex

16、ecutor线程数可以动态调整(例如:1个executor线程可以执行该(spout或bolt)的1个或多个task实例)。这意味着,对于1个(spout或bolt)存在这样的条件:#threads=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。默认情况下,一个supervisor节点最多可以启动4个worker进程,每一个topology默认占用一个worker进程,每个spout或者bolt会占用1个executor,每个executor启动1个task。,提高并行度,worker(slo

17、ts)默认一个从节点上面可以启动4个worker进程,参数是supervisor.slots.port。在storm配置文件中已经配置过了,默认是在strom-core.jar包中的defaults.yaml中配置的有。默认一个strom项目只使用一个worker进程,可以通过代码来设置使用多少个worker进程。通过config.setNumWorkers(workers)设置通过conf.setNumAckers(0);可以取消acker任务最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker之间的数据传输如果worker使用完的话再提交topology

18、就不会执行,会处于等待状态executor默认情况下一个executor运行一个task,可以通过在代码中设置builder.setSpout(id,spout,parallelism_hint);builder.setBolt(id,bolt,parallelism_hint);task通过boltDeclarer.setNumTasks(num);来设置实例的个数executor的数量会小于等于task的数量(为了rebalance)例子:通过提高并行度实现单词汇总。,弹性计算,通过代码调整topologyBuilder.setBolt(green-bolt,new GreenBolt()

19、,2).setNumTasks(4).shuffleGrouping(blue-spout);通过shell调整#10秒之后开始调整#Reconfigure the topology mytopology to use 5 worker processes,#the spout blue-spout to use 3 executors and#the bolt yellow-bolt to use 10 executors.storm rebalance mytopology-w 10-n 5-e blue-spout=3-e yellow-bolt=10注意:acker数目运行时是不会变化

20、的,所以多指定几个worker进程,acker线程数也不会增加。通过UI(不推荐使用),官网蓝绿黄例子,stream grouping,stream grouping分类,Shuffle Grouping:随机分组,随机派发stream里面的tuple,保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到同一任务,而不同的userid则会被分配到不同的任务All Grouping:广播发送,对于每一个tuple,Bolts中的所有任务都会收到.Global G

21、rouping:全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.Non Grouping:随机分派,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyConte

22、xt来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid),实例-统计网站PV、UV、DV,(类似WordCount的计算去重word总数):bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存ip和count数到Map,下一级blot3进行Map遍历,可以得到:PV、UV、DV(访问深度,也就是每个ip 的浏览数),storm的可靠性,worker进程死掉supervisor进程死掉nimbus进程死掉(存在HA的问题)节点宕机ack/fail消息确认机制,storm的定时任务,可以每隔指定的时间将数据整合

23、一次存入数据库。或者每隔指定的时间执行一些1:在main中设置conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,60);/设置本Bolt定时发射数据2:在bolt中使用下面代码判断是否是触发用的bolttuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑注意:storm会按照用户设置的时间间隔给拓扑中的所有bolt发送系统级别的tuple。详细代码参考备注中的例子,实例-网站访客区域分布,

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号