《2023大数据技术之Spark.docx》由会员分享,可在线阅读,更多相关《2023大数据技术之Spark.docx(185页珍藏版)》请在三一办公上搜索。
1、大数据技术SPark基础解析第1章Spark概述1.1 什么是SPark什么是SPark1定义SPark是一种基于内存的快速、通用、可扩展的大数据分析引擎。2、历史2009年诞生于加州大学伯克利分校AMPLab,项目采用SCaIa编写。2010年开源;2013年6月成为APaChe孵化项目2014年2月成为APaChe顶级项目。1.2 SPark内置模块SPark内置模块Spark SQL 结构化数据SparkStreaming实时计算Spark Mlib机器学习SparkGraghX图计算SparkCore独立调度器YARNMesosSparkCore:实现了SPark的基本功能,包含任务调
2、度、内存管理、错误恢复、与存储系统交互等模块。SparkCore中还包含了对弹性分布式数据集(ReSilientDistributedDataSet,简称RDD)的APl定义。SparkSQL:是SPark用来操作结构化数据的程序包。通过SParkSQL,我们可以使用SQL或者APaCheHiVe版本的SQL方言(HQL)来查询数据。SParkSQL支持多种数据源,比如HiVe表、ParqUet以及JSoN等。SparkStreaming:是SPark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与SParkCOre中的RDDAPl高度对应。SparkMLlib:提供常
3、见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,SPark支持在各种集群管理器(ChISterManager)上运行,包括HadoOPYARN、ApacheMesos,以及SPark自带的一个简易调度器,叫作独立调度器。SPark得到了众多大数据公司的支持,这些公司包括HortonworksIBMInlekCloudera.MapR.Pivotak百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应
4、用于大搜索、直达号、百度大数据等业务;阿里利用GraPhX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法:腾讯SPark集群达到8000台的规模,是当前已知的世界上最大的SPark集群。1.3 Spark特点沙SPark特点商硅谷D快:与HadoOP的MaPRedUCe相比,SPark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。SPark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。2)易用:SPark支持Java、PythOn和SCaIa的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且S
5、Park支持交互式的PylhOn和SCahI的Shell,可以非常方便地在这些ShdI中使用SPark集群来验证解决问题的方法。3)通用:SPark提供了统的解决方案。SPark可以用于批处理、交互式查询J(SParkSQL)、实时流处理(SparkStreaming)、机器学习(SParkMLlib)和图计算(GraPhX)。这些不同类型的处理都可以在同个应用中无缝使用.减少了开发和维护的人力成本和部署平台的物力成本.4兼容性:SPark可以非常方便地与其他的开源产品进行融合。比如,SPark可以使用HadooP的YARN和ApachcMCSOS作为它的资源管理和调度器,并且可以处理所有Ha
6、d。OP支持的数据,包括HDFS、HBasC等。这对于已经部署Had。OP集群的用户特别重要,因为不需要做任何数据迁移就可以使用SPark的强大处理能力。第2章Spark运行模式2.1 Spark安装地址1 .官网地址htlp:/SDark.apache.og/2 .文档查看地址hllps:SPalk.aDachc3gdocs2.1.1/3 .下载地址https:SPark.aDache.org/downloads.html2.3Local模式2.3.1 概述&概述。商硅谷1.OCal模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下集中方式设置MaSteIOCa
7、1:所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;IOCalK:指定使用几个线程来运行计算,比如IOCal就是运行4个Worker线程。通常我们的CPU有几个Core,就指定几个线程,最大化利用CPU的计算能力;IOCall*:这种模式直接帮你按照CPU最多CoreS来设置线程数了。2.3.2 安装使用1)上传并解压SPark安装包atguiguhadoopl02sorfware$tar-zxvfspark-2.1.l-bin-hadoop2.7.tgz-Coptmoduleatguigu0hadooplO2module$mvspark
8、-2.1.l-bin-hadoop2.7spark2)官方求Pl案例atguigu0hadoolO2spark$binsark-submit-classorg.apache.spark.examples.SparkPi-executor-memoryIG-total-executor-cores2./examples/jars/spark-examples_2.11-2.1.1.jar100(1)基本语法bin/spark-submit-classmaster-deploy-modeconf=.#otheroptionsapplication-arguments(2)参数说明:-master指
9、定Master的地址,默认为LoCal一CIaSS:你的应用的启动类(如org.apache.spark.examples.SparkPi)-deploy-mode:是否发布你的驱动到Worker节点(CIUSter)或者作为一个本地客户端(client)(default:client)*-conf:任意的SPark配置属性,格式key=value.如果值包含空格,可以加引号“key=VaIUe”application-jar:打包好的应用jar,包含依赖.这个URL在集群中全局可见。比如hdfs:共享存储系统,如果是file:/path,那么所有的节点的Path都包含同样的jarapplic
10、ation-arguments:传给main()方法的参数-executor-memoryIG指定每个executor可用内存为IGtotal-executor-cores2指定每个executor使用的cup核数为2个3)结果展示该算法是利用蒙特卡罗算法求Pl18/09/29 09:04:35 INFO Taskschedulerlmpl: Removed Taskset 0.0, whose tasks have all com pleted, from pool18/09/29 09:04:35 INFO DAGScheduler: Job 0 finished: reduce at S
11、parkPi.seala:38, took 2:10863 S 一弧ioGghl, 3.1410779141077丽18/09/29 18/09/29 topped!18/09/29 18/09/29 18/09/2918/09/29(9:09:04:3509:04:3509:04:3509:04:3509:04:35ommitCoordinatorINFO INFOINFO INFO INFO INFOSparkUl: MtO叩ed spark web Ul at http:/192.168.9.102:4040MapoutputTrackerMasterEndpoint: Mapoutpu
12、tTrackerMasterEndpoint Sstopped!Memorystore: Memorystore clearedBlockManager: BlocKManager stoppedBlOckManagerMaster: BlOckManagerMaster stoppedOutputCommitCoordinatorJutputCommitcoordinatorEndpoint: OutputC18/09/29 09:04:35 info Sparkcontext: successfully stopped Sparkcontext18/09/29 09:04:35 INFO
13、SnutdownHookManager: Shutdown hook called18/09/29 09:04:35 INFO ShutdownHookManager: Deleting di rectory tmpspark-afa7967e-0946-40dl-b786-b41a992ff624atgui gu0hadooplO2 sparkS4)准备文件atguiguQhadoopl02spark$mkdirinput在input下创建3个文件LtXt和2.txt,并输入以下内容helloatguiguhellospark5)启动spark-shellatguiguhadoopl02sp
14、ark$bin/spark-shellUsingSpark*sdefaultlog4jprofile:orgapachesparklog4j-defaults.propertiesSettingdefaultlogleveltoWARN.Toadjustlogginglevelusesc.SetLogLevel(newLevel).ForSparkRzuseSetLogLevelCnewLevel).18/09/2908:50:52WARNNativeCodeLoader:Unabletoloadnative-hadoolibraryforyourplatform.usingbuiltin-j
15、avaclasseswhereapplicable18/09/2908:50:58WARNObjectStore:Failedtogetdatabaseglobal_temp,returningNosuchObjectExceptionSparkcontextWebUIavailableathttp:/192.168.9.102:4040Sparkcontextavailableas,sc,(master=local(*)raid=local-1538182253312).Sparksessionavailableas,spark,.Welcometo/_/_/_version2.1.1一JU
16、singScalaversion2.11.8(JavaHotSpot(TM)64-BitServerVMzJava1.8.0_144)Typeinexpressionstohavethemevaluated.Type:helpformoreinformation.scala开启另一个CRD窗口atguiguhadool02spark)$js3627SparkSubmit4047Jps可登录hadoop102:4040查看程序运行DSparkshellSparkJobS+sc.textFile(input).fIatMap(_.split(,),map(_,1).reduceByKey(_+_)
17、.collectres:Array(String,Int)=Array(hadoopz6),(ooziez3),(spark,3)z(hivez3),(atguiguz3),(hbase,6)scala可登录hadoop102:4040查看程序运行DeUiH for Job 0StofM (22.3.3提交流程1)提交任务分析:SPark通用运行简易流程重要角色:Driver(驱动器)SPark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SParkComext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用sparkshell,那么当你启动S
18、parkshell的时候,系统后台自启(一个Spark驱动器程序,就是在SParkShen中预加载的一个叫作SC的SParkeomeXt对象。如果驱动器程序终止,那么SPark应用也就结束了。主要负责:1)把用户程序转为任务2)跟踪Executor的运行状况3)为执行器节点调度任务4)Ul展示应用运行状况EXeCUtOr(执行器)SparkExecutor是一个工作进程,负责在Spark作业中运行任务,任务间相互独立。SPark应用启动时,EXeCUtOr节点被同时启动,并且始终伴随着整个SPark应用的生命周期而存在。如果有EXeCUtor节点发生了故障或崩溃,Spark应用也可以继续执行,
19、会将出错节点上的任务调度到其他EXeCUIor节点上继续运行。主要负责:1)负责运行组成Spark应用的任务,并将结果返回给驱动器进程;2)通过自身的块管理器(BlOCkManager)为用户程序中要求缓存的RDD提供内存式存储。RDD是宜接缓存在EXeCUtOr进程内的,因此任务可以在运行时充分利用缓存数据加速运算。2.3.4数据流程textFile(input):读取本地文件inpul文件夹数据;flatMap(-.split():压平操作,按照空格分割符将一行数据映射成一个个单词;map(C,l):对每一个元素操作,将单词映射为元组;reduceByKey(_+_):按照key将值进行聚
20、合,相加;collect:将数据收集到DriVer端展示。WordCOlInt案例分析U雨H苦sc.textFile(,input,).flatMapL.split().map(_,l).reduceByKey(_+_).collect2.4Standalone模式2.4.1 概述构建一个由MaSter+Slave构成的SPark集群,SPark运行在集群中。StandalOne运行模式介绍2.4.2 安装使用1)进入spark安装目录下的conf文件夹StguiguQhadoopl02module$cdspark/conf/2)修改配置文件名称atguighadoopl02conf$mvsl
21、aves.templateslavesatguiguhadoopl02conf$mvspark-env.sh.templatespark-env.sh3)修改SlaVe文件,添加WOrk节点:atguiguhadoopl02conf$vimslaveshadoopl02hadool03hadoopl044)修改SPark-env.sh文件,添加如下配置:(atguiguhadool02conf$vimspark-env.shSPARK_MASTER_HOST=hadoopIOlSPARK二MASTER二PORT=70775)分发spark包atguiguhadoopl02module$xsyn
22、cspark/6)启动atguiguhadool02spark$sbin/start-all.shatguigu0hadooplO2spark$util.sh=atguiguhadoopl02=3330Jps3238Worker3163Master=atguiguhad。PlO3=2966Jps2908Worker2978Worker3036Jps网页杳看:hadoo102:8080注意:如果遇到“JAVA_HOMEnotset”异常,可以在Sbin目录下的SPark-Config.sh文件中加入如下配置:exportJAVAH0ME=XXXX7)官方求Pl案例8)启动SParkSheH参数:
23、-masterspark:hadoop102:7077指定要连接的集群的master执行WOrdCoUm程序scalasc.textFile(input).fIatMap(_.split(,).map(_,1).reduceByKey(_+_).collectres:Array(String,Int)=Array(hadoopz6),(oozie,3),(SPark,3),(hive,3),(atguigu,3),(hbase,6)scala2.4.3 JobHistoryServer配置1)修改spark-default.conf.template名称atguigu0hadooplO2con
24、f$mvspark-defaults.conf.templatespark-defaults.conf2)彳修改SPark-default.conf文件,开启Log:atguigu0hadooplO2conf$vispark-defaults.confspark.eventLog.enabledtruespark.eventLog.dirhdfs:/hadoopl02:9000/directory注意:HDFS上的目录需要提前存在。3)修改SPark-env.sh文件,添加如下配置:atguigu0hadooplO2conf$vispark-env.shexportSPARK_HISTORY_
25、OPTS=-Dspark.history.ui.port=18080-Dspark.history.retainedAlications=30-Dspark.history.fs.IogDirectory=hdfs:/ZhadooplOl:9000directory参数描述:spark.eventLog.dir:Application在运行过程中所有的信息均记录在该属性指定的路径下SPark.history.ui.port=18080WEB5访问的端口号为18080spark.history.fs.logDirectory=hdfs:/hadoop102:9000/directory配置了该属
26、性后,在Start-history-server.sh时就无需再显式的指定路径,SparkHistoryServer页面只展示该指定路径下的信息spark.history.retainedApplications=30指定保存APPliCatiOn历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。4)分发配置文件atguigu0hadoolO2conf$xsyncspark-defaults.confatguiguhadoopl02conf$xsyncspark-env.sh5)启动历史服务atguigu0hadooplO2spark$s
27、bin/start-history-server.sh6)再次执行任务atguiguhadoopl02spark$bin/spark-submit- -classorg.apache.spark.examples.SparkPi- masterspark:/hadoopl01:7077- 一executor-memoryIG- 一total-executor-cores2./examples/jars/spark-examples_2.11-2.1.1.jar1007)查看历史服务hadoopl02:18080History Servera*M*r rwo *. -2,.artTfPU 尸0
28、W tt1t 二黑;黑晨 三三三三 F三三三 - -2.4.4 HA 配置zookeeper clusterDriver AppbcationMasterinstancesExecutor(TaSk) I cache (Taak) (Task) slave workerExecutor(lsk)Icache(TaSk)(Taak)slaveworker图IHA架构图1) ZOOkeePer正常安装并启动2)修改spark-env.sh文件添加如下配置:atguiguhadoopl02conf$vispark-env.sh注释掉如下内容:#SPARK_MASTER_HOST=hadoop102#
29、SPARK二MASTER二PORT=7077添加上如下内容:exportSPARK_DAEMON_JAVA_OPTS=n-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=hadoolOl,hadool02,hadool03-Dspark.deploy.zookeeper.dir=sparkn3)分发配置文件atguigu0hadooplO2conf$xsyncspark-env.sh4)在hadoopl02上启动全部节点atguiguhadoopl02spark$sbin/start-all.sh5)在hadoo
30、p103上单独启动master节点atgiguhadoopl03spark$sbinstart-master.sh6)sparkHA集群访问/opt/module/sparkbinspark-shell-masterspark:/hadooplOl:7077,hadoopl02:7077-executor-memory2gtotal-executor-cores22.5Yarn模式2.5.1 概述Spark客户端直接连接Yarn不需要额外构建Spark集群。有yarnclient和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。yam-client:DriVer程序
31、运行在客户端,适用于交互、调试,希望立即看到app的输出yarn-cluster:DriVer程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。Yarn运行模式介绍NodeManagerNodeManagerSparkAppMaster SparkContext3 AM 启动 ExeCUtOr 并分配TaskSparkExecutor2.5.2 安装使用1)修改hadoop配置文件yam-site.xml,添加如下内容:atguigu0hadooplO2hadoop$viyarn-site.xmlyarn.nodemanager.pmem-che
32、ck-enabledfalseyarn.nodemanager.vmem-check-enabledfalse2)修改spark-env.sh,添加如下配置:atguiguhadoopl02conf$vispark-env.shYARNCONFDIR=optmodulehadoop-2.7.2/etc/hadoop3)分发配置文件atguiguhadoopl02conf)$xsyncoptmodulehadoop-2.7.2/etc/hadoop/yarn-site.xmlatguiguhadoopl02conf$xsyncspark-env.sh4)执行一个程序atguigu0hadoopl
33、O2spark$bin/spark-subnit-classorg.apache.spark.examples.SparkPi-masteryarn-deploy-modeclient./examples/jars/spark-examples_2.11-2.1.1.jar100一注意:在提交任务之前需启动HDFS以及YARN集群。2.5.3 日志查看1)修改配置文件spark-defaults.conf添加如下内容:spark.yarn.historyserver.address=hadoopl01:18080spark.history.ui.port=40002)重启SPark历史服务at
34、guiguhadoopl02spark$sbin/stop-history-server.shstoppingorg.apache.spark.deploy.history.HistoryServeratguiguhadoopl02spark$sbinstart-history-server.shstartingorg.apache.spark.deploy.history.HistoryServer,loggingto/opt/module/spark/logs/spark-atguigu-org.apache.sark.deloy.history.HistoryServer-I-Hadoo
35、pl02.out3)提交任务到Yarn执行atguigu0hadooplO2spark$bin/spark-submit-classorg.apache.spark.examples.SparkPi-masteryarn-deploy-modeclient./examples/jars/spark-examples_2.11-2.1.1.jar100-1.1 Web页面查看日志A91 A41All Applications2.6 Mesos模式Spark客户端直接连接Mesos;不需要额外构建Spark集群。国内应用比较少,更多的是运用yam调度。2.7 几种模式对比模式Spark安装机器数需
36、启动的进程所属者Local1无SparkStandalone3Master及WorkerSparkYarn1Yam及HDFSHadoop第3章案例实操SparkShell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个MaVen项目,利用Maven来管理jar包的依赖。3.1 编写WorcICount程序1)创建一个MaVen项目WordCOUnt并导入依赖org.apache.sparksark-core2.ll2.1.lWordCountnet.alchim31.mavenscala-maven-plugin
37、3.2.2compiletestCompile2)编写代码packagecom.atguiguimportorg.apache.spark.SparkConfzSparkContextobjectWordCountdefmain(args:ArraytStringJ):Unit=/1.创建SparkConf并设置App名称valconf=newSparkConfO.SetAppName(nWCn)2.创建SParkCOnteXt,该对象是提交SParkAPP的入口valsc=newSparkContext(conf)/3.使用SC创建RDD并执行相应的transformation和action
38、sc.textFile(args(O).fIatMap(_.split(,).map(_z1).reduceByKey(_+_/1).sortBy(_._2Zfalse).SaveAsTextFile(args(1)/4.关闭连接sc.stop()3)打包插件org.apache.maven.pluginsmaven-assembly-plugin3.0.OWordCountjar-with-deendenciesmake-assemblypackagesingle4)打包到集群测试bin/spark-submit-classWordCount-masterspark:/hadoopl02:7
39、077WordCount.jar/word.txt/out3.2 本地调试本地SPark程序调试需要使用IoCal提交模式,即将本机当做运行环境,MaSter和Worker都为本机。运行时直接加断点调试即可。如下:创建SParkCOnf的时候设置额外属性,表明本地执行:valconf=newSparkConf().SetAppName(,WC,).SetMaster(,local*n)如果本机操作系统是windows,如果在程序中使用了hadoop相关的东西,比如写入文件到HDFS,则会遇到如下异常:2017-09-1416:08:34.907ERROR-aanorj.apeh.hadoop.
40、util.Shellilme:S03:Failedtolocatetheutiisbinaryinth*hAdoopbinarypathja.io.IOExcptor:Couldnotloeatxcualnul1binmi1c.xinthHadoopbimu-i.aor(aorgapache,hadoopxil.SrrmgVt3Is.zchnF3Clicct.init,1.,atog.pachc.hado.hdfs.DFSCliect.init(T一:)atQxg.apache,hadopp.hdfs.DistribvtcFileSystra.initialize1_atorC-Apncke.hap.iz.FileSyateB.createFiIeSystes此三本(C:)ProgramFileshad。P1.lessonSPark名称修改日期bin2017/8/1713