RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
flink中窗口的作用是什么

这篇文章主要讲解了“flink中窗口的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink中窗口的作用是什么”吧!

创新互联建站致力于网站设计制作、网站设计,成都网站设计,集团网站建设等服务标准化,推过标准化降低中小企业的建站的成本,并持续提升建站的定制化服务水平进行质量交付,让企业网站从市场竞争中脱颖而出。 选择创新互联建站,就选择了安全、稳定、美观的网站建设服务!

窗口

  • 窗口计算是流式计算中常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,再对数据进行相应的聚合操作,得到一定时间范围内的统计结果,例如统计最近5分钟内某网站的点击数,此时,点击数据在不断产生,通过5分钟窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合,得到最近5分钟的网站点击数。

  • 代码接口规则

stream.keyBy(...)  //keyed类型数据集
.window(...)   //指定窗口分配器类型
[.trigger(...)]  //指定触发器类型(可选)
[.evictor(...)]  //指定evictor(可选)
[.allowedLateness(...)]  //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)]  //指定Output Lag(可选)
.reduce/aggregate/fold/apply()  //指定窗口计算函数
[.getSideOutput(...)]  //根据Tag输出数据(可选)
  • 算子

    • Windows Assigner:指定窗口类型,定义如何将数据流分配到一个或多个窗口

    • Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;

    • Evictor:用于数据剔除

    • Lateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算。

    • Output Tag:标记输出标签,然后通过getSideOutput将窗口中的数据根据标签输出。

    • Windows Function:定义窗口上数据处理的逻辑,例如对数据进行sum操作。

Keyed 和 Non-Keyed窗口

  • 在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型(将数据集按Key分区),对应的Window Assigner会不同,

    • 上游数据集为KeyedStream类型,则调用DataStream API的Windwo()方法指定Windows Assigner,数据将根据Key在不同的Task实例中并行分别计算,最后得出针对每个Key统计的结果。

    • 如果是Non-Keyed类型,则调用WindowsAll()方法来指定Windows Assigner,所有数据都被窗口算子路由到一个Task中计算,并得到结果。

  • 建议数据进行KeyedStream处理,这样启动并行计算,加速效率。

Window Assigner

  • flink支持两种类型的窗口,一种基于时间,窗口大小由开始和结束时间戳约束,一种基于数量,根据固定数量定义窗口大小。

  • 根据Windows Assigner数据分配方式的不同将Windows分为4大类:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)、会话窗口(Session Windows)和全局窗口(Global Windows)

滚动窗口

  • 滚动窗口根据固定时间或大小切分,且窗口与窗口间元素互不重叠,适合于固定时间大小和周期统计某一指标的窗口计算。

  • DataStream API提供了基于Event Time和Process Time两种时间类型的Tumbling窗口,对应的Assigner分别为TumblingEventTimeWindows和TumblingProcessTimeWindows,窗口大小童工of()指定,时间单位分别为Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同时间单位的组合。

  • 如下实例,窗口时间按10S进行切分,窗口的时间是[1:00:00.000-1:00:09.999] 到[1:00:10.000-1:00:19.999]的等固定时间范围。

val inputStream:DataStream[T]= ...
//定义Event Time Tumbling Windows
val tumblingEventTimeWindows=inputStream.keyBy(_.id)
//通过使用TumblingEventTimeWindows定义Event Time滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(...)  //定义窗口函数

//定义Process Time Tumbling Windows
val tumblingProcessingTimeWindows = inputStream.keyBy(_.id)
//通过TumblingProcessTimeWindows定义Evnet Time滚动窗口
.window(TumblingProcessTimeWindows.of(Times.seconds(10)))
.process(...)  //定义窗口函数

滑动窗口

  • 滑动窗口是一种常见的窗口类型,特点是在滚动窗口基础上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。这种窗口不像滚动窗口按照Windows Size向前移动,而是根据设定的Slide Time向前滑动。窗口之间的数据重叠大小根据Windows Size和Slide time决定,当Slide Time小于Windows Size便会发生窗口重叠,Slide Size大于WindowsSize会出现窗口不连续,数据可能不会再任何一个窗口内计算。

  • DataStream API针对Sliding Windows根据不同时间类型Assigner,包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTimeWindows。

  • 实例如下,指定Windows Size为1h,Slide Time为10m。

