在 Flink 的流式处理中,会涉及到时间的不同概念 Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间。 Ingestion Time:是数据进入 Flink 的时间 Processing Time:被算子处理时的时间。默认的时间属性就是Processing Time。 我们来举个Time时间的例子↓↓↓ 例如: 现在系统产生了一条日志,日志内容如下: 2020-02-11 15:15:06.218 INFO kernel:NMI watchdog: BUG: soft lockup       对于业务需求来说,如果我们要统计 1min 内的故障日志个数,那么这三个时间中哪个时间是最有意义的?       答案显然是 EventTime,因为我们要根据日志的生成时间进行统计 Window的介绍,参考自 Flink 官方文档        Streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的无限的数据集。而 Window 是一种切割无限数据集为有限块进行处理的手段。       1.没有 Window 操作之前,我们对数据的处理是:来一条计算一条。       Window是无限数据流处理的核心,Window 可以将一个无限的 stream 流拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。  Window 可以分成两类: 1.CountWindow:按照指定的数据条数生成一个Window,与时间无关 对于 TimeWindow,可以根据窗口实现原理的不同分成三类:1.滚动窗口(Tumbling Window)、2.滑动窗口(Sliding Window)、3.会话窗口(Session Window) 1.概念: 2.特点: 3.图示:        例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示: 4.使用场景: 1.概念: 2.特点: 3.图示:        例如:你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示: 4.使用场景: 1.概念: 2.特点: 3.图示:        一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。 4.使用场景:       Window窗口的使用,分为如下两种类型:1.Keyed Windows(分组窗口)、2.Non-Keyed Windows(未分组窗口)。   两者之间唯一的区别是:使用 keyBy(…) 分组,后面使用 .window(…);不使用 keyBy(…) 分组,后面使用 .windowAll(…); 1.Keyed Windows(分组窗口) 2.Non-Keyed Windows(未分组窗口) 在上面,方括号([…])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,从而使其最适合您的需求。 开启窗口操作实例,如下:   按照指定的数据条数生成一个Window,与时间无关。     countWindow的使用,也分为 Keyed 和 Non-Keyed 两种情况:         将数据依据固定的窗口长度,对数据进行切片。通常用来计算窗口滚动一次,在窗口中的数据。     滚动窗口(Tumbling Window)的使用,也分为 Keyed 和 Non-Keyed 两种情况: 和上面一样的代码 测试结果:         滑动窗口是固定窗口的更广义的一种形式,滑动窗口由1.固定的窗口长度和2.滑动步长组成。通常用来计算窗口中数据的变化趋势。(滑动步长越短,绘出的曲线图就更加的平滑,而不是突兀的)     滚动窗口(Sliding Window)的使用,也分为 Keyed 和 Non-Keyed 两种情况: 和上面一样的代码 滚动窗口流程示意图   设置窗口长度为10s,5s滚动一次           由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 Web 应用的 session,也就是一段时间没有接收到新的数据就会生成新的窗口。通常用来计算两条数据之间的间隔,如果大于指定的时间,前面的数据就划分一个Window。     会话窗口(Session Window)的使用,也分为 Keyed 和 Non-Keyed 两种情况:   5s 内没有数据输入时,执行输出操作          1 Time


2.Window
2.1 Window概念
       2.有了 Window 操作之后,我们便可以使用 Window 操作,将无界的数据流通过划分窗口,把某段时间的数据当做有界的数据流,然后对这部分有界的数据流进行计算。2.2 Window 类型划分
 2.TimeWindow:按照时间生成Window2.2.1 滚动窗口
        将数据依据固定的窗口长度,对数据进行切片。通常用来计算窗口滚动一次,在窗口中的数据。
        时间对齐,窗口长度固定,没有重叠
        滚动窗口分配器,将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小(window size),并且不会出现重叠。
     适合对固定时间数据进行计算。比如:
           ①对 BI 数据进行统计(对每个时间段的数据进行聚合计算)
           ②统计指定时间段的在线用户数(比如时间段为:5分钟)2.2.2 滑动窗口
        滑动窗口是固定窗口的更广义的一种形式,滑动窗口由1.固定的窗口长度和2.滑动步长组成。通常用来计算窗口中数据的变化趋势。(滑动步长越短,绘出的曲线图就更加的平滑,而不是突兀的)
        时间对齐,窗口长度固定,有重叠
        滑动窗口分配器将元素分配到固定长度的窗口中。与滚动窗口类似,窗口的大小由窗口大小参数来配置,窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,元素会被分配到多个窗口中(即:数据会被重复计算)
     适合对最近一段时间内数据的统计。比如:
           ①股票交易上升势头(最近30分钟,哪个股票上升快等问题)
           ②某接口最近 5min 的失败率来决定是否报警2.2.3 会话窗口
        由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 Web 应用的 session,也就是一段时间没有接收到新的数据就会生成新的窗口。通常用来计算两条数据之间的间隔,如果大于指定的时间,前面的数据就划分一个Window。
        时间无对齐
        session窗口分配器通过session活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定开始时间和结束时间的情况。相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那么这个窗口就会关闭。
  
        Web 应用中的 session2.3 Window 类型示例
2.3.0 如何开启窗口操作
   
