APP下载

一步一步教你写一个RPC

消息来源:baojiabao.com 作者: 发布时间:2026-05-20

报价宝综合消息一步一步教你写一个RPC

1.1 RPC 是什么

定义:RPC(Remote Procedure Call Protocol)——远端过程呼叫协议 ,RPC协议假定某些传输协议的存在,如TCP或UDP,为通讯程式之间携带资讯资料。在OSI网络通讯模型中,RPC跨越了传输层应用层 ,RPC使得开发包括网络分散式多程式在内的应用程序更加容易。

我的理解:与其说把RPC 看作是一种协议,倒不如把 它看作是一种 客户机/服务器互动的模式,但是 RPC一定是基于 TCP 或者 其他 通讯协议的

下面我们来看一下一个RPC呼叫的流程涉及哪些通讯细节:

服务消费方(client)呼叫以本地呼叫方式呼叫服务;(1)client stub接收到呼叫后负责将方法、引数等组装成能够进行网络传输的讯息体;(2)client stub找到服务地址,并将讯息传送到服务端;(3)server stub收到讯息后进行解码;(4)server stub根据解码结果呼叫本地的服务;(5)本地服务执行并将结果返回给server stub;(6)server stub将返回结果打包成讯息并发送至消费方;(7)client stub接收到讯息,并进行解码;(8)服务消费方得到最终结果。(9)RPC的目标就是要2~8这些步骤都封装起来,让使用者对这些细节透明。

1.2 手动实现

1.2.1 先做一个空界面实现序列化界面

public interface IRpcService extends Serializable{

}

1.2.2 做一个需要被远端呼叫的界面 以及对应的界面实现类

public interface IHelloService extends IRpcService{

String sayHi(String name,String message);

}

public class HelloServiceImpl implements IHelloService{

private static final long serialVersionUID = 146468468464364698L;

@Override

public String sayHi(String name, String message) {

return new StringBuilder().append("hi~!").append(",").append(message).toString();

}

}

1.2.3 需要写一个服务端,主要的作用 是进行服务注册(界面注册) 以及 接收客户端的呼叫引数 执行呼叫请求 返回结果

注:这个地方 我没有采用dom4j 解析配置档案的形式 进行界面注册 有时间的朋友可以多加一层

public interface Server {

//Socket埠

int PORT = 8080;

//启动服务端

void start() throws IOException;

//停止服务端

void stop();

/**

* 服务注册

* -- serviceInterface 对外暴露界面

* -- 内部实现类

*/

void regist(Class extends IRpcService> serviceInterface,Class extends IRpcService> impl);

}

public class ServerCenter implements Server{

/**执行绪池 接收客户端呼叫**/

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(10));

/**服务注册快取**/

public static final Map> serviceRegistry = new HashMap();

/**

* 启动服务

*/

@Override

public void start() throws IOException {

ServerSocket server = new ServerSocket();

server.bind(new InetSocketAddress(PORT));

try {

while(true){

executor.execute(new ServiceTask(server.accept()));

}

} finally {

server.close();

}

}

/**

* 停止服务

*/

@Override

public void stop() {

executor.shutdown();

}

/**

* 注册服务

*/

@Override

public void regist(Class extends IRpcService> serviceInterface, Class extends IRpcService> impl) {

serviceRegistry.put(serviceInterface.getName(), impl);

}

private static class ServiceTask implements Runnable{

Socket client = null;

public ServiceTask(Socket client) {

this.client = client;

}

@Override

public void run() {

ObjectInputStream input = null;

ObjectOutputStream output = null;

try {

input = new ObjectInputStream(client.getInputStream());

String serviceName = input.readUTF();

String methodName = input.readUTF();

Class>[] parameterTypes = (Class>[]) input.readObject();

Object[] arguments = (Object[]) input.readObject();

Class> serviceClass = serviceRegistry.get(serviceName);

if(serviceClass == null){

throw new ClassNotFoundException(serviceName + "not found");

}

Method method = serviceClass.getMethod(methodName, parameterTypes);

Object result = method.invoke(serviceClass.newInstance(), arguments);

//将执行结果反序列化 通过socket返给客户端

output = new ObjectOutputStream(client.getOutputStream());

output.writeObject(result);

} catch (Exception e) {

e.printStackTrace();

} finally {

if(input != null){

try {

input.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if(output != null){

try {

output.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if(client != null){

try {

client.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

public static void main(String[] args) throws Exception {

ServerCenter center = new ServerCenter();

center.regist(IHelloService.class,new HelloServiceImpl().getClass());

center.start();

}

}

1.2.4 写一个客户端,用动态代理 获取被代理界面的 各种引数 传输给 服务端,接收返回结果,打印到控制台

public class Client {

@SuppressWarnings("unchecked")

public static T getRemoteProxyObj(final Class extends IRpcService> serviceInterface,final InetSocketAddress addr){

return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class>[]{serviceInterface}, new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Socket socket = null;

ObjectOutputStream output = null;

ObjectInputStream input = null;

try {

//1.建立Socket客户端,根据指定地址连线远端服务提供者

socket = new Socket();

socket.connect(addr);

//2.将远端服务呼叫所需的界面类、方法名、引数列表等编码后传送给服务提供者

output = new ObjectOutputStream(socket.getOutputStream());

output.writeUTF(serviceInterface.getName());

output.writeUTF(method.getName());

output.writeObject(method.getParameterTypes());

output.writeObject(args);

//3.同步阻塞等待服务器返回应答 获取应答后返回

input = new ObjectInputStream(socket.getInputStream());

return input.readObject();

} finally{

if(socket != null){

socket.close();

}

if(output != null){

output.close();

}

if(input != null){

input.close();

}

}

}

});

}

}

1.2.5 测试

注:测试之前 需要开启服务端

public class RpcTest {

public static void main(String[] args) throws IOException {

IHelloService service = Client.getRemoteProxyObj(IHelloService.class, new InetSocketAddress(8080));

System.out.println(service.sayHi("张三", "新年快乐!"));

}

}

就这样我们实现了一个简陋的RPC

本文意在通过实现简单的RPC,去真正意义上对RPC框架的实现原理有初步的了解,而不是人云亦云。

此RPC实现有诸多缺点,但是 我们只要明白RPC的基座 其他的RPC框架只是完善基座以及扩充套件而已 。

2019-12-29 02:49:00

相关文章