Storm实时流处理框架PPT演讲李乾DHU).ppt

上传人:laozhun 文档编号:2946358 上传时间:2023-03-05 格式:PPT 页数:47 大小:1.18MB
返回 下载 相关 举报
Storm实时流处理框架PPT演讲李乾DHU).ppt_第1页
第1页 / 共47页
Storm实时流处理框架PPT演讲李乾DHU).ppt_第2页
第2页 / 共47页
Storm实时流处理框架PPT演讲李乾DHU).ppt_第3页
第3页 / 共47页
Storm实时流处理框架PPT演讲李乾DHU).ppt_第4页
第4页 / 共47页
Storm实时流处理框架PPT演讲李乾DHU).ppt_第5页
第5页 / 共47页
点击查看更多>>
资源描述

《Storm实时流处理框架PPT演讲李乾DHU).ppt》由会员分享,可在线阅读,更多相关《Storm实时流处理框架PPT演讲李乾DHU).ppt(47页珍藏版)》请在三一办公上搜索。

1、实时流处理框架Storm,演讲:李乾文,2013年11月5日,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,实时流计算背景,参考:1.http:/storm入门教程 第一章 前言2.http:/流处理框架Storm简介,RPC(RemoteProcedureCallProtocol)远程过程调用协议,随着互联网的更进一步发展,信息浏览、搜索、关系交互传递型,以及电子商务、互联网旅游生活产品等将生活中的流通环节在线化。对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至

2、信息网的方向发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理和NoSQL产品应运而生,分别解决实时框架和数据大规模存储计算的问题。流式处理可以用于3种不同场景:事件流、持续计算以及分布式RPC。,数据分析系统组成,参考:1.http:/流式计算系统,数据分析系统整体组成示意图,如HDFS,流处理与批处理,参考:1.实时处理方案架构(作者-落枫).pdf,Storm 关注的是数据多次处理一次写入,而 hadoop 关注的是数据一次写入,多次查询使用。Storm系统运行起来后是持续不断的,而 hadoop往往只是在业务需要时调用数据。,Storm和Hadoop角色对比,

3、参考:1.http:/流式计算系统,Storm组件,参考:1.http:/storm简介,Topology:storm中运行的一个实时应用程序.Nimbus:负责资源分配和任务调度.Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程.Worker:运行具体处理组件逻辑的进程.Task:worker中每一个spout/bolt的线程称为一个task.Spout:在一个topology中产生源数据流的组件.Bolt:在一个topology中接受数据然后执行处理的组件.Tuple:一次消息传递的基本单元.Stream grouping:消息的分组方法,St

4、orm组件,参考:1.百度图片搜索,Storm组件,参考:1.http:/Storm入门教程 第二章 构建Topology,Storm特点,参考:1.百度百科2.http:/storm入门教程 第一章 前言3.http:/Storm安装部署步骤,可扩展 计算任务可在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展高可靠 保证每条消息都能被完全处理高容错性 nimbus、supervisor都是无状态的,可以用kill-9来杀死Nimbus和Supervisor进程,然后再重启它们,任务照常进行.当worker失败后,supervisor会尝试在本机重启它支持多种编程语言 除了用java实

5、现spout和bolt,还可用其他语言支持本地模式 可在本地模拟一个Storm集群功能、进行本地测试高效 用ZeroMQ作为底层消息队列,保证消息能快速被处理,目录,Storm介绍Storm环境配置Storm程序实例Storm总结及问题,依赖软件,Storm的依赖软件有Python、Zeromq、Jzmq、Zookeeper。此外,系统应安装有Java、GCC、G+编译环境。,参考:1.http:/Twitter Storm 安装实战2.http:/centos 怎么安装 g+,安装Python,#wget http:/www.python.org/ftp/python/2.7.2/Pytho

6、n-2.7.2.tgz#tar zxvf Python-2.7.2.tgz#cd Python-2.7.2#./configure#make#make install#vi/etc/ld.so.conf 追加/usr/local/lib/#sudo ldconfig这样的话,Python2.7.2就安装完毕了。,参考:1.http:/Twitter Storm 安装实战2.http:/linux ld.so.conf 和 pkgconf3.http:/linux下python安装,安装Zeromq,jzmq的安装是依赖zeromq的,所以应该先装zeromq,再装jzmq。#wget http

