2020年11月24日

Flink系列之二Flink的窗口和水印

点击数:14

Windows 窗口定义

按固定时间区间计算该区间的值,例如15s计算汇总一次:

无穷的流,数据不间断的,例如有累计数据的需求,按上图的逻辑是处理不到的。换一种思路,每隔 15 秒,我们都将与上一次的结果进行 sum 操作(滑动聚合):

img

流是无界的,我们不能限制流,所以上述方案也解决不了需求,但可以在有一个有界的范围内处理无界的流数据。

那么按一分钟一个时间窗口计算,相当于一个定义了一个 Window(窗口),window 的界限是1分钟,且每分钟内的数据互不干扰,因此也可以称为滚动(不重合)窗口

img

第一分钟的数量为8,第二分钟是22,第三分钟是27。。。这样,1个小时内会有60个window。

再考虑一种情况,每30秒统计一次过去1分钟的数量之和:

img

通常来讲,Window 就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window 又可以分为基于时间(Time-based)的 window 以及基于数量(Count-based)的 window。

Flink窗口类型

对于窗口的操作主要分为两种,分别对于 Keyedstream 和 Datastream。他们的主要区别也仅仅在于建立窗口的时候一个为 .window(…),一个为 .windowAll(…)。对于 Keyedstream 的窗口来说,他可以使得多任务并行计算,每一个 logical key stream 将会被独立的进行处理。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)/.windowAll(...)  <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
  • 按照窗口的Assigner来分,窗口可以分为
    Tumbling window(滚动窗口), sliding window(滑动窗口),session window(会话窗口),global window(全局窗口),custom window(自定义窗口)
  • 每种窗口又可分别基于 processing time(处理时间) 和 event time(事件时间)。
  • 还有一种window叫做count window(计数窗口),依据元素到达的数量进行分配,之后也会提到。
  • 窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候

窗口的操作

Time window

固定相同间隔分配窗口,每个窗口之间没有重叠。

  • tumbling time windows(滚动时间窗口)
data.keyBy(1)
    .timeWindow(Time.minutes(1)) //tumbling time window 每分钟统计一次数量和
    .sum(1);
  • sliding time windows(滑动时间窗口)
    固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。

data.keyBy(1)
    .timeWindow(Time.minutes(1), Time.seconds(30)) //sliding time window 每隔 30s 统计过去一分钟的数量和
    .sum(1);

那么流处理器如何解释时间?

Apache Flink 具有三个不同的时间概念,即 processing time(处理时间), event time(时间时间) 和 ingestion time(流进入窗口时间,即提取时间)。

img

默认采用:TimeCharacteristic.ProcessingTime 发音为: [ˌkærəktəˈrɪstɪk]

img

我们可以设置为其他:

1. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2. env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

指定为 EventTime 的 source 需要自己定义 event time 以及 emit watermark,或者在 source 之外通assignTimestampsAndWatermarks 在程序手工指定

Watermark解释

Count Windows

Apache Flink 还提供计数窗口功能。如果计数窗口设置的为 100 ,那么将会在窗口中收集 100 个事件,并在添加第 100 个元素时计算窗口的值。

  • tumbling count window
data.keyBy(1)
    .countWindow(100) //统计每 100 个元素的数量之和
    .sum(1);
  • sliding count window
data.keyBy(1)
    .countWindow(100, 10) //每 10 个元素统计过去 100 个元素的数量之和
    .sum(1);

Session window

主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。一个 session window 关闭通常是由于一段时间没有收到元素。在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。

// 静态间隔时间
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 动态时间
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(EventTimeSessionWindows.withDynamicGap(()));

Global window

同 keyed 的元素分配到一个窗口里

WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
    .keyBy(MovieRate::getUserId)
    .window(GlobalWindows.create());

