Flink的异步算子的原理及使用( 五 )

我们可以从上面的ResultHandler类的complete方法开始看,具体可以参见上面注释,总结起来就是如下几步

  1. 取消当前ResultHandler对象的超时定时任务
  2. 调用StreamRecordQueueEntry的complete方法将成员变量completedElements赋值
  3. 判断OrderedStreamElementQueue队列的队头元素StreamRecordQueueEntry的completedElements成员变量是不是不为空
  4. 第3步不为空,则调用OrderedStreamElementQueue队列的emitCompletedElement方法移除队列的头元素StreamElementQueueEntry并调用emitResult方法真正向下游吐出数据
从上面可以看出每次随着completableFuture的complete方法的调用,都会判断队头的元素是否处理完,处理完就移除队头元素并向吐出数据 。所以异步算子每次来数据经过processElement方法,就已经将数据元素封装成StreamElementQueueEntry对象并放到了队列中,虽然异步算子执行过程是异步,每个元素的完成时间没有顺序,但是由于每个元素完成后,都是判断的队头元素有没有完成,完成后也是移除队头并向下游吐数据 。所以整体过程还是按照processElement处理顺序也就是上游给过来的数据顺序严格有序的 。
5、flinksql自定义AsyncLookupFunction通常flinksql使用外部的数据源都需要引入一个flinksql-connector-xx这种jar包,比如我们想以kafka为流表join一个redis的维表,那么这时候查询redis的维表,通常使用的就是lookup join 。但是网上提供的例子基本都是同步的lookup join,在有些场景下为了提高吞吐就需要使用异步的lookup join 。详细实现可以直接看代码:https://gitee.com/rongdi/flinksql-connector-redis

经验总结扩展阅读