7、:/download.zeromq.org/zeromq-2.1.7.tar.gz#tar zxf zeromq-2.1.7.tar.gz#cd zeromq-2.1.7#./configure#make#make install#sudo ldconfig 2)安装jzmq#yum install git(CentOS)&apt-get install git(Ubuntu)#git clone git:/cd jzmq#./autogen.sh#./configure#make#make install在过程中很可能会遇到依赖库相关问题,详见参考1,参考:1.http:/Twitter S

8、torm 安装实战2.http:/Storm集群安装部署步骤【详细版】,可到https:/,SSH免密码配置,参考:1.http:/Hadoop集群(第5期副刊)_JDK和SSH无密码配置,为使各台计算机间Zookeeper同步数据、应配置SSH免密码登录1 确认本机sshd的配置文件(root)$vi/etc/ssh/sshd_config找到以下内容,并去掉注释符#RSAAuthentication yesPubkeyAuthentication yesAuthorizedKeysFile.ssh/authorized_keys2 如果修改了配置文件需要重启sshd服务(root)$vi/

9、sbin/service sshd restart3 ssh登陆系统 后执行测试命令$ssh localhost回车会提示你输入密码,因为此时我们还没有生成证书。4 生成证书公私钥$ssh-keygen-t dsa-P-f/.ssh/id_dsa$cat/.ssh/id_dsa.pub/.ssh/authorized_keys,SSH免密码配置,参考:1.http:/Hadoop集群(第5期副刊)_JDK和SSH无密码配置,5 拷贝本地生产的key到远程服务器端$cat/.ssh/id_rsa.pub|ssh 远程用户名远程服务器ip cat-/.ssh/authorized_keys6 测试

10、登陆 ssh user远程ip7 如果登陆不成功,需要修改远程服务器上的authorized_keys文件权限$chmod 600/.ssh/authorized_keys,安装Zookeeper及单机配置,#wget http:/ftp.meisei-u.ac.jp/mirror/apache/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz#tar-zxf zookeeper-3.4.5.tar.gz#cp-R zookeeper-3.4.5/usr/local/#ln-s/usr/local/zookeeper-3.4.5/usr

11、/local/zookeeper#vi./bashrc(设置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin环境变量,vim/etc/profile&source/etc/profile)追加:export ZOOKEEPER_HOME=/path/to/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin#cd/usr/local/zookeeper/conf/#cp zoo_sample.cfg zoo.cfg(用zoo_sample.cfg制作$ZOOKEEPER_HOME/conf/zoo.cfg)zookeeper的单机安装

12、完成。启动服务 bin/zkServer.sh start用bin/zkServer.sh status 查看会显示standalone启动客户端测试bin/zkCli.sh-server 127.0.0.1:2181,参考:1.http:/Twitter Storm 安装实战2.http:/linux下zookeeper安装与测试,安装Zookeeper及单机配置,zoo_sample.cfg内容:,Zookeeper集群配置,参考:1.http:/Twitter Storm 安装实战2.http:/linux下zookeeper安装与测试3.http:/ZooKeeper系列之十:ZooK

13、eeper的一致性保证及Leader选举,在单机配置文件zoo.cfg末尾加上server.1=192.168.61.130:2888:3888server.2=192.168.61.134:2888:3888server.3=192.168.61.135:2888:3888,格式为:server.id=host:port:port id是为每个Zookeeper节点的编号同时需要在 dataDir目录下面新建文件myid,内容为$id数值.echo$id$dataDir/myid第一个port是用于follower连接leader的端口,第二个port是用于leader选举的端口.当集群中的

14、leader宕机后其中一台follower的模式转变成leader.所以Storm集群中的Nimbus主机不一定是leader.,关于server.id=host:port:port,server.x=hostname:nnnnn:nnnnn,etcservers making up the ZooKeeper ensemble.When the server starts up,it determines which server it is by looking for the file myid in the data directory.That file contains the s

15、erver number,in ASCII,and it should match x in server.x in the left hand side of this setting.,http:/zookeeper.apache.org/doc/r3.4.3/zookeeperAdmin.html#sc_CrossMachineRequirements,官方解释,安装Storm及单机配置,#wget https:/unzip storm-0.8.1.zip#cp-R storm-0.8.1/usr/local/vim/.bashrc 追加export STORM_HOME=/usr/lo

