Let's focus on the part below line 52: a new StreamElementQueueEntry object is created and put into the queue, so the next class to look at is StreamRecordQueueEntry.
4.5. StreamRecordQueueEntry

    @Internal
    class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
        @Nonnull private final StreamRecord<?> inputRecord;

        private Collection<OUT> completedElements;

        StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
            this.inputRecord = Preconditions.checkNotNull(inputRecord);
        }

        @Override
        public boolean isDone() {
            return completedElements != null;
        }

        @Nonnull
        @Override
        public StreamRecord<?> getInputElement() {
            return inputRecord;
        }

        @Override
        public void emitResult(TimestampedCollector<OUT> output) {
            output.setTimestamp(inputRecord);
            for (OUT r : completedElements) {
                output.collect(r);
            }
        }

        @Override
        public void complete(Collection<OUT> result) {
            this.completedElements = Preconditions.checkNotNull(result);
        }
    }
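With the above we now have a rough picture: as the program runs, each record is wrapped in a StreamRecordQueueEntry object and put, in a blocking manner, into the OrderedStreamElementQueue. The StreamRecordQueueEntry objects in this queue have a few characteristic methods. isDone, as the name suggests, reports whether the entry has completed. emitResult will look familiar to anyone who has written a Flink program: output.collect(r) is how data is sent downstream. complete is, literally, the completion action: it null-checks the data passed in and assigns it to the member variable completedElements.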
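To make the ordering behaviour concrete, here is a minimal sketch of the same idea outside Flink. The classes SketchEntry, SketchOrderedQueue and OrderedQueueSketch are hypothetical stand-ins written for this illustration, not Flink classes, and the drain loop is a simplification (the real operator emits one element per mailbox action). It only shows that a result completed out of order stays in the queue until the head entry is done.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified stand-in for StreamRecordQueueEntry (not the Flink class). */
final class SketchEntry<T> {
    private final String input;
    private Collection<T> completedElements; // stays null until complete() is called

    SketchEntry(String input) {
        this.input = input;
    }

    String getInput() {
        return input;
    }

    boolean isDone() {
        return completedElements != null;
    }

    void complete(Collection<T> result) {
        if (result == null) {
            throw new NullPointerException("result");
        }
        this.completedElements = result;
    }

    void emitResult(List<T> output) {
        output.addAll(completedElements);
    }
}

/** Hypothetical, simplified stand-in for OrderedStreamElementQueue. */
final class SketchOrderedQueue<T> {
    private final Queue<SketchEntry<T>> queue = new ArrayDeque<>();

    SketchEntry<T> put(String input) {
        SketchEntry<T> entry = new SketchEntry<>(input);
        queue.add(entry);
        return entry;
    }

    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    /** Emits completed entries from the head and stops at the first unfinished one. */
    void drainCompleted(List<T> output) {
        while (hasCompletedElements()) {
            queue.poll().emitResult(output);
        }
    }
}

final class OrderedQueueSketch {
    public static void main(String[] args) {
        SketchOrderedQueue<String> queue = new SketchOrderedQueue<>();
        SketchEntry<String> first = queue.put("a");
        SketchEntry<String> second = queue.put("b");
        List<String> out = new ArrayList<>();

        second.complete(Collections.singletonList("B")); // the later record finishes first
        queue.drainCompleted(out);                       // emits nothing: the head is not done
        first.complete(Collections.singletonList("A"));
        queue.drainCompleted(out);                       // now both are emitted, in input order

        System.out.println(out); // prints [A, B]
    }
}
```

Because only the head of the queue is ever inspected, a slow first request delays everything behind it; that is the price paid for preserving input order.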
Let's return to the main line of the processElement method:

    // First, add the element to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);
    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // If a timeout is configured, register a timeout for the entry;
    // here the timeout is the 10s from the test code
    if (timeout > 0L) {
        resultHandler.registerTimeout(getProcessingTimeService(), timeout);
    }

    userFunction.asyncInvoke(element.getValue(), resultHandler);

Note the last line above: it calls asyncInvoke, which takes us back to the asyncInvoke method overridden in the test code:
    /**
     * Use AsyncDataStream to build an asynchronous ordered stream. As the name suggests, it is asynchronous
     * yet still preserves order; the source code analysis later explains why.
     */
    SingleOutputStreamOperator<String> asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
            /**
             * Call the simulated asynchronous request, which returns a CompletableFuture
             */
            CompletableFuture<String> future = new AsyncIODemo().pullData(input);
            /**
             * Register a callback for when the future completes: once the result is available,
             * call resultFuture's complete method to actually emit the data
             */
            future.whenCompleteAsync((d, t) -> {
                resultFuture.complete(Arrays.asList(d));
            });
        }
        // Set the maximum asynchronous call timeout to 10 seconds
    }, 10, TimeUnit.SECONDS);
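The callback in the test code ignores the throwable t, so a failed request would be passed on as a null element. The sketch below shows one possible way to route success and failure separately. SketchResultFuture is a hypothetical stand-in that only mirrors the two method names of Flink's ResultFuture (complete and completeExceptionally), and ForwardingSketch.forward is a helper invented for this illustration, so treat it as an assumption rather than the article's test code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in mirroring the two methods of Flink's ResultFuture. */
interface SketchResultFuture<T> {
    void complete(Collection<T> result);

    void completeExceptionally(Throwable error);
}

final class ForwardingSketch {
    /** Forwards the outcome of an async call to the result future, keeping errors separate. */
    static <T> void forward(CompletableFuture<T> future, SketchResultFuture<T> resultFuture) {
        future.whenComplete((value, error) -> {
            if (error != null) {
                // Failure: report the error instead of emitting a null element.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        SketchResultFuture<String> recorder = new SketchResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                events.add("ok:" + result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                events.add("err:" + error.getMessage());
            }
        };

        // An already-completed future runs the callback immediately on the calling thread.
        forward(CompletableFuture.completedFuture("x"), recorder);

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        forward(failed, recorder);

        System.out.println(events);
    }
}
```

The sketch uses whenComplete rather than whenCompleteAsync only to keep the demo deterministic; the branching on the throwable is the same either way.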
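At this point we can set a breakpoint on line 17 of the test code above (the resultFuture.complete(...) call) and step into it. We actually land in the complete method of the inner class org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.ResultHandler: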
    private void outputCompletedElement() {
        /**
         * Check whether the OrderedStreamElementQueue has a completed element; see the code above:
         *
         *     @Override
         *     public boolean hasCompletedElements() {
         *         return !queue.isEmpty() && queue.peek().isDone();
         *     }
         *
         * It simply peeks at the head element of the queue, a StreamRecordQueueEntry,
         * and calls its isDone method:
         *
         *     @Override
         *     public boolean isDone() {
         *         return completedElements != null;
         *     }
         *
         * which checks whether the member variable is null. Since it was assigned in the
         * previous step, isDone returns true here.
         */
        if (queue.hasCompletedElements()) {
            /**
             * Call the emitCompletedElement method of OrderedStreamElementQueue:
             *
             *     @Override
             *     public void emitCompletedElement(TimestampedCollector<OUT> output) {
             *         if (hasCompletedElements()) {
             *             final StreamElementQueueEntry<OUT> head = queue.poll();
             *             head.emitResult(output);
             *         }
             *     }
             *
             * It removes the head StreamElementQueueEntry from the queue and calls its emitResult method:
             *
             *     @Override
             *     public void emitResult(TimestampedCollector<OUT> output) {
             *         output.setTimestamp(inputRecord);
             *         for (OUT r : completedElements) {
             *             output.collect(r);
             *         }
             *     }
             *
             * This is where collect is actually called in a loop to emit the data downstream.
             */
            queue.emitCompletedElement(timestampedCollector);
            // if there are more completed elements, emit them with subsequent mails
            if (queue.hasCompletedElements()) {
                try {
                    mailboxExecutor.execute(
                            this::outputCompletedElement,
                            "AsyncWaitOperator#outputCompletedElement");
                } catch (RejectedExecutionException mailboxClosedException) {
                    // This exception can only happen if the operator is cancelled which means all
                    // pending records can be safely ignored since they will be processed one more
                    // time after recovery.
                    LOG.debug(
                            "Attempt to complete element is ignored since the mailbox rejected the execution.",
                            mailboxClosedException);
                }
            }
        }
    }

    /** A handler for the results of a specific input record. */
    private class ResultHandler implements ResultFuture<OUT> {
        /** Optional timeout timer used to signal the timeout to the AsyncFunction. */
        private ScheduledFuture<?> timeoutTimer;

        /** Record for which this result handler exists. Used only to report errors. */
        private final StreamRecord<IN> inputRecord;

        /**
         * The handle received from the queue to update the entry. Should only be used to inject the
         * result; exceptions are handled here.
         */
        private final ResultFuture<OUT> resultFuture;

        /**
         * A guard against ill-written AsyncFunction. Additional (parallel) invocations of {@link
         * #complete(Collection)} or {@link #completeExceptionally(Throwable)} will be ignored. This
         * guard also helps for cases where proper results and timeouts happen at the same time.
         */
        private final AtomicBoolean completed = new AtomicBoolean(false);

        ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture) {
            this.inputRecord = inputRecord;
            this.resultFuture = resultFuture;
        }

        @Override
        public void complete(Collection<OUT> results) {
            Preconditions.checkNotNull(
                    results, "Results must not be null, use empty collection to emit nothing");

            // CAS the completed flag; return if it fails (the handler was already completed)
            if (!completed.compareAndSet(false, true)) {
                return;
            }

            processInMailbox(results);
        }

        private void processInMailbox(Collection<OUT> results) {
            // move further processing into the mailbox thread
            mailboxExecutor.execute(
                    () -> processResults(results),
                    "Result in AsyncWaitOperator of input %s",
                    results);
        }

        private void processResults(Collection<OUT> results) {
            /**
             * If the timeout timer is not null, cancel the scheduled task: we are already on the
             * completion path and the record has been processed, so its timeout task can be cancelled.
             */
            if (timeoutTimer != null) {
                // canceling in mailbox thread avoids
                // https://issues.apache.org/jira/browse/FLINK-13635
                timeoutTimer.cancel(true);
            }

            /**
             * This calls the complete method of StreamRecordQueueEntry, which assigns the member
             * variable completedElements; see the StreamRecordQueueEntry class above.
             */
            resultFuture.complete(results);
            // see outputCompletedElement() at the top of this listing
            outputCompletedElement();
        }

        // registerTimeout and completeExceptionally are omitted from this excerpt
    }
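The two mechanisms in ResultHandler, the AtomicBoolean guard and the hand-off to the mailbox thread, can be tried out in isolation. The following is a minimal sketch under stated assumptions: SketchMailbox and SketchResultHandler are hypothetical classes written for this illustration, the mailbox is just a queue of Runnables drained by the caller, and timeout() is an invented stand-in for the timeout path, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical mailbox: any thread may enqueue, one thread later runs the actions in order. */
final class SketchMailbox {
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();

    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Runs all queued actions on the calling thread and returns how many ran. */
    int runAll() {
        int count = 0;
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
            count++;
        }
        return count;
    }
}

/** Hypothetical, simplified stand-in for AsyncWaitOperator.ResultHandler. */
final class SketchResultHandler {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final SketchMailbox mailbox;
    private final List<String> sink; // only touched by the thread that drains the mailbox

    SketchResultHandler(SketchMailbox mailbox, List<String> sink) {
        this.mailbox = mailbox;
        this.sink = sink;
    }

    void complete(Collection<String> results) {
        if (results == null) {
            throw new NullPointerException("results");
        }
        // Only the first caller flips the flag; later results or timeouts are ignored.
        if (!completed.compareAndSet(false, true)) {
            return;
        }
        mailbox.execute(() -> sink.addAll(results));
    }

    /** Invented stand-in for the timeout path, guarded by the same flag. */
    void timeout() {
        if (!completed.compareAndSet(false, true)) {
            return;
        }
        mailbox.execute(() -> sink.add("TIMEOUT"));
    }

    public static void main(String[] args) {
        SketchMailbox mailbox = new SketchMailbox();
        List<String> sink = new ArrayList<>();
        SketchResultHandler handler = new SketchResultHandler(mailbox, sink);

        handler.complete(Collections.singletonList("r1")); // wins the race
        handler.timeout();                                 // ignored
        handler.complete(Collections.singletonList("r2")); // ignored

        mailbox.runAll();
        System.out.println(sink); // prints [r1]
    }
}
```

Nothing reaches the sink until the mailbox is drained, which mirrors why processResults can cancel the timer and touch the queue without extra locking: all of that work runs on a single thread.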