【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC( 三 )

5、编写 proto 的协议接口
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;import org.apache.hadoop.ipc.ProtocolInfo;@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {}6、编写 proto 的协议接口实现(服务端)
import com.google.protobuf.RpcController;import com.google.protobuf.ServiceException;import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {final private MyResourceTracker server;public MyResourceTrackerServerSidePB(MyResourceTracker server) {this.server = server;}@Overridepublic MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {try {return server.registerNodeManager(request);} catch (Exception e) {e.printStackTrace();}return null;}}7、RPC Server 的实现
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtobufRpcEngine;import org.apache.hadoop.ipc.RPC;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;import java.io.IOException;public class ProtobufRpcServer {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);// 构建 Rpc ServerRPC.Server server = new RPC.Builder(conf).setProtocol(MyResourceTrackerPB.class).setInstance(MyResourceTrackerProto.MyResourceTrackerService.newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl()))).setBindAddress("localhost").setPort(9998).setNumHandlers(1).setVerbose(true).build();// Rpc Server 启动server.start();}}8、RPC Client 的实现
import com.google.protobuf.ServiceException;import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtobufRpcEngine;import org.apache.hadoop.ipc.RPC;import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;import java.io.IOException;import java.net.InetSocketAddress;public class ProtobufRpcClient {public static void main(String[] args) throws IOException {// 设置 RPC 引擎为 ProtobufRpcEngineConfiguration conf = new Configuration();String hostname = "localhost";int port = 9998;RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);// 获取代理MyResourceTrackerPB protocolProxy = RPC.getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);// 构建请求对象MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();// 发送 RPC 请求 , 获取响应MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;try {response = protocolProxy.registerNodeManager(null, bigdata02);} catch (ServiceException e) {e.printStackTrace();}// 处理响应String flag = response.getFlag();System.out.println("最终注册结果: flag = " + flag);}}9、测试先启动服务端 , 在启动客户端 。
四、总结本节介绍了 Hadoop 底层通信库 RPC 。首先介绍了 RPC 的框架和原理 , 之后对 Hadoop 自己实现的 RPC 进行了介绍 , 并给出了两个 demo 实践 。强烈建议了解基础知识后 , 跟着 demo 实现一个案例出来 , 可以更好的帮助你理解 。文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo

经验总结扩展阅读