16、cal/storm-0.8.1 export PATH=$PATH:$STORM_HOME/bin到此为止单机版的Storm就安装完毕了。,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,Storm下载地址http:/storm-,Storm集群配置,配置文件:storm.yaml#These MUST be filled in for a storm configuration storm.zookeeper.servers:-192.168.61.130-192.168.61.134-192.168.61.135 nimbus

17、.host:192.168.61.130 storm.local.dir:/data/storm/data ui.port:8080,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,Storm下载地址http:/storm-,启动Storm,启动Storm(在此之前要启动Zookeeper)./storm nimbus/dev/null 2&1&./storm supervisor/dev/null 2&1&./storm ui/dev/null 2&1&,参考:1.http:/Twitter Storm 安装实战2.http:/

18、Storm集群安装部署步骤【详细版】3.http:/linux nohup命令详解,./command.sh output 2&1&意思是把标准错误(2)重定向到标准输出中(1),而标准输出又导入文件output里面 所以结果是标准错误和标准输出都导入文件output里面了。,关闭Storm,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】3.http:/最近碰到的一些storm问题总结,结束nimbus进程rootlocalhost bin#kill 2781(nimbus进程号),关闭nimbus相关进程:kill ps aux

19、|egrep(daemon.nimbus)|(storm.ui.core)|fgrep-v egrep|awk print$2 关闭supervisor上的所有storm进程:kill ps aux|fgrep storm|fgrep-v fgrep|awk print$2,Storm环境配置,参考:1.http:/Twitter Storm 安装实战2.http:/Storm集群安装部署步骤【详细版】,启动后可以通过http:/nimbus host:8080 观察集群的worker资源使用情况,Topologies的运行状态等信息.,1)提交Storm Topology:storm jar

20、 mycode.jar storm.MyTopology arg1 arg2.mycode.jar是包含Topology实现代码的jar包,storm.MyTopology的main方法是Topology的入口,arg1,arg2等为main方法参数.2)列出Storm Topology:storm list 2)停止Storm Topology:storm kill topologyname在非nimbus主机上不能list和kill,会显示拒绝连接在nimbus主机上还可运行supervisor、在supervisor主机上不能运行nimbus,Topology提交(正确),提交Topol

