《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx

上传人:李司机 文档编号:7016866 上传时间:2024-04-12 格式:DOCX 页数:7 大小:60.77KB
返回 下载 相关 举报
《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx_第1页
第1页 / 共7页
《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx_第2页
第2页 / 共7页
《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx_第3页
第3页 / 共7页
《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx_第4页
第4页 / 共7页
《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx_第5页
第5页 / 共7页
点击查看更多>>
资源描述

《《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx》由会员分享,可在线阅读,更多相关《《Spark大数据技术与应用案例教程》教案第13课读取电影评分数据创建DStream.docx(7页珍藏版)》请在三一办公上搜索。

1、课题读取电影评分数据创建DStream课时2课时(90min)教学目标知识技能目标:(1)熟悉基础数据源(2)熟悉高级数据源(3)掌握读取数据创建DStream的方法素质目标:培养自我学习和持续学习能力,能够及时掌握新技术和工具,并将其应用到实际项目中教学重难点教学重点:基础数据源、高级数据源教学难点:读取数据创建DS(ream教学方法案例分析法、问答法、讨论法、讲授法教学用具电脑、投影仪、多媒体课件、教材教学过程主要教学内容及步骤课前任务【教师】布置课前任务,和学生负责人取得联系,让其提醒同学通过APP或其他学习软件,完成课前任务请大家了解什么是数据源,什么是DSlream.【学生】完成课前

2、任务考勤【教师】使用APP进行签到【学生】班干部报请假人员及原因问题导入(5min)【教师】提出以下问题:什么是数据源?数据源可分为哪些类型?【学生】思考、举手回答传授新知【教师】通过学生的回答引入新知,介绍基础数据源和高级数据源的相关知识一、基础数据源【教师】介绍基础数据源的概念和类型在SparkStreaming中,基础数据源指的是可以用来读取实时数据并创建DStream的常见数据源。这些数据源已经被广泛使用和测试,并且被集成到了SparkStreaming框架中,用户只需调用相应的API即可读取数据。基础数据源包括文件流、套接字流和RDD队列流等。1.文件流在SparkStreaming

3、中,文件流(filestream)是一种可以从本地文件系统或分布式文件系统(如HDFS)中读取数据的输入流。它允许将一个目录视为一个数据源,并读取目录中实时生成或更新的文件。在SParkStreaming中,可以使用textFileStream()方法创建DStream定义一个输入流用于监视HadOOP兼容的文件系统中的新文件,并将其作为文本文件读取。文件必须通过同一文件系统中的另一个位置移动到监视目录中。该方法的基本格式如下。(extFileStream(directory)其中,参数directory表示指定的目录。读取不同文件流创建DStream的参考示例如下。ssc=SIreaming

4、COnIeXl(SC,10)#读取本地文件流dstream_(ext=ssc.IexiFileSlream(file:/spark_dstream)曦取HDFS文件流dstream-hdfs=ssc.textFileStream(hdfs:/spark_dstream)【教师】通过例子,帮助学生掌握文件流的应用【例4-1以读取HDFS文件为例,编写SparkStreaming应用程序实时监视HDFS文件目录,当发现新文件到达后,输出文件中的数据。打开第1个终端,执行以下命令,启动HDFS服务并创建spark_dstream”目录.hadoopbogon$Cdusrlocalhadoopsbin

5、hadoop(三)bogonsbin$./start-dfs.sh#在HDFS上新建一个Hspark_dstreamH目录hadoo(3)bogonsbin$Cdusrlocalhadoopbinhadoopbogonbin$hdfsdfs-mkdirspark-dstream在usrlocalsparkmycodeDSIream”目录下新建3个文件,分别为filel.txl、file2.lxt和file3.ixl,其内容如图4-9所示。耽3,Bfi,elttt保存三*HRio1,aME夕存二X11(O),B3保存三JM-IloveSpark10veHddOoPIloveDStrean1amI

6、earnllgSMra】nlearningHadoopXanlearningDStrcanSparkXSverySiNIeHadoopisverysinpleOStreamisverySinple女本8-3fi,215114人文本8我符3L8。第3行,第7列福人文本.帮表为贡度:8淤3行,第8外Ja入图4-93个文件的内容打开第2个终端,执行以下命令,进入PySpark交互式执行环境,编写代码,监视HDFS文件目录。SparkStreaming实时计算启动后,还未接收到数据时,终端显示的信息如图4-10所示。hadoopbogon$pysparkfrompyspak.streamingimpo

7、rtStreamingContext舱(J建StreamingContext对象,设置批处理时间间隔为20秒ssc=StreamingContext(sc,20)跄J建DStream,监视HDFS文件目录dstream=ssc.textFileStream(hdfs:/spark_dstream)# 打印监懒!1的瘫dstream.pprint()# 启动StreamingContext对象ssc.start()# 等待StreamingContexi对象终止ssc.awaitTermina(ion()图4-10未接收至媵煽时的终端显示信息在第1个终端上执行以下命令,将filel.txt.fi

8、le2.txt和file3.txt文件依次上传到HDFS的wspark-dstreamw目录下。hadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile1.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile2.txtspark-dstreamhadoopbogonbin$./hdfsdfs-putusrlocalsparkmycodeDStreamfile3.txtspark-dstream在第2个终端可以监视到HDFS不断有数据流入,并

9、输出结果,如图4-11所示。hadoopbogon:X文件(F)嫡辑(E)查看M搜索终端(T)帮助(三)Time:2023-08-0314:02:40IloveSparkIamlearningSparkSparkisverysimpleTime:2023-08-0314:03:00IloveHadoopIamlearningHadoopHadoopisverysimpleTime:2023-08-0314:03:20IloveDStreamIamlearningDStreamDStreamisverysimple图4-11SparkStreaming监视HDFS并输出结果2.套接字流套接字流(

10、socketstream)是SParkStreaming中用于从网络套接字接收数据的输入流。它可以连接到指定的主机和端口,并实时接收通过套接字发送的数据。在SparkStreaming中,可以使用SOCkelTeXlStream()方法读取套接字流创建DStreame该方法的基本格式如下.socketTextStream(hostname,port,StorageLevel)其中,参数的含义如下.(1)hostname:表示要连接的主机名或IP地址。(2)port:表示要连接的端口号。(3)StOrageLeVeI(可选):表示流数据的存储级别,常见的存储级别包括MEMORY-ONLY.MEM

11、oRY_AND_DISK、MEMORY_ONLY_SER和MEMORY_AND_DISK_SER等,默认值为MEMORY_AND_DISK_SER_2e(详见教材)3RDD队列流RDD队列流由一个RDD队列构成,其中每个RDD包含作为输入源数据的批次内容。在SparkStreaming中,可以使用queueStream()方法创建基于RDD队列的DStreame该方法的基本格式如下。queueStream(rdds,OneAtATime,deult)其中,参数的含义如下.(1)rdds:RDD队列。(2)OneAtATime(可选):每次选取一RDD还是一次性选取所有RDDe默认值为True,

12、即每次只选取一个RDDe(3)default(可选):当队列为空时返回的默认值。如果队列中没有可用的RDD时,返回此默认值。(详见教材)【教师】通过例子,帮助学生掌握RDD队列流的应用【例4-2读取RDD队列流创建DStreame首先创建一个RDD队列作为数据源,然后使用queueStream()方法创建DStream定义一介输入流inputstream,SparkStreaming每两秒从RDD队列中获取一批数据,最后输出RDD队列流中的数据,如图4/2所示.hadoopbogon$pysparkfrompyspark.StreamingimportStreamingContextssc=S

13、treamingContext(sc,2)舱U建一个空的RDD队列rddQueue=|foriinrange(5):rddQueue.append(sc.parallelize(range(1.1001),10)跄J建DStream,定义输入流生成RDD队列流inputStream=ssc.queueStream(rddQueue)#打印RDD队列流中的数据inputStream.pprint()ssc.start()ssc.awaitTermination()inputstream.pprint()ssc.start()Time:2023-08-0314:35:5035678910Time:

14、2023-08-0314:35:52234567810图4/2输出RDD队列流中的嫡二、高级数据源【教师】介绍高级数据源的类型和应用除了文件流、套接字流f口RDD队列流等基础数据源外,SparkStreaming还支持KaHa和Kinesis等高级数据源。SparkSlreaming可以让高级雌源产生的数据发送给应用程序,应用程序再对接收到的数据进行实时处理,从而完成一个典型的实时计算过程。1 .KafkaKatla最初由LinkedIn开发,是一种高性能、分布式的消息传递系统。它支持水平伸缩,可以通过添加更多的代理服务器来增加处理能力。此外,Kaki还具有许多特性,使得它在实时数据处理和大数

15、据场景下的应用非常灵活.因此,KaRa被广泛应用于实时流处理、日志收集、大数据分析等领域.在Spark中,可以读取Kafka数据源,实现方法是先使用SparkSession对象的readStream属性返回DaIaSIreanIReader对象;然后使用该对象的formal。方法指定数据源类型为Kafka;接着使用OPtionO方法设置Kafka的相关选项,如Kafka服务器地址和端口(kafka.btstrap.servers),以及要订阅的主题名称(subscribe)等;最后使用IOadO方法加载流数据,并返回一个DataFramee参考示例如下。dstream=SparkSession

16、.readStream.format(kafka).option(kafka.bootstrap.servers,kafka_serverl:port,kafka_server2:port.kafka-server3:port).oPtion(subscribe,IoPijname).load()2 .KinesisKinesis是一项托管的流式数据处理服务,它可用于实时收集、处理和分析大规模数据流。Kinesis可以帮助用户轻松地处理和存储来自各种数据源(如传感器、应用程序日志、网站点击流等)持续生成的数据。在Spark中,可以读取Kinesis数据源,实现方法与读取Kafka数据源类l以。

17、不同的是,format。方法指定的数据源类型为Kinesis;oPtion()方法用于设置Kinesis的相关选项,如Kinesis流的名称、Kinesis数据流所在的AWS区域和Kinesis服务的终端URL参考示例如下。kinesis_df=SParkSeSSion.readSlream.fonat(kinesis).OptionCstreamName,stream_name).option(region,region-name).option(endpointUrl,httpsz,).load()【学生】聆听、思考、理解、记录【教师】介绍“读取电影评分数据创建DStream”的大概流程,

18、安排学生扫描微课二维码观看视频“读取电影评分数据创建DStream“(详见教材),并要求学生进行相应操作电影评分数据存放在usrlocalSPark/mycode/DSiream/movie_daia.ixl”文件中,该文件中包含用户ID、电影名称和电影评分3个字段,如图4-13所示.打开(。)amovie_data保存=X;JIJusrocalsparkIIUSer1,我不是药神,9.0心2,红海行动,7.8第63,烈火英雄,7.6”4,红海行动,7.7user%功夫,8.2心63,我不是药神,9.3user*南京哺京!,8.6USerl,筑影长城,7.7心4,中国合伙人,8.2心2烈火英雄

19、,7.8user4,西游记之大圣归来,8.0图4-13movie_data.txtM文件的瘫(部分)课堂实践为实现更好的案例效果,我们先以电影评分数据为基础设置数据源的自动生成方式.具体实现方法是设置每隔】()秒从Mmovie_data.txt/,文件中随机荻取IO行数据并写入新的日志文件中,新生成的日志文件存放在新建的usrlocalSParkmycodeDStream/movie”目录下。接下来,使用SparkStreaming监视,7usr/local/spark/mycode/DStream/niovieH目录,每隔10秒读取新产生的日志文件,并输出读取到的日志文件内容。1 .自动生成

20、数据源打开PyCharm,在DStream目录下新建MsgProducepy文件,并在该文件中编写应用程序,实现自动生成数据源。实现步骤如下。步骤IA步骤I定义generate_log_file()函数.步骤2A在函数内部,定义两个变量log_file_directory和data_file_path,分别表示存储日志文件的目录和数据文件路径。步骤3A在函数内部,使用无限循环whileTrue不断生成日志文件。(详见教材)【参考代码】importtimeimportrandomdefgenerate_log_file():#存储日志文件的目录和数据文件路径logfiledirectory=7u

21、srlocalsparkmycodeDStreammoviedata_file_path=7usr/local/spark/mycode/DStream/movie_data.txtwhileTrue:# 获取当前时间的时间戳timestamp=int(time.time()# 构建日志文件路径log_file_path=flog_file_directory/log_timestamp.txt# 打印时间戳print(timestamp)# 读取数据文件的所有行withopen(data_file_path,r)asdata-file:lines=data-file.readlines()#

22、 打乱列表中的元素顺序random.shuffle(lines)# 获取列表中的前10个元素SelectedJines=lines:10# 将获取到的数据写入日志文件withopen(log_file_path,w)aslog-file:Iog_file.writelines(selectedjines)# 程序执行暂停】()秒time.sleep(10)if_name_=_main_:#调用generaieog_file。函数开始生成日志文件generate_log_file()【运行结果】在PyCharm中运行代码,控制台显示时间戳提示信息,如图4-14所示.,7usrlocalspark

23、mycodeDStreammovie,目录下生成包含电影评分数据的日志文件含10行数据,如图4-15所示。,每个日志文件中包169147558mycodeDStreammovieQ三=三16910475681691047578169104758811log_log.log.log.log.169147598169104755169104756169104757169104758169104759169147688.txt8.txt8.txt8.txt8.txt1691047618169104762816Q10476389-o9-09-IOg-o9-16910476016910476116910

24、47621691047631691047641691476488.txt8.txt8.txt8.tt8.txt图4-14时间戳提示信息图4-15包含电影评分数据的日志文件(部分)2.读取电影评分数据创建DStream打开PyCharm,在DSlream目录下新建MsgReadpy文件,并在该文件中编写应用程序,监视“usrIOCalsparkmycodeDStream/movie”目录读取电影评分数据。实现步骤如下。步骤I创建SparkContext对象。步骤2A创建StreamingComext对象并将批处理时间间隔设置为10秒。步骤3A定义监视目录directory.(详见教材)【参考代码

25、】frompysparkimportSparkContextfromPySPark.streamingimportStreamingContext船0建SparkContext对象sc=SparkContext(,local2,MovieStreamingApp)#创建StreamingContext对象并将处理间隔设置为IO秒ssc=StreamingContext(sc,10)#定义监视新文件的目录directory=file:/usr/local/spark/mycode/DStream/movie/# 创建DStreamRljstream=ssc.textFileStream(dire

26、ctorj,)# 打印监视目录中新文件的数据file_stream.pprint()# 启动StreamingContext对象ssc.start()# 等待StreamingContext对象终止ssc.awaitTermination()【运行结果】在PyCharm中运行代码,控制台每10秒显示一次监视目录中新文件的数据,如图4-16Tiae:223-8-315:26:46心。2,夺冠,7.6269,飞第人生,7.58。06,飞第人生,7.3USer5,西游记之大圣归来,8.1USeM2,寻龙诀,7.7USerI7,康熙王朝,9.0USerl9,流浪地坳,7.1824,一代宗师,7.8US

27、er34,中国合伙人,7.9USer36,大鱼海臬,8.1Tiee:2623-08-8315:26:56USer8,大鱼海案,8.4user33,三缺好人,8.USeM。,中国机长,8.4866,功夫,7.8USer8,西游记之大至归来,8.1USer24,Tt宗fli,7.8USerI,湄公河行动,6.8USerl7,大鱼海棠,8.31566,飞驰人生,7.38。r3,看王别姬,9.6图4-16监视目录中新文件的数据【学生】自行扫码现看配套微课,按照要求进行操作,如遇问题可询问老师【教师】巡堂辅导,及时解决学生遇到的问题课堂小结【教师】简要总结本节课的要点基础数据源高级数据源【学生】总结回顾知识点作业布置教学反思【教师】布置课后作业(1)完成项目四项目实训中与本课相关的习题;(2)根据课堂知识,课后自己尝试读取数据创建DStreame完成课后任务

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号