Principles and Usage of Flink's Async Operator (Part 3)


@Override
public void open() throws Exception {
    super.open();

    this.isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

    if (recoveredStreamElements != null) {
        for (StreamElement element : recoveredStreamElements.get()) {
            if (element.isRecord()) {
                processElement(element.<IN>asRecord());
            } else if (element.isWatermark()) {
                processWatermark(element.asWatermark());
            } else if (element.isLatencyMarker()) {
                processLatencyMarker(element.asLatencyMarker());
            } else {
                throw new IllegalStateException(
                        "Unknown record type "
                                + element.getClass()
                                + " encountered while opening the operator.");
            }
        }
        recoveredStreamElements = null;
    }
}

From the if (recoveredStreamElements != null) block onward, it looks as if data processing starts here. The field name gives it away, though: "recovered" means restored. The block only runs when this field is non-null, so it is clearly logic for restoring data from a checkpoint. We have not yet found where regular data is processed, so this branch is a detour for now; skip it and move on to the next breakpoint.
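Although we skip this branch, the pattern behind it is easy to model. Assuming, as the name suggests, that recoveredStreamElements holds the elements that were still waiting in the queue when state was snapshotted, open() simply pushes them back through the same processElement/processWatermark path used for live data. The following is a minimal sketch under that assumption; BufferedElement, DataRecord, WatermarkElement and ReplayingOperator are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's StreamElement hierarchy (not Flink classes).
abstract class BufferedElement {}

final class DataRecord extends BufferedElement {
    final String value;
    DataRecord(String value) { this.value = value; }
}

final class WatermarkElement extends BufferedElement {
    final long timestamp;
    WatermarkElement(long timestamp) { this.timestamp = timestamp; }
}

final class ReplayingOperator {
    // In-flight elements restored from the last snapshot; null when starting fresh.
    private List<BufferedElement> recovered;
    // Records what was processed so the replay order can be inspected.
    final List<String> processed = new ArrayList<>();

    ReplayingOperator(List<BufferedElement> restoredState) {
        this.recovered = restoredState;
    }

    void open() {
        if (recovered != null) {
            // Push each restored element back through the regular processing path, by type.
            for (BufferedElement e : recovered) {
                if (e instanceof DataRecord) {
                    processElement((DataRecord) e);
                } else if (e instanceof WatermarkElement) {
                    processWatermark((WatermarkElement) e);
                } else {
                    throw new IllegalStateException("Unknown record type " + e.getClass());
                }
            }
            recovered = null; // replay at most once
        }
    }

    void processElement(DataRecord r) { processed.add("record:" + r.value); }

    void processWatermark(WatermarkElement w) { processed.add("watermark:" + w.timestamp); }
}
```

Setting the field to null after the loop is what keeps a second open() from replaying the same elements, mirroring recoveredStreamElements = null in the original.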
@Override
public void processElement(StreamRecord<IN> record) throws Exception {
    StreamRecord<IN> element;
    // copy the element avoid the element is reused
    if (isObjectReuseEnabled) {
        //noinspection unchecked
        element = (StreamRecord<IN>) inStreamElementSerializer.copy(record);
    } else {
        element = record;
    }

    // add element first to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    if (timeout > 0L) {
        resultHandler.registerTimeout(getProcessingTimeService(), timeout);
    }

    userFunction.asyncInvoke(element.getValue(), resultHandler);
}

The method name already tells us that this is where the real data is processed. Hit the breakpoint a few times and you will see that every record enters this method exactly once. In order, it copies the record if object reuse is enabled, adds it to the work queue, wraps the returned entry in a ResultHandler, registers a timeout if one is configured, and finally calls the user's asyncInvoke.
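The order of these steps can be modeled outside Flink. The sketch below is a simplified single-class model, not Flink's implementation: MiniAsyncOperator and its constructor parameters are hypothetical, and CompletableFuture.orTimeout (Java 9+) is swapped in for Flink's processing-time timer registered by ResultHandler.registerTimeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

// Hypothetical, heavily simplified model of processElement's order of steps (not Flink's API).
final class MiniAsyncOperator<IN, OUT> {
    private final Deque<CompletableFuture<OUT>> workQueue = new ArrayDeque<>();
    private final BiConsumer<IN, CompletableFuture<OUT>> userFunction; // role of asyncInvoke
    private final UnaryOperator<IN> copier; // role of inStreamElementSerializer.copy
    private final boolean objectReuseEnabled;
    private final long timeoutMillis;

    MiniAsyncOperator(BiConsumer<IN, CompletableFuture<OUT>> userFunction,
                      UnaryOperator<IN> copier,
                      boolean objectReuseEnabled,
                      long timeoutMillis) {
        this.userFunction = userFunction;
        this.copier = copier;
        this.objectReuseEnabled = objectReuseEnabled;
        this.timeoutMillis = timeoutMillis;
    }

    CompletableFuture<OUT> processElement(IN record) {
        // 1. With object reuse the caller may overwrite this instance later, so keep a private copy.
        IN element = objectReuseEnabled ? copier.apply(record) : record;

        // 2. Enqueue first, so the entry holds its place before any async work starts.
        CompletableFuture<OUT> entry = new CompletableFuture<>();
        workQueue.add(entry);

        // 3. Arm a timeout only if one is configured; on expiry the entry fails with TimeoutException.
        if (timeoutMillis > 0L) {
            entry.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        // 4. Hand the value and the completion handle to the user function, which completes it later.
        userFunction.accept(element, entry);
        return entry;
    }

    int inFlight() {
        return workQueue.size();
    }
}
```

Enqueuing before calling the user function fixes each entry's position by arrival order rather than by when its async call finishes. The sketch never removes entries; emitting results is the job of the queue classes discussed later.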
The parameter of this method is a single element of the source stream, wrapped in a StreamRecord. Next, let's look at the addToWorkQueue method.
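/**
 * Adds the given stream element to the operator's stream element queue. This operation blocks
 * until the element has been added.
 */
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
        throws InterruptedException {

    Optional<ResultFuture<OUT>> queueEntry;
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
        mailboxExecutor.yield();
    }

    return queueEntry.get();
}

This method puts the element from the source into the queue created earlier (in this example the ordered queue OrderedStreamElementQueue) and returns a ResultFuture object. When tryPut fails because the queue is full, mailboxExecutor.yield() runs other pending mailbox actions, such as emitting completed results, until a slot frees up; this is where the "blocks until the element has been added" behavior comes from. Let's see what this ResultFuture object is.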
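The tryPut/yield loop is what gives the operator backpressure: a full queue does not drop the element, it keeps the task thread busy with other mailbox work until space appears. Below is a single-threaded sketch of that loop, assuming a plain Deque<Runnable> as a stand-in for the mailbox; BoundedWorkQueue, yieldOnce and emitHead are hypothetical names. Unlike the real mailboxExecutor.yield(), which blocks until mail arrives, the sketch throws when there is nothing to run, because in a single thread nothing else could ever free a slot.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded model of addToWorkQueue's "tryPut, else yield" loop.
final class BoundedWorkQueue {
    private final int capacity;
    private final Deque<CompletableFuture<String>> queue = new ArrayDeque<>();
    // Stand-in for the task mailbox: actions such as "a result arrived, emit it".
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    int yields = 0;

    BoundedWorkQueue(int capacity) {
        this.capacity = capacity;
    }

    Optional<CompletableFuture<String>> tryPut(String element) {
        if (queue.size() < capacity) {
            CompletableFuture<String> entry = new CompletableFuture<>();
            queue.add(entry);
            return Optional.of(entry);
        }
        return Optional.empty(); // full: the caller has to retry
    }

    // Runs one pending mailbox action, like mailboxExecutor.yield() (which would block instead).
    void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("queue full and no pending mail");
        }
        yields++;
        mail.run();
    }

    CompletableFuture<String> addToWorkQueue(String element) {
        Optional<CompletableFuture<String>> entry;
        while (!(entry = tryPut(element)).isPresent()) {
            yieldOnce();
        }
        return entry.get();
    }

    // Removes the head if it has completed, freeing one slot.
    void emitHead() {
        if (!queue.isEmpty() && queue.peek().isDone()) {
            queue.poll();
        }
    }

    int size() {
        return queue.size();
    }
}
```

With capacity 1, adding a second element succeeds only after a mailbox action has emitted the completed first one.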
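4.3 ResultFuture

@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Puts all results into a Collection and then emits them.
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the result exceptionally with the given error.
     */
    void completeExceptionally(Throwable error);
}

Next, let's look at how tryPut wraps the element into a ResultFuture object.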
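From the user's side, ResultFuture is the handle that asyncInvoke receives and must complete exactly once. The sketch below shows the usual pattern of completing it from an asynchronous callback. To stay runnable without Flink on the classpath, it declares a local SimpleResultFuture interface with the same two methods; AsyncLookup and the in-memory Map are hypothetical stand-ins for a real asynchronous client.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Local stand-in with the same two methods as the ResultFuture shown above (not the Flink type).
interface SimpleResultFuture<OUT> {
    void complete(Collection<OUT> result);

    void completeExceptionally(Throwable error);
}

// Hypothetical async lookup; a real job would call a non-blocking client instead of reading a Map.
final class AsyncLookup {
    private final Map<String, String> table;

    AsyncLookup(Map<String, String> table) {
        this.table = table;
    }

    void asyncInvoke(String key, SimpleResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> table.get(key))
                .whenComplete((value, error) -> {
                    // Complete exactly once, from whichever thread finishes the lookup.
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else if (value == null) {
                        resultFuture.complete(Collections.emptyList()); // no match: emit nothing
                    } else {
                        resultFuture.complete(Collections.singleton(key + "=" + value));
                    }
                });
    }
}
```

Completing with an empty collection is how a lookup that finds nothing still releases its queue slot without emitting a record.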
4.4 OrderedStreamElementQueue

@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>(this.queue.size());
        for (StreamElementQueueEntry e : queue) {
            list.add(e.getInputElement());
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

            queue.add(queueEntry);

            LOG.debug(
                    "Put element into ordered stream element queue. New filling degree "
                            + "({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it "
                            + "was full ({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}

So tryPut does not build a separate wrapper: StreamElementQueueEntry itself extends ResultFuture, and the entry created by createEntry (a StreamRecordQueueEntry for records, a WatermarkQueueEntry for watermarks) is exactly the ResultFuture that addToWorkQueue hands back to processElement. When the queue is full, tryPut returns Optional.empty(), which is what makes addToWorkQueue loop and yield.
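The key property of this class is that hasCompletedElements() only inspects the head of the queue, so results leave in input order even when the async calls finish out of order. The toy model below keeps just that contract (a capacity check in tryPut and a head-only completion check). ToyOrderedQueue is a hypothetical name, the input element is omitted from tryPut, and emitAvailable drains every completed head entry at once, whereas emitCompletedElement emits one per call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Toy model of OrderedStreamElementQueue's ordering contract; names are illustrative, not Flink's.
final class ToyOrderedQueue<OUT> {
    private final int capacity;
    private final Queue<CompletableFuture<OUT>> queue;

    ToyOrderedQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("The capacity must be larger than 0.");
        }
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    // Same shape as tryPut: a completion handle on success, Optional.empty() when full.
    Optional<CompletableFuture<OUT>> tryPut() {
        if (queue.size() >= capacity) {
            return Optional.empty();
        }
        CompletableFuture<OUT> entry = new CompletableFuture<>();
        queue.add(entry);
        return Optional.of(entry);
    }

    // Only the head is checked, so a finished later entry waits behind an unfinished earlier one.
    boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    // Drains every completed entry at the front, preserving insertion order.
    List<OUT> emitAvailable() {
        List<OUT> out = new ArrayList<>();
        while (hasCompletedElements()) {
            out.add(queue.poll().join());
        }
        return out;
    }
}
```

Completing the second entry first leaves it stuck behind the first; once the first completes, both are emitted in their original order.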