21、ogy,Topology提交(错误),提交Topology,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,项目结构,注:因篇幅限制,以下程序为主要实现代码架构、异常等具体代码省略!,项目入口(TopologyMain.java),public static void main(String args)TopologyBuilder builder=new TopologyBuilder();builder.setSpout(read,new ReadSpout();builder.setBolt(monitor,new MonitorBolt().shuffle

22、Grouping(read);builder.setBolt(print,new PrintBolt().shuffleGrouping(monitor);builder.setBolt(mysql,new MysqlBolt().shuffleGrouping(monitor);Config config=new Config();,1、实例化TopologyBuilder类2、设置数据喷发节点(Spout类):Spout类首先执行open方法、再执行 nextTuple方法(读取数据,使用SpoutOutputCollector类发射出去)3、设置数据处理节点(Bolt类):declare

23、OutputFields方法声明bolt的输出参数、执行execute方法处理数据后使用SpoutOutputCollector类发射出去4、还可以继续设置数据处理节点.,流分组:Shuffle分组,Fields分组,All分组,自定义分组,Direct分组,Global分组,None分组,参考:1.http:/getting start with storm 翻译 第三章 part-1-Topologies-流分组,项目入口(TopologyMain.java),if(args!=null,喷发节点(ReadSpout.java),public class ReadSpout impleme

24、nts IRichSpout private SpoutOutputCollector collector;private boolean complete=false;private Reader Fread=null;private BufferedReader Fbuff=null;Overridepublic void open(Map conf,TopologyContext context,SpoutOutputCollector collector)this.collector=collector;File file=new File(domain.log);Fread=new

25、FileReader(file);Fbuff=new BufferedReader(Fread);Overridepublic void nextTuple()if(!complete)String str=null;while(str=Fbuff.readLine()!=null)this.collector.emit(new Values(str);complete=true;Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer)declarer.declare(new Fields(line);,喷发节

26、点(ReadSpout.java),Overridepublic void close()Overridepublic void activate()Overridepublic void deactivate()Overridepublic void ack(Object msgId)Overridepublic void fail(Object msgId)Overridepublic Map getComponentConfiguration()return null;,处理节点(MonitorBolt.java),public class MonitorBolt implements

27、IRichBolt private OutputCollector collector;Overridepublic void prepare(Map stormConf,TopologyContext context,OutputCollector collector)this.collector=collector;Overridepublic void execute(Tuple input)String str=input.getString(0);/相当于list容器String regex=.google.;boolean result=Ppile(regex).matcher(s

28、tr).find();if(result)this.collector.emit(new Values(str);,处理节点(MonitorBolt.java),Overridepublic void cleanup()Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer)declarer.declare(new Fields(str);/声明bolt的输出参数Overridepublic Map getComponentConfiguration()return null;,处理节点(MysqlBolt.j

29、ava),public class MysqlBolt implements IRichBolt private Connection conn=null;private Statement stmt=null;private OutputCollector collector;Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector)this.collector=collector;/.连接数据库 Override public void execute(Tuple

30、 input)String str=input.getString(0);/.插入数据库 Override public void cleanup()/.关闭数据库连接 Override public void declareOutputFields(OutputFieldsDeclarer declarer)Override public Map getComponentConfiguration()return null;,Storm程序流程,操作演示,目录,Storm介绍Storm环境配置Storm程序流程Storm总结及问题,Storm数据接入层,1、消息队列(MetaQ)2、通过网络

31、Socket传输数据3、前端业务系统专有数据采集 API4、对日志文件定时监控,参考:1.实时处理方案架构(作者-落枫).pdf,Storm数据落地层,1、消息队列(MetaQ)2、Mysql等数据库3、HDFS等分布式文件系统,参考:1.实时处理方案架构(作者-落枫).pdf,业务需求,参考:1.实时处理方案架构(作者-落枫).pdf,(1)条件过滤 这是Storm最基本的处理方式,对符合条件的数据进行实时过滤,将符合条件的数据保存下来,这种实时查询的业务需求在实际应用中是很常见的。(2)中间计算 我们需要改变数据中某一个字段(例如是数值),我们需要利用一个中间值经过计算(值比较、求和、求平

32、均等等)后改变该值,然后将数据重新输出。(3)求TopN 相信大家对TopN类的业务需求也是比较熟悉的,在规定时间窗口内,统计数据出现的 TopN,该类处理在购物及电商业务需求中,比较常见。(4)分布式RPC Storm有对RPC进行专门的设计,分布式RPC用于对Storm上大量的函数调用进行并行计算,最后将结果返回给客户端。,业务需求,参考:1.实时处理方案架构(作者-落枫).pdf,(5)推荐系统 在实时处理时从 mysql及 hadoop中获取数据库中的信息,例如在电影推荐系统中,传入数据为用户当前点播电影信息,从数据库中获取的是该用户之前的一些点播电影信息统计。(6)批处理 所谓批处理

33、就是数据攒积到一定触发条件,就批量输出,所谓的触发条件类似时间窗口到了,统计数量够了及检测到某种数据传入等等。(7)热度统计 热度统计实现依赖于 TimeCacheMap 数据结构,该结构能够在内存中保存近期活跃的对象。我们可以使用它来实现例如论坛中的热帖排行计算等。,问题,1、使用Maven或Leiningen解决依赖包问题2、Socket端口Spout IP地址不定3、日志、数据库等监控4、Nimbus单点故障5、分布式RPC6、布置metaq集群,写metaq与storm的接口7、实现线上更新(例如动态更改过滤规则)8、部署hadoop集群,写hdfs与storm接口9、支持类Top N统计处理,参考:1.http:/Storm项目:流数据监控32.http:/storm源码之一个class解决nimbus单点问题3.实时处理方案架构(作者-落枫).pdf,问题,参考:1.http:/Storm项目:流数据监控32.http:/storm源码之一个class解决nimbus单点问题,THE END,谢谢观赏,Witter Storm,Realtime Computation System,

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 建筑/施工/环境 > 项目建议


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号