stream        .keyBy(...)               <-  keyed versus non-keyed windows        .window(...)              <-  required: "assigner"       [.trigger(...)]            <-  optional: "trigger" (else default trigger)       [.evictor(...)]            <-  optional: "evictor" (else no evictor)       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)        .reduce/aggregate/fold/apply()      <-  required: "function"       [.getSideOutput(...)]      <-  optional: "output tag"  
stream        .windowAll(...)           <-  required: "assigner"       [.trigger(...)]            <-  optional: "trigger" (else default trigger)       [.evictor(...)]            <-  optional: "evictor" (else no evictor)       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)        .reduce/aggregate/fold/apply()      <-  required: "function"       [.getSideOutput(...)]      <-  optional: "output tag" In the above, the commands in square brackets ([…]) are optional. This reveals that Flink allows you to customize your windowing logic in many different ways so that it best fits your needs.  
 //1.Non-Keyed(未分组使用 windowAll()开启窗口) SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapOperatorStream =         source.map(Integer::parseInt).map(num -> Tuple2.of(num, 1)).returns(Types.TUPLE(Types.INT, Types.INT));  mapOperatorStream .windowAll()                  //2.Keyed(已分组使用 window()开启窗口) KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = mapOperator.keyBy(0);  keyedStream.window(); 2.3.1 countWindow 示例
         1.分组情况下使用 countWindow();
          2.未分组情况下使用 countWindowAll()
   
public class CountWindowDemo {      public static void main(String[] args) throws Exception{          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          DataStreamSource<String> source = env.socketTextStream("localhost", 8888);          SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapOperator =                 source.map(Integer::parseInt).map(num -> Tuple2.of(num, 1)).returns(Types.TUPLE(Types.INT, Types.INT));          //1.未分组,使用countWindowAll()         AllWindowedStream<Tuple2<Integer, Integer>, GlobalWindow> windowedStream = mapOperator.countWindowAll(5);                  //2.分组后,使用countWindow()         //KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = mapOperator.keyBy(0);         //WindowedStream<Tuple2<Integer, Integer>, Tuple, GlobalWindow> windowedStream = keyedStream.countWindow(5);          SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = windowedStream.sum(0);          sum.print();                  env.execute("CountWindowDemo");     } } 未分组:输入5条记录,执行一次 sum/print 等操作;已分组:当前分组满足5条记录,执行一次 sum/print 等操作;2.3.2 滚动窗口(Tumbling Window)示例
          1.分组情况下使用 timeWindow(Time size);
          2.未分组情况下使用 timeWindowAll(Time size)
   
public class TumblingWindowDemo {      public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          DataStreamSource<String> source = env.socketTextStream("localhost", 8888);          SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapOperator =                 source.map(Integer::parseInt).map(num -> Tuple2.of(num, 1)).returns(Types.TUPLE(Types.INT, Types.INT));          //1.未分组,使用timeWindowAll()         AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.timeWindowAll(Time.seconds(5));          //2.分组后,使用timeWindow()         //KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = mapOperator.keyBy(0);         //WindowedStream<Tuple2<Integer, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(5));          SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = windowedStream.sum(0);          sum.print();          env.execute("TumblingWindowDemo");     } }  
AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.timeWindowAll(Time.seconds(5));  AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream =mapOperator.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))); 未分组:不管你输入多少条数据,都对其求和输出,5 秒滚动一次。如果在 5 秒之内未输入任何数据,则不输出已分组:对分组中的数据 5 秒滚动一次。如果有多个分组,5秒之后,求和后同步输出所有分组数据。2.3.3 滑动窗口(Sliding Window)示例
          1.分组情况下使用 timeWindow(Time size,Time slide);
          2.未分组情况下使用 timeWindowAll(Time size,Time slide)
   
public class SlidingWindowDemo {      public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          DataStreamSource<String> source = env.socketTextStream("localhost", 8888);          SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapOperator =                 source.map(Integer::parseInt).map(num -> Tuple2.of(num, 1)).returns(Types.TUPLE(Types.INT, Types.INT));          //1.未分组,使用timeWindowAll()         AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.timeWindowAll(Time.seconds(10),Time.seconds(5));          //2.分组后,使用timeWindow()         //KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = mapOperator.keyBy(0);         //WindowedStream<Tuple2<Integer, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.timeWindow(Time.seconds(10),Time.seconds(5));          SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = windowedStream.sum(0);          sum.print();          env.execute("SlidingWindowDemo");     } }  
AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.timeWindowAll(Time.seconds(10),Time.seconds(5));  AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); 
未分组:根据输入的数据求和,开始滑动, 5s 输出一次已分组:对分组中的数据 5 秒滑动一次。如果有多个分组,5秒之后,求和后同步输出所有分组数据。2.3.4 会话窗口(Session Window)示例
          1.分组情况下使用 window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
          2.未分组情况下使用 windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
   
public class SessionWindowDemo {      public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          DataStreamSource<String> source = env.socketTextStream("localhost", 8888);          SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapOperator =                 source.map(Integer::parseInt).map(num -> Tuple2.of(num, 1)).returns(Types.TUPLE(Types.INT, Types.INT));          //1.未分组,使用 windowAll()          AllWindowedStream<Tuple2<Integer, Integer>, TimeWindow> windowedStream = mapOperator.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));          //2.分组后,使用 window() //        KeyedStream<Tuple2<Integer, Integer>, Tuple> keyedStream = mapOperator.keyBy(0); //        WindowedStream<Tuple2<Integer, Integer>, Tuple, TimeWindow> windowedStream = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));          SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = windowedStream.sum(0);          sum.print();          env.execute("SessionWindowDemo");     } } 
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算
 官方软件产品操作指南 (170)
官方软件产品操作指南 (170)