如上从测试代码开始调用链为:AsyncDataStream.orderedWait -> addOperator,然后addOperator中new了一个AsyncWaitOperatorFactory 。然后到这里其实可以告一段落了,因为没有必要往下看了,这个时候就需要猜了,一般我们类名叫XXFactory基本都是工厂模式,然后工厂生产的就是XX了,这里就是生成AsyncWaitOperator对象的工厂了,然后我们就可以直接在AsyncWaitOperator类的构造方法第一行打个断点,看看啥时候会进去了 。为啥要这样做,因为我们看到的Flink源码其实并不是一个线性的执行过程,架构图如下
文章插图
他的代码实际上并不是都在一个节点执行,虽然我们在本地调试,但是也是在模拟的一个本地集群中执行,怎么模拟出不同的节点呢,很明显要通过线程,也就是说不同的节点用不同的线程来代表并执行 。所以我们无脑断点是没法看到全貌的 。看代码的一个技巧,根据各方面的经验猜测,比如这里就是根据类名的特点进行猜测 。
4.2、AsyncWaitOperator我们在AsyncWaitOperator类的所有公共方法和构造方法里打个断点,debug运行程序进入调试
文章插图
文章插图
很明显这个构造方法,在一个独立的sink线程中运行,如果还按照上面的方式断点,估计找一辈子都找不到了
public AsyncWaitOperator(@Nonnull AsyncFunction<IN, OUT> asyncFunction,long timeout,int capacity,@Nonnull AsyncDataStream.OutputMode outputMode,@Nonnull ProcessingTimeService processingTimeService,@Nonnull MailboxExecutor mailboxExecutor) {super(asyncFunction);setChainingStrategy(ChainingStrategy.ALWAYS);Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");this.capacity = capacity;this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");this.timeout = timeout;this.processingTimeService = Preconditions.checkNotNull(processingTimeService);this.mailboxExecutor = mailboxExecutor;}
我们看一下构造方法的内容,发现都是一些初始化操作,看着没啥营养,看代码的另外一个技巧:抓大放小,路边的野花不要理睬,忽略一些不重要的初始化和参数校验等代码,重点关注大的流程的东西 。我们继续直接放开往下运行,直到下一个断点
文章插图
@Overridepublic void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);this.inStreamElementSerializer =new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));switch (outputMode) {case ORDERED:queue = new OrderedStreamElementQueue<>(capacity);break;case UNORDERED:queue = new UnorderedStreamElementQueue<>(capacity);break;default:throw new IllegalStateException("Unknown async mode: " + outputMode + '.');}this.timestampedCollector = new TimestampedCollector<>(super.output);}
一眼望去就发现下面switch case那里比较有用,根据名字可以知道,这里根据outputMode判断分别实例化有序的队列和无需的队列,联想到AsyncDataStream类里的几个orderedWait和unorderedWait方法,很快就能想到是否有序这个队列就是关键了 。好了没什么可以留恋了,继续执行到下一个断点吧!文章插图
初始化状态,没啥可留恋的,先跳过继续到下一个断点
经验总结扩展阅读
- 好茶叶的特点
- VLQ & Base64 VLQ 编码方式的原理及代码实现
- Spring Boot 配置 jar 包外面的 Properties 配置文件
- 【C++】从零开始的CS:GO逆向分析3——写出一个透视
- 适合隔夜带饭的菜谱
- 驼鸟蛋的营养价值
- 2023年10月13日是提车吉日吗 2023年10月13日是提车的黄道吉日吗
- 一斤糖蒜的糖和醋比例
- 2023年10月13日旅游黄道吉日 2023年10月13日旅游行吗
- 2023年10月13日出差好不好 2023年10月13日是出差的黄道吉日吗