《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx

上传人:李司机 文档编号:7016841 上传时间:2024-04-12 格式:DOCX 页数:11 大小:92.88KB
返回 下载 相关 举报
《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx_第1页
第1页 / 共11页
《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx_第2页
第2页 / 共11页
《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx_第3页
第3页 / 共11页
《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx_第4页
第4页 / 共11页
《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx_第5页
第5页 / 共11页
点击查看更多>>
资源描述

《《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx》由会员分享,可在线阅读,更多相关《《Spark大数据技术与应用案例教程》教案第14课实时计算电影热度.docx(11页珍藏版)》请在三一办公上搜索。

1、课题实时计算电影热度课时2课时(90min)教学目标知识技能目标:(1)熟悉DStream的转换操作(2)熟悉DStream的输出操作素质目标:培养自我学习和持续学习能力,能够及时掌握新技术和工具,并将其应用到实际项目中教学重难点教学重点:DStream的转换操作和输出操作教学睚点:使用DSlream的输出操作输出实时处理的结果教学方法案例分析法、问答法、讨论法、i井授法教学用具电脑、投影仪、多媒体课件、教材教学过程主要教学内容及步骤课前任务【教师】布置课前任务,和学生负责人取得联系,让其提醒同学通过APP或其他学习软件,完成课前任务请大家了解DStream的转换操作和输出操作。【学生】完成课

2、前任务考勤【教师】使用APP进行签到【学生】班干部报请假人员及原因问题导入【教师】提出以下问题:在实时计算中,SparkStreaming是如何读取实时数据的?【学生】思考、举手回答传授新知【教师】通过学生的回答引入新知,介绍DStream的转换操作和输出操作等知识一、DStream的转换操作【教师】介绍DStream的转换操作在实时计算中,数据会源源不断地到达,SparkStreaming读取数据时会创建DStream,将实时数据流划为一系列小的批次,每个批次包含一段时间内的数据。一旦创建了DStream,就可以对其应用各种转换操作和输出操作来处理数据.DStream的转换操作通常包括无状态

3、转换操作和有状态转换操作.1.DStream无状态转换操作DStream无状态转换操作是指输出结果只与当前批次的数据相关,不依赖于之前或之后批次的数据.在任务一的任务实施中,词频统计应用程序wordCount.py中的DStream就采用了无状态转换操作,每次统计只统计当前批次到达的单词的词频,与其他批次的单词无关。DStream无状态转换操作的常用方法如表4-2所示。表4-2DStream无状态转换操作的常用方法方法说明map(func)对当前DStream的每个元素采用函数func进行转换,返回一个新的DStreamflatMap(func)与map相似,但是每个输入项可以被映射为0个或者

4、多个输出项,返回一个新的DStreamfilter(func)返回一个新的DSiream,仅包含源DStream中满足函数func的项repartition(numPartitions)通过创建更多或者更少的分区改变DStream的并行程度,返回一个新的DStreamunion(other)将当前DStream与另一个DStream进行合并,返回一个新的DStream,新的DStream中包含当前DStream和其他DStream的元素count()统计DStream中每个RDD包含的元素数量reduce(func)利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RD

5、DS的新DStreamCountByValueO计算DStream中每个元素的出现次数,返回一个新的DStreamFeduceByKey(func,numPartiiions)将具有相同键的元素进行聚合,并使用提供的函数func来执行聚合操作,返回一个新的DStreame参数func表示用于聚合操作的函数;参数numPartitions表示分区数mapValues(func)将DStream中每个键值对的值应用指定的函数func,并保持键不变,返回一个新的DStreamjoin(other.numPartitions)将两个DStream进行连接操作,返回一个新的DStream,结果包含两个原

6、始DStream中具有相同键的元素transfbrm(func)对DStream中的每个元素应用自定义函数func,返回一个新的DStream【教师】通过例子,帮助学生掌握DStream无状态转换操作【例4-3使用transform。方法从DStream中移除特定的数据。打开第I个终端,启动PySpark交互式执行环境,并执行以下代码。hadoopbogon$pysparkfrompyspark.streamingimportStreamingContextssc=StreamingContext(sc,10)#创建DStreamlines=ssc.socketTextStream(local

7、host,9999)(详见教材)打开第2个终端,执行命令并持续输入文本,向端口发送数据流,如图4-17所示.此时,第1个终端会持续输出过滤掉love之后的单词,如图4-18所示。Time:2023-08-0315:49:50ISparkTime:2023-08-0315:50:00IHadoopTime:2023-08-0315:50:10hadoopbogon-$nc-Ik9999IloveSparkamIloveHadooplearningIamlearningSparkSPark图4-17输入文本图4-18输出过滤掉love之后的单词2DStream有状态转换操作与无状态转换操作不同,D

