1.1.1. WatermarkStrategy

为了程序能够基于事件时间处理,Flink 需要准确知道事件发生的时间戳,这意味着数据流中的每个元素需要注册自己的事件时间戳。一般会使用 TimestampAssigner 从数据中的某些字段来提取事件时间。

事件时间的提取影响 watermark 的生成(watermark 代表着事件处理的进度)。我们可以使用 WatermarkGenerator 来生成 watermark。

emmn,貌似直接看上面两句话根本不知道再讲啥。但是 WatermarkStrategy 就是由 TimestampAssigner 和 WatermarkGenerator 组成的。watermark 策略暴露出来的接口就是实现以上两个方法。Flink 提供了内置的 WatermarkStrategy,也允许自定义。

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

正如上面提到的,很多时候我们不需要自己实现这个接口,只需要调用内部策略。比如下面这样,使用有界无序的 watermark + 基于 lambda 函数实现的时间注册器。

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

watermark 可以在读取数据源或者进行后续某些操作后注册。通常我们会选择在读取数据源时注册,这样产生的 watermark 更加准确,也会考虑到初始分区,分片的影响。直接在源端指定 WatermarkStrategy 有时候需要在数据源实现一些特殊的接口,比如 Kafka,这个在下面 WatermarkStrategy && Kafka Connector 会提到。

1.1.2. 处理空闲数据源

在实时数据流处理过程中,数据源往往是经过分区分片的,比如 Kafka,某个分区在一段时间内没有写入数据,这个分区就是一个空闲数据源,读取该分区的 task 在这段时间内是空跑的。空闲数据源带来一个问题,就是 watermark 也不会随之更新。此时,如果其他分区仍然有数据写入,对于下游有多个输入的 operator 来说,watermark 仍然停留在没有数据写入的分区最后一次产生的时间,不会处理其它分区后续写入的数据。

为了解决上述问题,我们需要将某段时间内没有写入的数据源分区标记为空闲,在 watermark 传往下游时,空闲数据源的 watermark 暂时不参与计算。WatermarkStrategy 提供了方便的方法来做这件事情。

// 如果超过 1min 没有写入数据,就标记为空闲
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));

1.1.3. WatermarkGenerators

TimestampAssigner 方法比较简单,就是从事件数据某些字段中提取转换出事件发生的时间戳,我们往往不需要关注太多。WatermarkGenerator 相对比较复杂,下面是其定义的接口

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine 
     * and remember the event timestamps, or to emit a watermark based on
     * the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks 
     * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}

watermark 生成有两种方式:

  • periodic watermark 基于定时,定时生成会通过 onEvent() 获取每条数据的时间作为待选的 watermark。在框架调用 onPeriodicEmit() 时生成最终的 watermark。
  • punctuated watermark 基于规则(某些特殊事件触发),规则生成也是通过 onEvent() ,在获取到某些特殊事件后,直接生成 watermark,而不通过 onPeriodicEmit()。

基于定时的实现

定时生成器会监听到来的事件,定时触发 watermark 的生成(基于事件时间或者单纯的处理时间都可以)。定时触发的时间间隔可以通过 ExecutionConfig.setAutoWatermarkInterval(...) 来设置,到达时间触发时会调用 onPeriodicEmit() 方法,只要这次的 watermark 不为 null 并且大于上次生成的 watermark,就会成功生成。下面是两个例子:

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * This generator generates watermarks that are lagging behind processing time 
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

基于规则的实现

基于规则的生成就是当遇到某些特殊事件时才会生成 watermark。

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}

使用规则生成需要主要,有些场景下,连续到来的每个事件可能都会触发 watermark,由此产生过多的 watermark,造成下游的频繁计算,降低性能。

1.1.4. WatermarkStrategy && Kafka Connector

Kafak 是分区内有序,全局乱序的,并且一般都是多线程消费。所以 watermark 的流转和 shuffle 是一样的,可以直接看下面的例子

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.
                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);

1.1.5. operator 是如何处理 watermark 的

operator 在处理完由 watermark 触发的所有计算后,才会将 watermark 继续发往下游。同样的规则适用于 TwoInputStreamOperator,只不过此时发出去的 watermark 是多个输入中最小的。更多的细节可以查看源码中 OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2 方法.

1.1.6. 内置的 WatermarkGenerators

单调递增生成 watermark、

单调递增即数据的到来是有序并且是增序的,这意味着当前的时间戳就可以当做 watermark,因为不会存在迟到数据。

WatermarkStrategy.forMonotonousTimestamps();

固定延迟生成 watermark

使用这种方式有个前提,就是需要提前预估好数据流的延迟,否则数据容易产生误差。固定延迟意味着每次生成 watermark 都会往前倒退一段时间作为最终的 watermark 发往下游,超出固定延迟的数据将被丢弃,不会触发计算。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

1.1.7. 思考

1. 自己分别实现两种 watermark 的生成,观察一下 watermark 是如何触发计算的?

可以参考测试代码

results matching ""

    No results matching ""