我们可以从上面的ResultHandler类的complete方法开始看,具体可以参见上面注释,总结起来就是如下几步
- 取消当前ResultHandler对象的超时定时任务
- 调用StreamRecordQueueEntry的complete方法将成员变量completedElements赋值
- 判断OrderedStreamElementQueue队列的队头元素StreamRecordQueueEntry的completedElements成员变量是不是不为空
- 第3步不为空,则调用OrderedStreamElementQueue队列的emitCompletedElement方法移除队列的头元素StreamElementQueueEntry并调用emitResult方法真正向下游吐出数据
5、flinksql自定义AsyncLookupFunction通常flinksql使用外部的数据源都需要引入一个flinksql-connector-xx这种jar包,比如我们想以kafka为流表join一个redis的维表,那么这时候查询redis的维表,通常使用的就是lookup join 。但是网上提供的例子基本都是同步的lookup join,在有些场景下为了提高吞吐就需要使用异步的lookup join 。详细实现可以直接看代码:https://gitee.com/rongdi/flinksql-connector-redis
经验总结扩展阅读
- 好茶叶的特点
- VLQ & Base64 VLQ 编码方式的原理及代码实现
- Spring Boot 配置 jar 包外面的 Properties 配置文件
- 【C++】从零开始的CS:GO逆向分析3——写出一个透视
- 适合隔夜带饭的菜谱
- 驼鸟蛋的营养价值
- 2023年10月13日是提车吉日吗 2023年10月13日是提车的黄道吉日吗
- 一斤糖蒜的糖和醋比例
- 2023年10月13日旅游黄道吉日 2023年10月13日旅游行吗
- 2023年10月13日出差好不好 2023年10月13日是出差的黄道吉日吗