8、Streain有状态转换操作是指操作当前批次数据时,需要使用之前批次的数据或中间结果。有状态转换操作会将新到达的数据与当前状态中的部分数据或全部合并,然后计算结果。有状态转换包括基于滑动窗口的转换操作和UMaIeStaleByKey转换操作。(1)基于滑动窗口的转换操作。【教师】利用多媒体展示“基于滑动窗口的转换操作的工作过程”图片,并进行讲解基于滑动窗口的转换操作是指固定持续时间的窗口以固定滑动间隔在DStream上滑动,然后对窗口内的数据执行计算操作,生成新的RDD作为窗口DStream的一个RDDo基于滑动窗口的转换操作的工作过程如图4-19所示。首先,设置窗口的持续时间和滑动间隔,分别

9、为3秒和2秒;然后在第3秒时,对第1一3秒的3个RDD执行计算操作,生成一个新的RDD(即第3秒时窗口的计算结果);两秒后,又会对第35秒的3个RDD执行计算操作,生成T新的RDD(即第5秒时窗口的计算结果);一个个窗口的计算结果组成了窗口DStream.第1秒第2秒第3秒第4秒第5秒Y:DStream一:|TFL-|-IT窗Il操作窗口一门rhDStream第1秒时的窗口第3秒时的窗口第5计算结果计算结果图4-19基于滑动窗口的转换操作的工作过程基于滑动窗口的转换操作至少需要两个参数。WindowDuration:窗口的持续时间,即窗口覆盖的时间长度0SlideDuration:窗口的滑动间

10、隔,即执行一次计算的时间间隔.【小提示】WindowDuration和SlideDuration这两个参数值必须是批处理时间间W基于滑动窗口的转换操作的常用方法如表4-3所示。表4-3基于滑动窗口的转换操作的常用方法匚秒时(计算经鬲的整勺窗口;果数倍。方法说明window(vindowDuration,SlideDuration)将数据流按照指定的窗口持续时间和滑动间隔进行分组,返回一个新的DStrcamCountByWindow(WindowDuration,SlideDuration)在指定的窗口持续时间内计算数据流中元素的数量一个新的DStream,返回reduceByWindow(fu

11、nc,invFunc,WindowDuration,SlideDuration)将DStream中的数据按照指定的窗口持续时间和滑动间隔进行分组,并在每个窗口上应用func函数进行聚合操作,返回一个新的DStreame参数func表示指定的聚合函数;参数invFunc表示反向聚合函数,用于将两个值取消合并为一三FeduceByKeyAndWindow(func,invFunc,WindowDuraiion,SlideDuration,在滑动窗口上对键值对进行聚合操作,返回一个新的numPartitions,RlterFunc)DStream参数numPartitions(可选)代表分区数;参数

12、IiIterFunc(可选)代表过滤函数,用于过滤不满足条件的键值对.需要注意的是,使用此操作必须启用检杳点(checkpointing)功能countByValueAndWindow(WindowDuration,SlideDuration,numPartitions)计算窗口中每个元素的出现次数,返回一个新的DStream【教师】通过例子,帮助学生掌握基于滑动窗口的转换操作【例4-4使用WindoVV()方法截取DStream中的元素。打开第1个终端,启动PySpark交互式执行环境,并执行以下代码。hadoopbogon-$pysparkfrompyspark.streamingimpo

13、rtStreamingContextssc=StreamingContext(sc,1)lines=ssc.socketTextStream(localhost9999)(详见教材)打开第2个终端,执行命令并按照每秒1个字母的速度持续输入文本,向端口发送健流,如图4-20所示。此时,第1个终端的输出结果如图4-21所示。从图中可以看出,第1秒的输出结果为a,第2秒的输出结果为ab,第3秒的输出结果为abc;因为第4秒时a已经滑出了窗口,所以第4秒的输出结果为bcdTime:2023-08-0316:24:34aTime:2023-08-0316:24:35abTime:2023-08-0316

14、:24:36abhadoop0bogon-$nc-Ik9999CabTime:2023-08-0316:24:37I图4-20输入文本图4-21WindoW()方法示例的输出结果【例4-5使用reduceByKeyAndWindow()方法对窗口内数据进行词频统计。打开第1个终端,启动PySpark交互式执行环境,并执行以下代码。|hadoopbogon-$pysparkfrompyspark.streamingimportStreamingContext(详见教材)【高手点拨】如果在应用程序中使用reduceByKeyAndWindow()或UPdateStateByKey()方法,则必须提

15、供检直点目录以允许定期保存RDD检直点。执行例4-5的代码之前,需要先新建检查点目录7usr/local/spark/mycode/DStream/checkpointH.打开第2个终端,执行命令并按照每秒1个字母的速度持续输入文本,向端口发送数据流,如图4-22所示。此时,第1个终端输出词频统计结果,如图4-23所示。2021-01-031618:5DTite20n*0l03 16:“:Mhadoop9bogon *$ nc -Ik 9999图4-22输入文本Tg(b, Ca,2023-01-03 16:39:011)22023-0-03 16:19:022) D20230103 16:39

