【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC( 二 )

5、构建一个 RPC 客户端
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.net.InetSocketAddress;

public class MyClient {
    public static void main(String[] args) {
        try {
            // 获取代理类实例 , 也就是 Stub
            BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
                    new InetSocketAddress("localhost", 6789), new Configuration());
            // 通过 Stub 发送请求 , 实际使用就像调用本地方法一样
            proxy.mkdir("/tmp/ABC");
            String res = proxy.getName("Simon");
            System.out.println("从 RPC 服务端接收到的返回值:" + res);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

6、测试 , 先启动服务端 , 再启动客户端
服务端输出
成功创建了文件夹 :/tmp/ABC
成功打了招呼: hello :Simon
客户端输出
从 RPC 服务端接收到的返回值:bigdata
(二)RPC Protobuf 案例实现
项目结构如下

【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC

文章插图
对 proto 文件格式不熟悉的同学 , 参考上一篇文章《2-1 Yarn 基础库概述》
MyResourceTrackerMessage.proto 定义数据格式
syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message MyRegisterNodeManagerRequestProto {
    string hostname = 1;
    int32 cpu = 2;
    int32 memory = 3;
}

message MyRegisterNodeManagerResponseProto {
    string flag = 1;
}

MyResourceTracker.proto 定义 rpc 接口
syntax = "proto3";
import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

service MyResourceTrackerService {
    rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}

2、对 proto 文件编译 , 生成 java 类
# 在项目根目录执行 , 路径按照自己的进行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto

3、定义调用方法接口 MyResourceTracker
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;

public interface MyResourceTracker {
    MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}

4、对调用方法接口的实现(服务端)
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;public class MyResourceTrackerImpl implements MyResourceTracker {@Overridepublic MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {// 输出注册的消息String hostname = request.getHostname();int cpu = request.getCpu();int memory = request.getMemory();System.out.println("NodeManager 的注册消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);// 省略处理逻辑// 构建一个响应对象 , 用于返回MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();// 直接返回 Truebuilder.setFlag("true");MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();return response;}}

经验总结扩展阅读