Flink 的窗口机制

  • 到达窗口操作符的元素被传递给 WindowAssigner。WindowAssigner 将元素分配给一个或多个窗口,可能会创建新的窗口。窗口本身只是元素列表的标识符,它可能提供一些可选的元信息,例如 TimeWindow 中的开始和结束时间。注意,元素可以被添加到多个窗口,这也意味着一个元素可以同时在多个窗口存在。
  • 每个窗口都拥有一个 Trigger(触发器),该 Trigger(触发器) 决定何时计算和清除窗口。当先前注册的计时器超时时,将为插入窗口的每个元素调用触发器。在每个事件上,触发器都可以决定触发(即清除(删除窗口并丢弃其内容),或者启动并清除窗口。一个窗口可以被求值多次,并且在被清除之前一直存在。注意,在清除窗口之前,窗口将一直消耗内存。
  • 当 Trigger(触发器) 触发时,可以将窗口元素列表提供给可选的 Evictor(驱逐器),Evictor 可以遍历窗口元素列表,并可以决定从列表的开头删除首先进入窗口的一些元素。然后其余的元素被赋给一个计算函数,如果没有定义 Evictor,触发器直接将所有窗口元素交给计算函数。
  • 计算函数接收 Evictor 过滤后的窗口元素,并计算窗口的一个或多个元素的结果。 DataStream API 接受不同类型的计算函数,包括预定义的聚合函数,如 sum(),min(),max(),以及 ReduceFunction,FoldFunction 或 WindowFunction。
    1. 窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前两个执行得更有效,因为 Flink 可以增量地聚合每个到达窗口的元素。
    2. Flink 在调用函数之前必须内部缓冲窗口中的所有元素,所以使用 ProcessWindowFunction 进行操作效率不高。不过 ProcessWindowFunction 可以跟其他的窗口函数结合使用,其他函数接受增量信息,ProcessWindowFunction 接受窗口的元数据。

这些是构成 Flink 窗口机制的组件。 接下来我们逐步演示如何使用 DataStream API 实现自定义窗口逻辑。 我们从 DataStream [IN] 类型的流开始,并使用 key 选择器函数对其分组,该函数将 key 相同类型的数据分组在一块。

SingleOutputStreamOperator<xxx> data = env.addSource(...);
data.keyBy();

自定义Windows

Window Assigner

负责将元素分配到不同的 window。

Window API 提供了自定义的 WindowAssigner 接口,我们可以实现 WindowAssigner 的方法

public abstract Collection<W> assignWindows(T element, long timestamp)

同时,对于基于 Count 的 window 而言,默认采用了 GlobalWindow 的 window assigner,例如:

keyBy.window(GlobalWindows.create())

Trigger(触发器)

触发器定义了窗口何时准备好被窗口处理。每个窗口分配器默认都有一个触发器,如果默认的触发器不符合你的要求,就可以使用trigger(…)自定义触发器。

通常来说,默认的触发器适用于多种场景。例如,所有的 event-time 窗口分配器都有一个 EventTimeTrigger 作为默认触发器。该触发器在 watermark 通过窗口末尾时触发。

PS:GlobalWindow 默认的触发器是 NeverTrigger,该触发器从不触发,所以在使用 GlobalWindow 时必须自定义触发器。

Evictor(驱逐器-可选)

Evictors 可以在触发器触发之后以及窗口函数被应用之前和/或之后可选择的移除元素。使用 Evictor 可以防止预聚合,因为窗口的所有元素都必须在应用计算逻辑之前先传给Evictor进行处理。

通过 apply WindowFunction 来返回 DataStream 类型数据

利用 Flink 的内部窗口机制和 DataStream API 可以实现自定义的窗口逻辑,例如 session window。

Flink Event Time Watermark

在下文中的例子中,我们有一个带有时间戳的事件流,但是由于某种原因它们并不是按顺序到达的。图中的数字代表事件发生的时间戳。第一个到达的事件发生在时间 4,然后它后面跟着的是发生在更早时间(时间 2)的事件,以此类推:

注意这是一个按照事件时间处理的例子,这意味着时间戳反映的是事件发生的时间,而不是处理事件的时间。事件时间(Event-Time)处理的强大之处在于,无论是在处理实时的数据还是重新处理历史的数据,基于事件时间创建的流计算应用都能保证结果是一样的。

注:可以访问 Apache Flink 文档,了解更多有关时间的概念,如 event-time, processing-time, ingestion-time。

现在假设我们正在尝试创建一个流计算排序算子。也就是处理一个乱序到达的事件流,并按照事件时间的顺序输出事件。

理解 #1:

数据流中的第一个元素的时间是 4,但是我们不能直接将它作为排序后数据流的第一个元素并输出它。因为数据是乱序到达的,也许有一个更早发生的数据还没有到达。事实上,我们能预见一些这个流的未来,也就是我们的排序算子至少要等到 2 这条数据的到达再输出结果。

有缓存,就必然有延迟。

理解 #2:

如果我们做错了,我们可能会永远等待下去。首先,我们的应用程序从看到时间 4 的数据,然后看到时间 2 的数据。是否会有一个比时间 2 更早的数据到达呢?也许会,也许不会。我们可以一直等下去,但可能永远看不到 1 。

最终,我们必须勇敢地输出 2 作为排序流的第一个结果。

理解 #3:

我们需要的是某种策略,它定义了对于任何带时间戳的事件流,何时停止等待更早数据的到来。

这正是 watermark 的作用,他们定义了何时不再等待更早的数据。

Flink 中的事件时间处理依赖于一种特殊的带时间戳的元素,成为 watermark,它们会由数据源或是 watermark 生成器插入数据流中。具有时间戳 t 的 watermark 可以被理解为断言了所有时间戳小于或等于 t 的事件都(在某种合理的概率上)已经到达了。

译注:此处原文是“小于”,译者认为应该是 “小于或等于”,因为 Flink 源码中采用的是 “小于或等于” 的机制。

何时我们的排序算子应该停止等待,然后将事件 2 作为首个元素输出?答案是当收到时间戳为 2(或更大)的 watermark 时。

理解 #4:

我们可以设想不同的策略来生成 watermark。

我们知道每个事件都会延迟一段时间才到达,而这些延迟差异会比较大,所以有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟不会超过某个最大值。Flink 把这种策略称作 “有界无序生成策略”(bounded-out-of-orderness)。当然也有很多更复杂的方式去生成 watermark,但是对于大多数应用来说,固定延迟的方式已经足够了。


  1. Flink窗口

发表评论