Flink原理与实践-DataStream-API的介绍和使用课件.pptx

上传人:牧羊曲112 文档编号:1284756 上传时间:2022-11-03 格式:PPTX 页数:39 大小:1.24MB
返回 下载 相关 举报
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第1页
第1页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第2页
第2页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第3页
第3页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第4页
第4页 / 共39页
Flink原理与实践-DataStream-API的介绍和使用课件.pptx_第5页
第5页 / 共39页
点击查看更多>>
资源描述

《Flink原理与实践-DataStream-API的介绍和使用课件.pptx》由会员分享,可在线阅读,更多相关《Flink原理与实践-DataStream-API的介绍和使用课件.pptx(39页珍藏版)》请在三一办公上搜索。

1、第四章DataStream API的介绍和使用,第四章DataStream API的介绍和使用,Flink程序的骨架结构,初始化运行环境读取一到多个Source数据源根据业务逻辑对数据流进行Transformation转换将结果输出到Sink调用作业执行函数,Flink程序的骨架结构初始化运行环境,执行环境是作业与集群交互的入口设置并行度关闭算子链时间、Checkpoint流处理和批处理的执行环境不一样Java、Scala两套API,设置执行环境,/ 创建Flink执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.

2、getExecutionEnvironment();,env.setParallelism(2);env.disableOperatorChaining();,执行环境是作业与集群交互的入口设置执行环境/ 创建Flin,Source、Transformation和Sink,Source读取数据源统称为Source文件系统、消息队列、数据库等Transformation使用Flink提供的各类函数,进行有状态的计算数据流的分组、窗口和聚合操作等Sink将计算结果输出到外部系统,统称为Sink目的地可以是文件系统、消息队列、数据库等,Source、Transformation和SinkSour,F

3、link是延迟执行(Lazy Evaluation)的调用execute()方法,Flink才会真正执行否则无法得到计算结果字符串参数为当前作业名,执行,/ executeenv.execute(kafka streaming word count);,Flink是延迟执行(Lazy Evaluation)的执行,单数据流转换基于Key的分组转换多数据流转换数据重分布转换DataStream 泛型T为数据流中每个元素的类型,四类Tranformation转换,单数据流转换四类Tranformation转换,每个输入元素对应一个输出元素重写MapFunction或RichMapFunctionM

4、apFunction T为输入类型O为输出类型实现其中的map()虚方法主逻辑中调用该函数,单数据流转换 - map,FunctionalInterface public interface MapFunction extends Function, Serializable / 调用这个API就是继承并实现这个虚函数 O map(T value) throws Exception; ,/ 第一个泛型是输入类型,第二个泛型是输出类型 public static class DoubleMapFunction implements MapFunction Override public Stri

