Ignite实战( 四 )


2.5.1 获取计算接口运行分布式计算的主要入口点是计算接口,它可以从Ignite.
Ignite ignite = Ignition.start();IgniteCompute compute = ignite.compute();2.5.2 指定计算的节点集计算接口的每个实例都与执行任务的一组节点相关联 。不带参数调用时,ignite.compute()返回与所有服务器节点关联的计算接口 。要获取特定节点子集的实例,请使用Ignite.compute(ClusterGroup group). 在以下示例中,计算接口仅绑定到远程节点,即除运行此代码的节点之外的所有节点 。
Ignite ignite = Ignition.start();IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());2.5.3 执行任务Ignite 提供了三个接口,可以实现代表一个任务并通过计算接口执行:

  • IgniteRunnable— 其扩展java.lang.Runnable可用于实现没有输入参数且不返回结果的计算 。
  • IgniteCallablejava.util.concurrent.Callable—返回特定值的扩展 。
  • IgniteClosure— 接受参数并返回值的功能接口 。
您可以执行一次任务(在其中一个节点上)或将其广播到所有节点 。
2.5.4 执行一个可运行的任务要执行可运行的任务,请使用run(…?)计算接口的方法 。任务被发送到与计算实例关联的节点之一 。
IgniteCompute compute = ignite.compute();// Iterate through all words and print// each word on a different cluster node.for (String word : "Print words on different cluster nodes".split(" ")) {compute.run(() -> System.out.println(word));}2.5.5 执行可调用任务要执行可调用任务,请使用call(…?)计算接口的方法 。
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();// Iterate through all words in the sentence and create callable jobs.for (String word : "How many characters".split(" "))calls.add(word::length);// Execute the collection of callables on the cluster.Collection<Integer> res = ignite.compute().call(calls);// Add all the word lengths received from cluster nodes.int total = res.stream().mapToInt(Integer::intValue).sum();2.5.6 执行IgniteClosure要执行IgniteClosure,请使用apply(…?)计算接口的方法 。该方法接受任务和任务的输入参数 。IgniteClosure参数在执行时传递给给定的 。
IgniteCompute compute = ignite.compute();// Execute closure on all cluster nodes.Collection<Integer> res = compute.apply(String::length, Arrays.asList("How many characters".split(" ")));// Add all the word lengths received from cluster nodes.int total = res.stream().mapToInt(Integer::intValue).sum();2.5.7 广播任务该方法在与计算实例关联的所有节点broadcast()上执行任务 。
// Limit broadcast to remote nodes only.IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());// Print out hello message on remote nodes in the cluster group.compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));2.5.8 异步执行前几节中描述的所有方法都有异步对应物:
  • callAsync(…?)
  • runAsync(…?)
  • applyAsync(…?)
  • broadcastAsync(…?)
异步方法返回一个IgniteFuture表示操作结果的值 。在以下示例中,异步执行一组可调用任务 。
IgniteCompute compute = ignite.compute();Collection<IgniteCallable<Integer>> calls = new ArrayList<>();// Iterate through all words in the sentence and create callable jobs.for (String word : "Count characters using a callable".split(" "))calls.add(word::length);IgniteFuture<Collection<Integer>> future = compute.callAsync(calls);future.listen(fut -> {// Total number of characters.int total = fut.get().stream().mapToInt(Integer::intValue).sum();System.out.println("Total number of characters: " + total);});2.5.9 执行超时任务您可以设置任务执行的超时时间 。如果任务没有在给定的时间范围内完成,它会被停止并取消该任务产生的所有作业 。

经验总结扩展阅读