尚硅谷 (Atguigu) Big Data Project: Real-Time Project 4
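Chapter 1 Requirements Analysis

1.1 Overview

Real-time alerting is a common type of workload in real-time computing. By detecting system errors or abnormal user behavior in log data, the job generates corresponding alert logs. Shown in a graphical interface, these alert logs prompt the monitoring team to investigate promptly and take countermeasures.

1.2 Requirement

Requirement: on the same device, within 5 minutes, three or more different accounts log in and claim coupons, and no product is viewed between login and claiming the coupon. When these conditions are met, one alert log is generated. The same device records at most one alert per minute.

1.3 Alert Log Format

    mid       device id
    uids      uids that logged in and claimed coupons
    itemIds   ids of the products the coupons apply to
    events    behaviors that occurred
    ts        timestamp at which the alert was raised

Chapter 2 Overall Flow Design

2.1 Framework Flow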
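[Figure: architecture diagram; components shown: nginx, springboot, Kafka cluster, Spark Streaming, ElasticSearch]

2.2 Development Approach

1) Consume data from Kafka, filter it according to the conditions, and generate alert logs;
2) Save the alert logs to ElasticSearch;
3) Use Kibana to quickly build a visual interface.

Chapter 3 Real-Time Computation Module

3.1 Filter Condition Analysis

    Same device (grouping)
    Within 5 minutes (window)
    Three logins with different accounts (users)
    Coupon claimed (behavior)
    No product viewed (behavior)
    At most one alert per device per minute (deduplication)

3.2 Data Processing Flowchart

[Figure: data processing flowchart; image not preserved]

3.3 Code Development

3.3.1 Event log case class: EventInfo

    // `type` is a reserved word in Scala, so the field name needs backticks.
    case class EventInfo(mid: String, uid: String, appid: String, area: String, os: String, ch: String,
                         `type`: String, evid: String, pgid: String, npgid: String, itemid: String,
                         var logDate: String, var logHour: String, var ts: Long)

3.3.2 Alert log case class: CouponAlertInfo

    case class CouponAlertInfo(mid: String, uids: java.util.HashSet[String], itemIds: java.util.HashSet[String],
                               events: java.util.List[String], ts: Long)

3.3.3 Alert job: AlertApp

    import java.util
    import com.alibaba.fastjson.JSON
    import com.atguigu.gmall.constant.GmallConstants
    import com.atguigu.gmall2019.realtime.bean.{CouponAlertInfo, EventInfo}
    import com.atguigu.gmall2019.realtime.util.{MyEsUtil, MyKafkaUtil}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.util.control.Breaks._

    object AlertApp {
      def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("event_app")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val inputDstream: InputDStream[ConsumerRecord[String, String]] =
          MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_EVENT, ssc)

        // 1. Convert each record into the case class
        val eventInfoDstream: DStream[EventInfo] = inputDstream.map { record =>
          val jsonstr: String = record.value()
          val eventInfo: EventInfo = JSON.parseObject(jsonstr, classOf[EventInfo])
          eventInfo
        }

        // 2. Open a window: 30 seconds long, sliding every 5 seconds, as in the source.
        //    The 5-minute requirement in 1.2 would correspond to Seconds(300).
        val eventInfoWindowDstream: DStream[EventInfo] = eventInfoDstream.window(Seconds(30), Seconds(5))

        // 3. Group by device
        val groupbyMidDstream: DStream[(String, Iterable[EventInfo])] =
          eventInfoWindowDstream.map(eventInfo => (eventInfo.mid, eventInfo)).groupByKey()

        // 4. Decide whether to alert, within one device:
        //    a) three or more coupon claims (evid "coupon"), all with different uids
        //    b) no product viewed (evid "clickItem")
        val checkCouponAlertDStream: DStream[(Boolean, CouponAlertInfo)] = groupbyMidDstream.map { case (mid, eventInfoItr) =>
          val couponUidsSet = new util.HashSet[String]()
          val itemIdsSet = new util.HashSet[String]()
          val eventIds = new util.ArrayList[String]()
          var notClickItem: Boolean = true
          breakable {
            // The loop body was lost in the source text; it is reconstructed here from
            // the two conditions in the step 4 comments and the variables declared above.
            for (eventInfo: EventInfo <- eventInfoItr) {
              eventIds.add(eventInfo.evid)
              if (eventInfo.evid == "coupon") {
                couponUidsSet.add(eventInfo.uid)
                itemIdsSet.add(eventInfo.itemid)
              } else if (eventInfo.evid == "clickItem") {
                notClickItem = false
                break()
              }
            }
          }
          (couponUidsSet.size() >= 3 && notClickItem,
            CouponAlertInfo(mid, couponUidsSet, itemIdsSet, eventIds, System.currentTimeMillis()))
        }

        // Keep only the records flagged as alerts
        val filteredDstream: DStream[(Boolean, CouponAlertInfo)] = checkCouponAlertDStream.filter(_._1)

        // Add an id (device id + minute bucket) used for deduplication when saving to ES
        val alertInfoWithIdDstream: DStream[(String, CouponAlertInfo)] = filteredDstream.map { case (flag, alertInfo) =>
          val period: Long = alertInfo.ts / 1000L / 60L
          val id: String = alertInfo.mid + "_" + period.toString
          (id, alertInfo)
        }

        alertInfoWithIdDstream.foreachRDD { rdd =>
          rdd.foreachPartition { alertInfoWithIdIter =>
            MyEsUtil.insertBulk(GmallConstants.ES_INDEX_COUPON_ALERT, alertInfoWithIdIter.toList)
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Chapter 4 Saving to ElasticSearch

4.1 ES Cluster Setup

See the ElasticSearch cluster installation manual.

4.2 Create the Index in ES

In fact, ES can store the data even if the index is not created in advance. In that case ES infers the mapping from the first document inserted, but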
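this inference by ES is often not accurate enough. For example, it has to decide whether a field should be indexed, whether it should be analyzed (tokenized), and if so, which analyzer to use.

Index creation statement (including the mapping):

    PUT gmall_coupon_alert
    {
      "mappings": {
        "_doc": {
          "properties": {
            "mid":     { "type": "keyword" },
            "uids":    { "type": "keyword" },
            "itemIds": { "type": "keyword" },
            "events":  { "type": "keyword" },
            "ts":      { "type": "date" }
          }
        }
      }
    }

4.3 Saving to ES

4.3.1 pom.xml

    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>

4.3.2 Utility class for saving to ES

    import java.util
    import java.util.Objects
    import io.searchbox.client.{JestClient, JestClientFactory}
    import io.searchbox.client.config.HttpClientConfig
    import io.searchbox.core.{Bulk, BulkResult, Index}
    import collection.JavaConversions._

    object MyEsUtil {
      private val ES_HOST = "http://hadoop102"
      private val ES_HTTP_PORT = 9200
      private var factory: JestClientFactory = null

      /** Get a client.
        * @return jestclient
        */
      def getClient: JestClient = {
        if (factory == null) build()
        factory.getObject
      }

      /** Close the client. */
      def close(client: JestClient): Unit = {
        if (!Objects.isNull(client)) try
          client.shutdownClient()
        catch {
          case e: Exception => e.printStackTrace()
        }
      }

      /** Build the connection factory. */
      private def build(): Unit = {
        factory = new JestClientFactory
        factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT)
          .multiThreaded(true)
          .maxTotalConnection(20) // total number of connections
          .connTimeout(10000).readTimeout(10000).build)
      }

      // Bulk-insert documents into ES
      def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {
        if (docList.size > 0) {
          val jest: JestClient = getClient
          val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")
          // The source text is truncated from here to the catch clause and again inside
          // the final loop; those parts are a reconstruction based on the imported Jest
          // classes (Index, Bulk, BulkResult), not the author's verbatim code.
          for ((id, doc) <- docList) {
            val indexBuilder = new Index.Builder(doc)
            if (id != null) indexBuilder.id(id)
            bulkBuilder.addAction(indexBuilder.build())
          }
          var items: util.List[BulkResult#BulkResultItem] = null
          try {
            items = jest.execute(bulkBuilder.build()).getItems
          } catch {
            case ex: Exception => println(ex.toString)
          } finally {
            close(jest)
            // Null guard added: items stays null if execute() threw.
            if (items != null) {
              println("Saved " + items.size() + " documents")
              for (item <- items) {
                if (item.error != null && item.error.nonEmpty) {
                  println(item.error)
                  println(item.errorReason)
                }
              }
            }
          }
        }
      }
    }

5.2 Create a Visualization

5.2.1 Add a new visualization

5.2.2 Select a chart type

This example uses a vertical bar chart (Vertical Bar).

[Screenshot: Kibana "New Visualization" dialog, "Select a visualization type"]

5.2.3 Build the chart

5.2.3.1 Y-axis

[Screenshot: Metrics panel, Y-Axis, Aggregation: Count, Custom Label]

    Aggregation     aggregation method
    Custom Label    descriptive label for the Y-axis

5.2.3.2 X-axis

[Screenshot: Terms aggregation panel, Field: itemIds, Order By: metric, Order: Descending, Size, Custom Label]

    Aggregation     how to group
    Field           field to group by
    Order By        sort criterion
    Order           ascending or descending
    Size            list the top n
    Custom Label    descriptive label for the X-axis

[Screenshot: Time Range picker (Quick, Relative, Absolute, Recent) with From and To set, and the Go button]

Run it to produce the resulting chart.

[Screenshot: resulting bar chart]

5.3 Create a Dashboard

A Dashboard is a large panel that can hold many visualizations. You can place several visualizations you designed earlier on one dashboard and display them together.

5.3.1 Add a new Dashboard

5.3.2 Add several visualizations

5.3.3 The result is a dashboard containing several charts

5.3.4 Finally, save it

5.4 Share to a Web Page

Click the Share button at the top.

[Screenshot: Share panel with embed code]

The iframe code copied to the clipboard can be embedded in a web page's HTML, so that the Kibana dashboard is displayed inside that page.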
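
As a closing aside, the filter rule from section 3.1 and the per-minute document id from section 3.3.3 can be tried out without Spark, Kafka, or ES. The following is only a minimal sketch under stated assumptions: `Event`, `Alert`, and `AlertRule` are hypothetical, simplified stand-ins for `EventInfo`, `CouponAlertInfo`, and the logic in `AlertApp`, and they use plain Scala collections instead of `java.util` ones. Unlike a loop that stops at the first `clickItem`, this version records every event id in the window.

```scala
// Hypothetical, simplified stand-in for EventInfo: only the fields the rule needs.
final case class Event(mid: String, uid: String, evid: String, itemid: String)

// Hypothetical, simplified stand-in for CouponAlertInfo, using Scala collections.
final case class Alert(mid: String, uids: Set[String], itemIds: Set[String],
                       events: List[String], ts: Long)

object AlertRule {
  // Returns Some(alert) when one device's windowed events satisfy the rule:
  // at least three distinct uids claimed a coupon and no "clickItem" event occurred.
  def check(mid: String, events: Seq[Event], now: Long): Option[Alert] = {
    val coupons = events.filter(_.evid == "coupon")
    val uids = coupons.map(_.uid).toSet
    val clicked = events.exists(_.evid == "clickItem")
    if (uids.size >= 3 && !clicked)
      Some(Alert(mid, uids, coupons.map(_.itemid).toSet, events.map(_.evid).toList, now))
    else None
  }

  // Document id: device id plus the minute bucket of the alert timestamp (ms).
  // Alerts for one device in the same minute share an id, so a later write
  // to ES replaces the earlier document instead of adding a duplicate.
  def docId(alert: Alert): String = alert.mid + "_" + (alert.ts / 1000L / 60L)
}
```

Because the id only changes when the minute bucket changes, the sliding window can flag the same device every 5 seconds and still leave a single document per device per minute in the index.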