5、ng map(Integer input) return function input : + input + , output : + (input * 2); ,DataStream functionDataStream = dataStream.map(new DoubleMapFunction();,MapFunction源代码,一个MapFunction的实现,每个输入元素对应一个输出元素单数据流转换 - mapFun,直接继承接口类并实现map虚方法上页所示使用匿名类使用Lambda表达式,单数据流转换 - map,/ 匿名类 DataStream anonymousDataStr

6、eam = dataStream.map(new MapFunction() Override public String map(Integer input) throws Exception return anonymous function input : + input + , output : + (input * 2); );,/ 使用Lambda表达式 DataStream lambdaStream = dataStream .map(input - lambda input : + input + , output : + (input * 2);,匿名类实现MapFuncti

7、on,Lambda表达式实现MapFunction,直接继承接口类并实现map虚方法单数据流转换 - map/,对输入元素进行过滤继承并实现FilterFunction或RichFilterFunction重写filter虚方法True 保留False 过滤,单数据流转换 - filter,DataStream dataStream = senv.fromElements(1, 2, -3, 0, 5, -9, 8); / 使用 - 构造Lambda表达式 DataStream lambda = dataStream.filter ( input - input 0 );,ublic stat

8、ic class MyFilterFunction extends RichFilterFunction / limit参数可以从外部传入 private Integer limit; public MyFilterFunction(Integer limit) this.limit = limit; Override public boolean filter(Integer input) return input this.limit; ,Lambda表达式实现FilterFunction,实现FilterFunction,对输入元素进行过滤单数据流转换 - filterDataSt,与m

9、ap()相似输出零个、一个或多个元素可对列表结果展平,单数据流转换 - flatMap,苹果,梨,香蕉.map(去皮),去皮苹果,去皮梨,去皮香蕉,map,flatMap,苹果,梨,香蕉.flatMap(切碎),苹果碎片1, 苹果碎片2, 梨碎片1,梨碎片2, 梨碎片3,香蕉碎片1,苹果碎片1, 苹果碎片2, 梨碎片1,梨碎片2, 梨碎片3,香蕉碎片1,与map()相似单数据流转换 - flatMap苹果,梨,,使用Lambda表达式Collector用来收集元素flatMap()虚方法中不使用return返回数据,使用Collector收集返回数据Collector中的泛型String为返回

10、数据类型将flatMap()看做map()和filter()更一般的形式map()和filter()的语义更明确,单数据流转换 - flatMap,DataStream dataStream = senv.fromElements(Hello World, Hello this is Flink); / split函数的输入为 Hello World 输出为 Hello 和 World 组成的列表 Hello, World / flatMap将列表中每个元素提取出来 / 最后输出为 Hello, World, Hello, this, is, Flink DataStream words =

11、dataStream.flatMap ( (String input, Collector collector) - for (String word : input.split( ) collector.collect(word); ).returns(Types.STRING);,使用Lambda表达式单数据流转换 - flatMapDat,数据分组后可进行聚合操作keyBy()将一个DataStream转化为一个KeyedStream聚合操作将KeyedStream转化为DataStreamKeyedStream继承自DataStream,基于Key的分组转换,数据分组后可进行聚合操作基

12、于Key的分组转换,根据某种属性或数据的某些字段对数据进行分组对一个分组内的数据进行处理股票:相同股票代号的数据分组到一起相同Key的数据被分配到同一算子实例上需要指定Key数字位置字段名KeySelector,基于Key的分组转换 - keyBy,DataStream dataStream = senv.fromElements( Tuple2.of(1, 1.0), Tuple2.of(2, 3.2), Tuple2.of(1, 5.5), Tuple2.of(3, 10.0), Tuple2.of(3, 12.5); / 使用数字位置定义Key 按照第一个字段进行分组 DataStrea

13、m keyedStream = dataStream.keyBy(0).sum(1);,根据某种属性或数据的某些字段对数据进行分组基于Key的分组转,KeySelector重写getKey()方法,单数据流转换 - keyBy,/ IN为数据流元素,KEY为所选择的Key FunctionalInterface public interface KeySelector extends Function, Serializable / 选择一个字段作为Key KEY getKey(IN value) throws Exception; ,public class Word public Stri

14、ng word; public int count;,/ 使用KeySelector DataStream keySelectorStream = wordStream.keyBy(new KeySelector () Override public String getKey(Word in) return in.word; ).sum(count);,KeySelector源码,一个KeySelector的实现,KeySelector单数据流转换 - keyBy/ IN,sum()、max()、min()等指定字段,对该字段进行聚合KeySelector流数据上的聚合实时不断输出到下游状态

15、存储中间数据,单数据流转换 Aggregations,sum()、max()、min()等单数据流转换 Agg,将某个字段加和结果保存到该字段上不关心其他字段的计算结果,单数据流转换 sum,DataStream tupleStream = senv.fromElements( Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2), Tuple3.of(1, 0, 6), Tuple3.of(1, 1, 7), Tuple3.of(1, 0, 8); / 按第一个字段分组,对第二个字段求和,打印出来的结果如下: / (0,0,0)

16、 / (0,1,0) / (0,3,0) / (1,0,6) / (1,1,6) / (1,1,6) DataStream sumStream = tupleStream.keyBy(0).sum(1);,将某个字段加和单数据流转换 sumDataStream,max()对该字段求最大值结果保存到该字段上不保证其他字段的计算结果maxBy()对该字段求最大值其他字段保留最大值元素的值,单数据流转换 max / maxBy,DataStream tupleStream = senv.fromElements( Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tup

17、le3.of(0, 2, 2), Tuple3.of(1, 0, 6), Tuple3.of(1, 1, 7), Tuple3.of(1, 0, 8);,/ 按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下: / (0,0,0) / (0,0,1) / (0,0,2) / (1,0,6) / (1,0,7) / (1,0,8) DataStream maxStream = tupleStream.keyBy(0).max(2);,/ 按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下: / (0,0,0) / (0,1,1) / (0,2,2) / (1,0,

18、6) / (1,1,7) / (1,0,8) DataStream maxByStream = tupleStream.keyBy(0).maxBy(2);,max()单数据流转换 max / maxByDataS,比Aggregation更通用在KeyedStream上生效接受两个输入,生成一个输出两两合一地汇总操作,基于Key的分组转换 - reduce,比Aggregation更通用基于Key的分组转换 - re,实现ReduceFunction,基于Key的分组转换 - reduce,public static class MyReduceFunction implements Red

19、uceFunction Override public Score reduce(Score s1, Score s2) return Score.of(s1.name, Sum, s1.score + s2.score); ,DataStream dataStream = senv.fromElements( Score.of(Li, English, 90), Score.of(Wang, English, 88), Score.of(Li, Math, 85), Score.of(Wang, Math, 92), Score.of(Liu, Math, 91), Score.of(Liu

20、, English, 87); / 实现ReduceFunction DataStream sumReduceFunctionStream = dataStream .keyBy(name) .reduce(new MyReduceFunction();,/ 使用 Lambda 表达式 DataStream sumLambdaStream = dataStream .keyBy(name) .reduce(s1, s2) - Score.of(s1.name, Sum, s1.score + s2.score);,实现ReduceFunction基于Key的分组转换 - r,将多个同类型的Da

21、taStream合并为一个DataStream数据按照先进先出(FIFO)合并,多数据流转换 - union,DataStream shenzhenStockStream = . DataStream hongkongStockStream = . DataStream shanghaiStockStream = . DataStream unionStockStream = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream);,将多个同类型的DataStream合并为一个DataS,只能连接两个DataStr

22、eam数据流两个数据流类型可以不一致两个DataStream经过connect()之后转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态应用场景为:使用一个控制流对另一个数据流进行控制,多数据流转换 - connect,只能连接两个DataStream数据流多数据流转换 - co,重写CoMapFunction或CoFlatMapFunction三个泛型,分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型对于CoFlatMapFunction,flatMap1()方法处理第一个流的数据,fla

23、tMap2()方法处理第二个流的数据可以做到类似SQL Join的效果,多数据流转换 - connect,/ IN1为第一个输入流的数据类型 / IN2为第二个输入流的数据类型 / OUT为输出类型 public interface CoFlatMapFunction extends Function, Serializable / 处理第一个流的数据 void flatMap1(IN1 value, Collector out) throws Exception; / 处理第二个流的数据 void flatMap2(IN2 value, Collector out) throws Excep

24、tion; ,/ CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出 public static class MyCoMapFunction implements CoMapFunction Override public String map1(Integer input1) return input1.toString(); Override public String map2(String input2) return input2; ,CoFlatMapFunction源代码,一个CoFlatMapFunction实现,重写CoMapFunct

25、ion或CoFlatMapFunct,并行度逻辑视图中的算子被切分为多个算子子任务每个算子子任务处理一部分数据可以在整个作业的执行环境层面设置也可以对某个算子单独设置,并行度,StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(); / 获取当前执行环境的默认并行度 int defaultParalleism = senv.getParallelism(); / 设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4 senv.setParallelism(4);,在执行

26、环境中设置并行度:,对某个算子单独设置:,dataStream.map(new MyMapper().setParallelism(defaultParallelism * 2);,并行度并行度StreamExecutionEnvironme,默认情况下,数据自动分布到多个实例(或者称之为分区)上手动在多个实例上进行数据分配避免数据倾斜输入是DataStream,输出也是DataStream,数据重分布,dataStream.shuffle();,基于正态分布,将数据随机分配到下游各算子实例上:,dataStream.broadcast();,数据会被复制并广播发送给下游的所有实例上:,dat

27、aStream.global();,将所有数据发送给下游算子的第一个实例上:,默认情况下,数据自动分布到多个实例(或者称之为分区)上数据重,rebalance()使用Round-Ribon思想将数据均匀分配到各实例上rescale()就近发送给下游每个实例,数据重分布,rebalance()将数据轮询式地分布到下游子任务上,当上游有2个子任务、下游有4个子任务时使用rescale(),rebalance()使用Round-Ribon思想将数据均,artitionCustom()自定义数据重分布逻辑PartitionerK中泛型K为根据哪个字段进行分区对一个Score类型数据流重分布,希望按照i

28、d均匀分配到下游各实例,那么泛型K就为id的数据类型Long重写partition()方法,数据重分布,FunctionalInterface public interface Partitioner extends java.io.Serializable, Function / 根据key决定该数据分配到下游第几个分区(实例) int partition(K key, int numPartitions); ,/* * Partitioner 其中泛型T为指定的字段类型 * 重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配 * */ public static cl

29、ass MyPartitioner implements Partitioner private Random rand = new Random(); private Pattern pattern = Ppile(.*d+.*); /* * key 泛型T 即根据哪个字段进行数据重分配,本例中是Tuple2(Int, String)中的String * numPartitons 为当前有多少个并行实例 * 函数返回值是一个Int 为该元素将被发送给下游第几个实例 * */ Override public int partition(String key, int numPartitions

30、) int randomNum = rand.nextInt(numPartitions / 2); Matcher m = pattern.matcher(key); if (m.matches() return randomNum; else return randomNum + numPartitions / 2; ,/ 对(Int, String)中的第二个字段使用 MyPartitioner 中的重分布逻辑 DataStream partitioned = dataStream.partitionCustom(new MyPartitioner(), 1);,Partitioner源

31、码,一个Partitioner的实现,artitionCustom()数据重分布Functio,数据传输、持久化序列化:将内存对象转换成二进制串、网络可传输或可持久化反序列化:将二进制串转换为内存对象,可直接在编程语言中读写和操作常见序列化方式:JSONJava、Kryo、Avro、Thrift、ProtobufFlink开发了自己的序列化框架更早地完成类型检查节省数据存储空间,序列化和反序列化,数据传输、持久化序列化和反序列化,基础类型Java、Scala基础数据类型数组复合类型Scala case classJava POJOTuple辅助类型Option、List、Map泛型和其他类型G

32、eneric,Flink支持的数据类型,基础类型Flink支持的数据类型,TypeInformaton用来表示数据类型,创建序列化器每种数据类型都对应一个TypeInfomationTupleTypeInfo、PojoTypeInfo ,TypeInformation,TypeInformaton用来表示数据类型,创建序列化器T,Flink会自动推断类型,调用对应的序列化器,对数据进行序列化和反序列化,类型推断和序列化,package mon.typeinfo; public class Types / java.lang.Void public static final TypeInform

33、ation VOID = BasicTypeInfo.VOID_TYPE_INFO; / java.lang.String public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; / java.lang.Boolean public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; / java.lang.Integer public static final TypeInformation INT =

34、BasicTypeInfo.INT_TYPE_INFO; / java.lang.Long public static final TypeInformation LONG = BasicTypeInfo.LONG_TYPE_INFO; . ,一些基础类型的TypeInformation:,Flink会自动推断类型,调用对应的序列化器,对数据进行序列,Types.STRING 是用来表示 java.lang.String 的TypeInformationTypes.STRING 被定义为 BasicTypeInfo.STRING_TYPE_INFOSTRING_TYPE_INFO :使用何种序

35、列化器和比较器,类型推断和序列化,public static final BasicTypeInfo STRING_TYPE_INFO = new BasicTypeInfo(String.class, new Class, StringSerializer.INSTANCE, StringComparator.class);,STRING_TYPE_INFO定义使用何种序列化器和比较器:,Types.STRING 是用来表示 java.lang.S,在声明式文件中定义Schema使用工具将Schema转换为Java可用的类Avro Specific生成的类与POJO类似有getter、set

36、ter方法在Flink中可以像使用POJO一样使用Avro Specific模式Avro Generic不生成具体的类用GenericRecord封装所有用户定义的数据结构必须给Flink提供Schema信息,Avro, namespace: org.apache.flink.tutorials.avro, type: record, name: MyPojo, fields: name: id, type: int , name: name, type: string ,Avro声明式文件:,在声明式文件中定义SchemaAvro Avro声明式文件,Kryo是大数据领域经常使用的序列化框架

37、Flink无法推断出数据类型时,将该数据类型定义为GenericTypeInfo,使用Kryo作为后备选项进行序列化最好实现自己的序列化器,并对数据类型和序列化器进行注册Kryo在有些场景效率不高env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具体哪个类型无法被Flink自动推断,然后针对该类型创建更高效的序列化器,Kryo,注册数据类型和序列化器:,/ 将MyCustomType类进行注册 env.getConfig().registerKryoType(MyCustomType.class); / 或者使用下面的方式并且实现自定义序列化器 en

38、v.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);,static class MyClassSerializer extends Serializer implements Serializable private static final long serialVersionUID = . Override public void write(Kryo kryo, Output output, MyCustomType myCustomType) . Overr

39、ide public MyCustomType read(Kryo kryo, Input input, Class type) . ,Kryo是大数据领域经常使用的序列化框架Kryo注册数据类型,与Avro Specific模式相似,使用声明式语言定义Schema,使用工具将声明式语言转化为Java类有人已经实现好Kryo的序列化器案例:MyCustomType是使用Thrift工具生成的Java类,TBaseSerializer是com.twitter:chill-thrift包中别人实现好的序列化器,该序列化器基于Kryo的Serializer。注意在pom.xml中添加相应的依赖,Th

40、rift、Protobuf,/ Google Protobuf / MyCustomType类是使用Protobuf生成的Java类 / ProtobufSerializer是别人实现好的序列化器 env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class); / Apache Thrift / MyCustomType是使用Thrift生成的Java类 / TBaseSerializer是别人实现好的序列化器 env.getConfig().addDefaultKryo

41、Serializer(MyCustomType.class, TBaseSerializer.class);,与Avro Specific模式相似,使用声明式语言定义Sc,Flink的数据类型:Java、Scala、Table API分别有自己的数据类型体系绝大多数情况下,程序员不需要关心使用何种TypeInformation,只需要使用自己所需的数据类型Flink会做类型推断、选择对应的序列化器当自动类型推断失效,用户需要关注TypeInformation数据类型选择:需要考虑:上下游的数据结构、序列化器的性能、状态数据的持续迭代能力POJO和Tuple等内置类型性能更好Avro、Thrif

42、t和Protobuf对上下游数据的兼容性更好,不需要在Flink应用中重新设计一套POJOPOJO和Avro对Flink状态数据的持续迭代更友好,数据类型小结,Flink的数据类型:Java、Scala、Table AP,用户自定义函数的三种方式:继承并实现函数类使用Lambda表达式继承并实现Rich函数类,用户自定义函数,用户自定义函数的三种方式:用户自定义函数,对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等interface接口。以FlatMapFunction函数式接口为例:

43、继承了Flink的Function函数式接口函数在运行过程中要发送到各个实例上,发送前后要进行序列化和反序列化,一定要保证函数内的所有内容都可以被序列化两个泛型T和O,T是输入,O是输出,要设置好输入和输出数据类型,否则会报错重写虚方法flatMap()Collector收集输出数据,函数类,package mon.functions; FunctionalInterface public interface FlatMapFunction extends Function, Serializable void flatMap(T value, Collector out) throws Ex

44、ception; ,/ 使用FlatMapFunction实现过滤逻辑,只对字符串长度大于 limit 的内容进行词频统计 public static class WordSplitFlatMap implements FlatMapFunction private Integer limit; public WordSplitFlatMap(Integer limit) this.limit = limit; Override public void flatMap(String input, Collector collector) throws Exception if (input.l

45、ength() limit) for (String word: input.split( ) collector.collect(word); DataStream dataStream = senv.fromElements(Hello World, Hello this is Flink); DataStream functionStream = dataStream.flatMap(new WordSplitFlatMap(10);,FlatMapFunction源码,一个FlatMapFunction实现,对于map()、flatMap()、reduce()等函数,,简洁紧凑Scal

46、a对Lambda表达式支持更好Java 8之后也开始支持Lambda表达式,有类型擦除问题使用returns 提供类型信息,Lambda表达式,DataStream words = dataStream.flatMap ( (String input, Collector collector) - for (String word : input.split( ) collector.collect(word); ) / 提供类型信息以解决类型擦除问题 .returns(Types.STRING);,val lambda = dataStream.flatMap (value: String,

47、 out: CollectorString) = if (value.size 10) value.split( ).foreach(out.collect) ,Scala:,Java:,简洁紧凑Lambda表达式DataStreamString,RichMapFunction、 RichFlatMapFunction、 RichReduceFunction增加了更多功能:open()方法:初始化close()方法:算子最后执行这个方法,可以释放一些资源getRuntimeContext()方法:获取算子子任务的运行时上下文累加器例子:分布式计算环境下,计算是分布在多台节点上的,每个节点处理一

48、部分数据,使用for循环无法满足累加器功能,Rich函数类,/ 实现RichFlatMapFunction类 / 添加了累加器 Accumulator public static class WordSplitRichFlatMap extends RichFlatMapFunction private int limit; / 创建一个累加器 private IntCounter numOfLines = new IntCounter(0); public WordSplitRichFlatMap(Integer limit) this.limit = limit; Override pub

49、lic void open(Configuration parameters) throws Exception super.open(parameters); / 在RuntimeContext中注册累加器 getRuntimeContext().addAccumulator(num-of-lines, this.numOfLines); Override public void flatMap(String input, Collector collector) throws Exception / 运行过程中调用累加器 this.numOfLines.add(1); if(input.length() limit) for (String word: input.split( ) collector.collect(word); ,RichMapFunction、 RichFlatMapFu,

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号