val inputStream:DataStream[T]= ...
//定义Event Time Sliding Windows
val slidingEventTimeWindows=inputStream.keyBy(_.id)
//通过使用SlidingEventTimeWindows定义Event Time滚动窗口
.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)  //定义窗口函数

//定义Process Time Sliding Windows
val slidingProcessTimeWindows = inputStream.keyBy(_.id)
//通过SlidingProcessTimeWindows定义Evnet Time滚动窗口
.window(SlidingProcessTimeWindows.of(Time.hours(1),Time.minutes(10)))
.process(...)  //定义窗口函数

会话窗口

  • 将某个时间段内活跃较高的数据聚合为一个窗口进行计算,窗口的触发条件为Session Gap,指规定时间内没有数据活跃接入,则任务窗口结束,触发窗口计算。

  • 注意:如果数据一直不间断,会导致窗口始终不触发。

  • 与滑动、滚动窗口不同,Session Windows不需要定义Windows Size和Slide Time,只需要定义session gap,规定不活跃数据的时间上线即可。

  • Session Windows比较适合非连续型数据处理或周期性产生数据的场景。DataStream API中可以创建基于Event Time和Process Time的Session Windows,对应的有Assigner分别为EventTimeSessionWindow和ProcessTimerSessionWindows。

  • 实例代码如下:

val inputStream:DataStream[T]= ...
//定义Event Time Session Windows
val eventTimeSessionWindows=inputStream.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time滚动窗口
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)  //定义窗口函数

//定义Process Time Session Windows
val processTimeSessionWindows = inputStream.keyBy(_.id)
//通过ProcessTimeSessionWindows定义Evnet Time滚动窗口
.window(ProcessTimeSessionWindows.withGap(Time.milliseconds(10)))
.process(...)  //定义窗口函数
  • flink支持动态调整的Session Gap,需要实现SessionWindowTimeGapExtractor接口,并复写extract方法,完成Session Gap的抽取,然后将创建好的Session Gap抽取器传入ProcessiongTimeSessionWindows.withDynamicGap()方法即可。

val inputStream:DataStream[T]= ...
//定义Event Time Session Windows
val eventTimeSessionWindows=inputStream.keyBy(_.id)
//通过使用EventTimeSessionWindows定义Event Time滚动窗口
.window(EventTimeSessionWindows.withDynamicGap(

    //实例化SessionWindowTimeGapExtractor接口
    new SessionWindowTimeGapExtractor[String]{
        override def extract(element:String):Long={
            //动态指定并返回Session Gap
        }
    }
))
.process(...)  //定义窗口函数

//定义Process Time Session Windows
val processTimeSessionWindows = inputStream.keyBy(_.id)
//通过ProcessTimeSessionWindows定义Evnet Time滚动窗口
.window(ProcessTimeSessionWindows.withDynamicGap(

    //实例化SessionWindowTimeGapExtractor接口
    new SessionWindowTimeGapExtractor[String]{
        override def extract(element:String):Long={
            //动态指定并返回Session Gap
        }
    }
))
.process(...)  //定义窗口函数

全局窗口

  • 全局会话窗口将所有相同的key数据分配到单个窗口中计算,窗口没有起始和结束时间,窗口需要借助Triger触发计算,如果不指定,则不会触发计算。

  • 使用全局窗口要非常谨慎,必须明确自己在整个窗口中统计出的结果是什么,并指定对应的触发器,同时指定相应的数据清理机制,否则数据将一直留在内存中。

val inputStream:DataStream[T]= ...
val globalWindows = inputStream.keyBy(_.id)
.window(GlobalWindows.create())  //通过GlobalWindows定义Global Windows
.process()

总结

  • flink定义的四种窗口,容易和时间窗口和事件窗口混淆,他们是不同维度的的窗口定义,需要特别注意下。

  • 越长大越孤单,珍惜好身边人。

感谢各位的阅读,以上就是“flink中窗口的作用是什么”的内容了,经过本文的学习后,相信大家对flink中窗口的作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


分享名称:flink中窗口的作用是什么
网站地址:http://scpingwu.com/article/phoshe.html