16、:03TiM(,bCe*(d202H3 16:19:04D UD图 4-23 输出使用 reduceByKeyAndWindow()方法【高手点拨】reduceByKeyAndWindow(lambda x,y:x+y, lambda x,y:x-y, 3,1)采用了增量计 算的方式,其计算过程如图4-24所示。I (,aj) J I (ND I第3秒滑动窗口的数据第4秒离开滑动窗口的数据,第3秒和第4秒两个滑动 窗口内的公共部分数据第4秒新进入滑动窗口的数据第4秒的滑动窗口数据第3秒时,滑动窗口包含(.1),(a,.l)Q(b1,1)3个元素,系统会使用reduceByKeyAndWindo

17、w()方法中的“lambdax,y:x+y”函数对上述3个元素进行词频统计。第4秒时,第1个(冗1)元素离开滑动窗口,第2个(&,1)元素进入滑动窗口.此时,滑动窗口包含(n,1),(b,l)(b1)3个元素,系统会使用reduceByKeyAndWindowO方法中的Rambdax,y:x-y”函数将离开滑动窗口的元素(W,1)从之前的词频统计结果中减掉,并使用该方法的“lambdax,y:x-y”函数将新进入滑动窗口的元素CKl)加入到词频统计结果中。(2)UpdateStateByKey转换操作。基于滑动窗口的转换操作只能对当前窗口内的数据进行操作,无法跨批次操作数据。如果要跨批次操作数

18、据,就必须使用UpdateStateByKey转换操作。UPdateStateByKey()方法的基本格式如下。UPdateStaleByKey(UPdaIeFUnc,numPartitions)使用函数UPdateFUnC将先前的状态和键的新值应用于每个键,以更新每个键的状态,并返回一个包含新的DStreame【教师】通过例子,帮助学生掌握UPdateStateByKey转换操作【例4-6使用UPdateStateByKeyo方法对DStreani数据迸行词频统计。打开第1个终端,新建并打开“usrIoCalsparkmycodeDStreaniZupdateWordCoUnt.py”文件,

19、编写应用程序。fromPysparic-SlreamingimportSlreamingContextfrompysparkimportSparkContexlsc=SparkContext(local2,UpdateStateByKey)ssc=StreamingContext(sc,1)(详见教材)在第1个终端中执行以下命令,运行UpdateWordCountpy”文件。hadoopbogon-$Cdusrlocalspark|hadoopbogonspark$./bin/spark-submitusrlocalsparkmycodeDSIreafn/UpdaieWordCouni.py打

20、开第2个终端,执行命令并按照每秒1个字母的速度持续输入文本,向端口发送数据流,如图4-25所示。此时第I个终端输出词频统计结果如图4-26所示。从词频统计结果可以看出,SparkStreaming每隔I秒执行1次词频统计,并且每次词频统计都包含了所有历史词频统计结果。Time:2023-08-0317:14:47(,a,1)Time:2023-08-0317:14:48(,a,2)Time:2023-08-0317:14:49(b,1)(a,2)Time:2023-08-0317:14:50(,b,2)(,a,2)Time:2023-08-0317:14:51(b,2)(c,1)(a,2)ha

21、doopbogon-$nc-Ik9999aTime:2023-08-0317:14:52ab(,b,2)b(c,1)C(,d,1)d(a,.2)图4-25输入文本图4-26输出使用UPdateStaleByKeyo方法进彳铜频统计的结果【小提示】对比例4-5和例4-6输出的词频统计结果,分析DStream有状态转换操作中基于滑动窗口的转换操作和UpdateStateByKey转换操作的区别。二、DStream的输出操作在SparkStreaming中,执行DStream的输出操作时,才会真正触发DStream转换操作的执行。执行DStream的输出操作后,SparkStreaming处理完的雌

22、才能与外部存储系统进行交互,用户可以根据需求将DSiream中的数据输出到文本文件、数据库或其他应用中。DSiream输出操作的常用方法如表4-4DStream输出操作的常用方法方法说明pprint(num)打印DStream生成的每个RDD的前num个元素,默认为前10个元素saveAsTextFiles(prefix,suffix)将DStream中的每个RDD保存为文本文件,元素以字符串的形式表示。参数prefix为保存文件的路径前缀;参数suffix(可选)为保存文件的后缀名,默认为FxEforeachRDD(func)最常用的输出操作表示将func函数应用于DSueam中的每个RDD

23、上PPrimo的使用方法简单,应用示例参见前面的例题,此处不再鳌述。下面详细介绍DStream输出到文本文件和数据库中的方法.1.DStream输出到文本文件中在SparkStreaming中,使用SaVeASTeXtFiIeS()方法可以将DStream输出到文本文件中,该方法会将每个RDD保存为一个单独的文本文件,并自动根据时间戳创建目录和文件名。参考示例如下。#将DStream输出到本地文件系统的文本文件中dstream.saveAsTextFiles(ge:/pathtodirectoryOUlPUt)(详见教材)【教师】通过例子,帮助学生掌握DStream输出到文本文件中的方法【例4

24、-7使用SaVeASTeXtFileSo方法将词频统计结果输出到新建的,7usrlocalsparkmycodeDStream/output”目录下O打开第I个终端,新建并打开“usrIoCalsparkmycodeDStream/SaveWordCounLpy”文件,编写应用程序。frompyspark.streamingimportStreamingContextfrompysparkimportSparkContextsc=SparkContext(local(2,saveWordCount)(详见教材)参照例4-6中运行Python文件的方式,在第I个终端中运行saveWordCoun

25、t.py文件。打开第2个终端,执行命令并持续输入文本,向端口发送数据流,如图4-27所示。hadoopbogon-$nc-Ik9999SparkHadoopSparkStreaming图4-27输入文本在第1个终端中按Clrl+Z组合键停止运行应用程序。然后,执行以下命令,检查这些词频统计结果是否成功输出到usrlocalsparkmycodeDStreamoulpui目录下。如果输出文件信息,则证明输出成功,如图4-28所示。hadoopbogonspark)$cdusrlocalsparkmycodeDStreamoutput(hadoopbogonoutput)$11hadoopbogo

26、noutput$Il总用号24drwxr-xr-x.3hadoophadoop40968月408:57output-1691110610000drwxr-r-.2hadoophadoop40968月409:00output-1691110840000drwxr-xr-x.2hadoophadoop40968月409:00output-1691110850000drwxr-xr-x.2hadoophadoop40968月409:01output-1691110860000drwxr-xr-x.2hadoophadoop40968月409:01output-1691110870000drwxr-x

27、r-x.2hadoophadoop40968月409:01output-1691110880000图428输出文件信息2.DStream输出到MySQL数据库中在SparkStreaming中,可以使用foreachRDD()方法对DStream中的每个RDD执行自定义的函数,从而将DStream输出到MySQL数据库中.在该自定义函数中,需要建立与MySQL数据库的连接,然后将数据写入数据库中,具体方法有以下两种。(1)将RDD转换为DaiaFrame,然后与MySQL数据库建立连接,并将DataFrame保存到数据库中。(2)使用PyMySQL库的ConneCtO方法与MySQL数据库建立

28、连接,然后使用execut()方法执行SQL语句,直接将RDD保存至UMySQL数据库中。【小提示】PyMySQL是一个用于Python编程语言的第三方库,它提供了一种简单的方式与MySQL数据库进行交互。【教师】通过例子,帮助学生掌握DStream输出到MySQL数据库中的方法【例4-8使用foreachRDDO方法将词频统计结果保存到MySQL数据库spark的wordcount表中。打开第1个终端执行以下命令启动MySQL数据库,并在SPark数据库中创建一个名为WordCoUnt的表。hadoop()bogon$mysql-hlocalhost-uroot-pmysqlusespark

29、;(详见教材)继续在第1个终端中新建并打开“usrlocalSParkmycodeDSIream/MySQLWordCount.py”文件,编写应用程序。Irompyspark.slreamingimportSlreamingContextfrompyspark.sqlimportSparkSession.(详见教材)参考例4-6中运行PythOn文件的方式,在第1个终端中运行,MySQLWordCount.py”文件。打开第2个终端,执行命令并持续输入文本,向端口发送数据流,如图4-29所示。hadoopbogon$nc-Ik9999SparkHadoopSparkStreaming图4-2

30、9输入文本在第I个终端中按cm+z组合键停止运行应用程序。然后,执行以下命令,检查这些词频统计结果是否成功输出到MySQL数据库中。如果输出wordcount表的记录,则证明输出成功,如图4-30hadoopbogon-$mysql-hlocalhost-uroot-pmysqlusespark;mysqlselect*fromwordcount;mysqlselect*fromwordcount;+IwordIcount+IsparkI1IIHadoopI1IISpark|2|IHadoopI1|IStreaming|1|+5rowsinset(0.00sec)图4-30输出wordcoun

31、t表的数据【学生】聆听、思考、理解、记录【学生】聆听要求、进行操作课程实践【教师】介绍“实时计算电影热度”的大概流程,安排学生扫描微课二维码观看视频”实时计算电影热度”(详见教材),并要求学生迸行相应操作【任务分析】电影的平均评分能够直观地反映出某电影的热度。因此,实时计算电影热度时,首先计算电影的被评分次数和评分总和,接着计算电影的平均评分(即电影热度),最后将电影热度前三的电影信息输出到MySQL数据库中。开发应用程序前,需要打开终端,执行以下命令,在spark数据库中创建InovieHot表。hadoopbogonmysql-hlocalhost-uroot-pmysqlusespark

32、;mysqlcreate(ablemovieHot(movie_namevarchar(50),movieHotdouble);打开PyCharm,在DStream”目录下新建TopMovieHoLpy”文件,并在该文件中编写应用程序,计算电影热度并将电影热度前三的数据输出到MySQL数据库中。实现步骤如下。步骤IA步骤【创建SparkContext对象。步骤2A创建StreamingContexl对象,将批处理时间间隔设置为10秒.步骤3A使用textFileStream()方法读取创建DStream(即lines)o(详见教材)【参考代码】frompysparkimportSparkCon

33、lextfrompyspark.streamingimportStreamingContextimportpymysql#创建SparkContext对象sc=SparkContext(locaU2,MovieRatingStreaming)(详见教材)【小提示】在PyCharm中运行代码前,需要在PyChann中安装pymysql包。【运行结果】在PyCharm中运行MsgProducepy文件自动生成数据源,然后运行TopMovieHoLpy文件,实时计算电影热度,并输出电影热度、电影热度前三的数据和提示信息,如图4-31所示。Time:223-8-1411:03:88(大fit海棠,8.

34、2)皮I 7.3)(我不是药裨,9.3)西游记之大圣归来,7.5)(红海行动I 7.4)(阳光灿烂的日子,8.3)三嫉好人L 8.6)飞艳人生,7.0)无向西东6.8)中国机长L 8.4)Time:223-8-1411:83:10(湄公河行动,7.2)(三唉好人,8.149999999999999)(英罹I8.4)(红海行动I7.4)(,放牛迹的春天,9.15)(中国机长,8.4)流浪地球L7.1)(,南京!南京!,8.1)Time:223-8-1411:63:08Time:223-8-1411:03:10我不是药神,9.3)三喂好人I8.6)中国机长,8.4)电影热度前三的数据成功保存到My

35、SQL数据库中放牛班的春天,9.15)(英槌,8.4)(中国机长,8.4)电影热度前三的数据成功保存到MySQL数据库中!图4-31实时输出结果在终端执行以下命令,杳询movieHo表中的数据,如图4-32所示。hadoopbogon$mysql-hlocalhost-uroot-pmysqlusespark;mysqlselect*frommovieHot;mysqlselect*frommovieHot;+Imovie,namemovie_hotI尊不是药神I放牛班的春天I中国机长9.38.68.49.158.48.46rowsinset(0.02sec)图4-32查询movieHoi表中的数据【学生】自行扫码观看配套微课,按照要求进行操作,如遇问题可询问老师【教师】巡堂辅导,及时解决学生遇到的问题【教师】简要总结本节课的要点课堂小结DStream的转换操作DStream的输出操作【学生】总结回顾知识点【教师】布置课后作业作业布置(1)完成项目四项目实训中与本课相关的习题;(2)根据课堂知识,课后自己尝试使用DStream的转换和输出操作进行实时计算。【学生】完成课后任务教学反思11

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号