rpc到自己java实现rpc调用再到rpc框架设计

news/2025/2/23 22:55:34

目录

  • rpc(Remote Procedure Call)
    • rpc一般架构
    • 为什么要引入rpc
    • 自己实现rpc调用
      • 1. 新建一个maven项目,加入hessian依赖
      • 2. 服务端
      • 3. Stub代理
      • 4. 客户端测试输出
      • 5. rpc程序分析
        • 附 请求参数和序列化程序
      • 6. 总结
    • 回顾RPC
      • RPC 序列化协议
      • RPC 网络协议
      • 注册中心的引入
      • dubbo框架看一个rpc框架的实现架构

rpcRemote_Procedure_Call_3">rpc(Remote Procedure Call)

Remote Procedure Call (RPC) is a powerful technique for constructing distributed, client-server based applications. It is based on extending the conventional local procedure calling so that the called procedure need not exist in the same address space as the calling procedure. The two processes may be on the same system, or they may be on different systems with a network connecting them.

rpc_7">rpc一般架构

在这里插入图片描述

  • 客户端(Client):服务调用方
  • 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息(序列化),再通过网络传输发送给服务端
  • Network Service:底层传输,可以是 TCP 或 HTTP,或其它网络协议
  • 服务端存根(Server Stub):接收客户端发送过来的请求消息并进行解包(反序列化),然后再调用本地服务进行处理
  • 服务端(Server):服务的真正提供者

rpc_18">为什么要引入rpc

两台不同的主机进程要进行通信?

最易想到的最原始做法:tcp/ip通信,二进制数据传输

蛮烦点在于:要写网络相关处理;对方服务进行动态扩展后,客户端又得重新对接处理。最好能能像本地调用localService.doSth()一样,能调用B机器的bService.doSth(), C机器的cService.doSth()

要解决这个问题,提升开发效率,由此开始了rpc的引入,下面通过java编程来完成这一基本目标

rpc_28">自己实现rpc调用

基础知识点如下,其实很基础,就是大一学生学完Java就基本能操作

  • JAVA socket编程基础
  • JAVA反射
  • 代理模式/动态代理
  • 序列化

1. 新建一个maven项目,加入hessian依赖

项目结构如下:
在这里插入图片描述

  • pom.xml
java"><?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rpc</groupId>
    <artifactId>rpctest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>4.0.38</version>
        </dependency>
    </dependencies>

</project>

2. 服务端

java">package com;

import com.entity.RpcRequest;
import com.service.impl.ProServiceImpl;
import com.service.impl.UserServiceImpl;
import com.util.HessianSerializerUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @Author mubi
 * @Date 2020/5/18 23:18
 */
public class Server {
    public static void main(String[] args) throws Exception {
        // 监听指定的端口
        final int port = 55533;
        ServerSocket server = new ServerSocket(port);

        // server将一直等待连接的到来
        System.out.println("server将一直等待连接的到来");
        while (true) {
            Socket client = server.accept();
            System.out.println("accept client:" + client.getPort());
            new Thread(() -> {
                try {
                    // 建立好连接后,从socket中获取客户端传递过来的对象
                    InputStream in = client.getInputStream();
                    RpcRequest rpcRequest = HessianSerializerUtil.deserialize(readInputStream(in));
                    System.out.println("rpcRequest:" + rpcRequest);

                    // 执行方法,
                    // 需要从服务注册中找到具体的类,这里模拟判断
                    Class clazz = null;
                    if (rpcRequest.getClassName().equals("com.service.IUserService")) {
                        clazz = UserServiceImpl.class;
                    }
                    if (rpcRequest.getClassName().equals("com.service.IProService")) {
                        clazz = ProServiceImpl.class;
                    }
                    Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
                    Object o = method.invoke(clazz.newInstance(), rpcRequest.getArgs());

                    // 返回对象 二进制形式发送给客户端
                    OutputStream out = client.getOutputStream();
                    out.write(HessianSerializerUtil.serialize(o));
                    out.flush();

                    client.close();
                } catch (Exception e) {

                }
            }).start();
        }
//        server.close();
    }

    public static byte[] readInputStream(InputStream inputStream) throws IOException {
        byte[] buffer = new byte[2048];
        int len;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while((len = inputStream.read(buffer)) != -1) {
            bos.write(buffer, 0, len);
        }
        return bos.toByteArray();
    }

}

3. Stub代理

java">package com;

import com.entity.RpcRequest;
import com.util.HessianSerializerUtil;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

/**
 * @Author mubi
 * @Date 2020/5/18 23:18
 */
public class Stub {

    public static Object getStub(Class clazz){
        // 调用方法处理器
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 要连接的服务端IP地址和端口
                final String host = "127.0.0.1";
                final int port = 55533;
                // 与服务端建立连接
                Socket socket = new Socket(host, port);

                // 构造请求服务器的对象
                RpcRequest rpcRequest = new RpcRequest(clazz.getName(), method.getName(),
                        method.getParameterTypes(), args);
                // 传递二进制给服务端
                OutputStream outputStream = socket.getOutputStream();
                outputStream.write(HessianSerializerUtil.serialize(rpcRequest));
                socket.shutdownOutput();

                // 直接读取服务端返回的二进制, 反序列化为对象返回
                InputStream inputStream = socket.getInputStream();
                byte[] bytes = readInputStream(inputStream);
                Object o = HessianSerializerUtil.deserialize(bytes);

                inputStream.close();
                outputStream.close();
                socket.close();

                return o;
            }
        };

        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, h);
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;
    }

    public static byte[] readInputStream(InputStream inputStream) throws IOException {
        byte[] buffer = new byte[2048];
        int len;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while((len = inputStream.read(buffer)) != -1) {
            bos.write(buffer, 0, len);
        }
        return bos.toByteArray();
    }

}

4. 客户端测试输出

java">package com;

import com.service.IProService;
import com.service.IUserService;

/**
 * @Author mubi
 * @Date 2020/5/18 23:18
 */
public class Client {

    public static void main(String[] args) {
        IUserService iUserService = (IUserService) Stub.getStub(IUserService.class);
        System.out.println(iUserService.getUserById(12));

        IProService iProService = (IProService) Stub.getStub(IProService.class);
        System.out.println(iProService.getProById(12));
    }

}

输出如下:
在这里插入图片描述

可以看到客户端使用远程服务像本地服务一样的调用了

rpc_261">5. rpc程序分析

java"> public static Object getStub(Class clazz){
        // 调用方法处理器
        InvocationHandler h = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 要连接的服务端IP地址和端口
                final String host = "127.0.0.1";
                final int port = 55533;
                // 与服务端建立连接
                Socket socket = new Socket(host, port);

                // 构造请求服务器的对象
                RpcRequest rpcRequest = new RpcRequest(clazz.getName(), method.getName(),
                        method.getParameterTypes(), args);
                // 传递二进制给服务端
                OutputStream outputStream = socket.getOutputStream();
                outputStream.write(HessianSerializerUtil.serialize(rpcRequest));
                socket.shutdownOutput();

                // 直接读取服务端返回的二进制, 反序列化为对象返回
                InputStream inputStream = socket.getInputStream();
                byte[] bytes = readInputStream(inputStream);
                Object o = HessianSerializerUtil.deserialize(bytes);

                inputStream.close();
                outputStream.close();
                socket.close();

                return o;
            }
        };

        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, h);
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;
    }

可以看到使用了java动态代理,客户端拿到的ServiceImpl实际上是个代理对象,

调用行为进行了代理,如下几个步骤

  1. 网络请求socket连接服务端
  2. 使用hessian将参数 序列化,转化为字节流,进行socket通信
  3. 服务端socket通信,hessian 反序列化客户端传入的参数,然后反射完成服务的调用,然后 序列化 返回结果
  4. 客户端收到服务端响应,反序列化结果,输出
附 请求参数和序列化程序
  • RpcRequest
java">package com.entity;

import java.io.Serializable;
import java.util.Arrays;

/**
 * rpc请求通用参数结构
 * @Author mubi
 * @Date 2020/5/18 23:10
 */
public class RpcRequest implements Serializable {
    String className;
    String methodName;
    Class[] paramTypes;
    Object[] args;

    public RpcRequest(String className, String methodName, Class[] paramTypes, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.args = args;
    }

    public String getClassName() {
        return className;
    }

    public String getMethodName() {
        return methodName;
    }

    public Class[] getParamTypes() {
        return paramTypes;
    }

    public Object[] getArgs() {
        return args;
    }

    @Override
    public String toString() {
        return "RpcRequest{" +
                "className='" + className + '\'' +
                ", methodName='" + methodName + '\'' +
                ", paramTypes=" + Arrays.toString(paramTypes) +
                ", args=" + Arrays.toString(args) +
                '}';
    }
}

  • HessianSerializerUtil
java">package com.util;

import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import com.entity.User;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;


public class HessianSerializerUtil {

    public static <T> byte[] serialize(T obj) {
        byte[] bytes = null;
        // 1、创建字节输出流
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        // 2、对字节数组流进行再次封装

        // step 1. 定义外部序列化工厂
        //ExtSerializerFactory extSerializerFactory = new ExtSerializerFactory();
        //extSerializerFactory.addSerializer(java.time.OffsetDateTime.class, new OffsetDateTimeRedisSerializer());
        //extSerializerFactory.addDeserializer(java.time.OffsetDateTime.class, new OffsetDateTimeRedisDeserializer());
        // step 2. 序列化工厂
        //SerializerFactory serializerFactory = new SerializerFactory();
        //serializerFactory.addFactory(extSerializerFactory);

        HessianOutput hessianOutput = new HessianOutput(bos);
        //hessianOutput.setSerializerFactory(serializerFactory);

        try {
            // 注意,obj 必须实现Serializable接口
            hessianOutput.writeObject(obj);
            bytes = bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return bytes;
    }

    public static <T> T deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        // 1、将字节数组转换成字节输入流
        ByteArrayInputStream bis = new ByteArrayInputStream(data);

        // step 1. 定义外部序列化工厂
        //ExtSerializerFactory extSerializerFactory = new ExtSerializerFactory();
        //extSerializerFactory.addSerializer(java.time.OffsetDateTime.class, new OffsetDateTimeRedisSerializer());
        //extSerializerFactory.addDeserializer(java.time.OffsetDateTime.class, new OffsetDateTimeRedisDeserializer());
        // step 2. 序列化工厂
        //SerializerFactory serializerFactory = new SerializerFactory();
        //serializerFactory.addFactory(extSerializerFactory);
        HessianInput hessianInput = new HessianInput(bis);
        //hessianInput.setSerializerFactory(serializerFactory);
        Object object = null;

        try {
            object = hessianInput.readObject();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return (T) object;
    }

    static void test() throws Exception{
        User user = new User(1, "wang");
        byte[] bytes = HessianSerializerUtil.serialize(user);
        System.out.println(user);
        User user1 = HessianSerializerUtil.deserialize(bytes);
        System.out.println(user1);
    }

    public static void main(String[] args) throws Exception {
        test();
    }
}

6. 总结

程序其实完成了如下图:
在这里插入图片描述

通过最基础的java反射、态代理、socket编程等就实现了简单的类似rpc调用

接下来不管多少种服务;只要双方约定好,客户端直接调用即可,基本不需要修改Stub相关代码; client 调用远程方法,就像调用本地方法一样(客户端同学都不需要懂网络底层,直接服务端有什么,就能用什么)

回顾RPC

在这里插入图片描述

RPC 序列化协议

RPC(Remote Procedure Call,远程过程调用)序列化协议是用于在网络上传输数据的一种机制,特别是在客户端和服务器之间传输函数调用请求和响应时。序列化是将数据结构或对象状态转换为可以存储或传输的格式的过程。在RPC通信中,序列化是关键步骤,因为它使得数据能够在网络上安全传输并被另一端正确解析。

常见的序列化协议包括json、xml、hession、protobuf、thrift、text、bytes等;

  • JSON是一种轻量级的数据交换格式,易于阅读和编写,支持跨语言使用。然而,JSON的序列化和反序列化速度较慢,且序列化后的数据体积较大,不适合对性能要求较高的场景‌

  • Protobuf‌:由谷歌开发,支持多语言平台,序列化后的数据体积小,序列化速度快。它需要预编译IDL文件,适用于需要高效数据传输和存储的场景‌

  • Thrift‌:由Facebook开发,支持跨语言服务开发,序列化速度快,数据体积小。Thrift既是传输协议也是序列化协议,适用于需要高效数据处理的分布式系统‌

  • Hessian‌:主要用于Web服务的序列化和反序列化,支持跨语言调用,但相对于其他协议,Hessian的性能略逊一筹‌

RPC 网络协议

如下通信协议

  • TCP/UDP
  • Web Service
  • Restful(http + json)
  • RMI(Remote Method Invocation)
  • JMS(Java Message Service)
  • RPC(Remote Procedure Call)

不过本质还是掌握Socket编程,了解网络IO相关知识

注册中心的引入

随着服务数量的增多,各个服务之间的调用变得错综复杂,一个服务可能依赖外部多个服务,当一个服务的域名或IP地址改变了之后如何通知依赖方,或者依赖方如何快速的发现服务提供方的地址变化。

两种方案:

  1. 客户端与服务端自己维护:有多少个服务,客户端就要维护多少个(服务增减,负载均衡,心跳)
  2. 找个代理,客户端有需求找代理,代理维持这些服务,也能给客户通知;(可以看成代理模式

在这里插入图片描述

显然是注册中心的方式更加合理和方便。

服务中心可以进行服务注册,类似维护一个登记簿,它管理系统内所有的服务地址。当新的服务启动后,它会向登记簿交待自己的地址信息。服务的依赖方直接向登记簿要Service Provider地址就行了,或者基于某种约定(负载均衡)拿到服务的一个具体实例进行通信就好了

回顾springboot+dubbo+zookeeper的注册服务和调用实践:https://blog.csdn.net/qq_26437925/article/details/145790590

rpc_517">dubbo框架看一个rpc框架的实现架构

在这里插入图片描述


http://www.niftyadmin.cn/n/5863843.html

相关文章

网络运维学习笔记 017 HCIA-Datacom综合实验01

文章目录 综合实验1实验需求总部特性 分支8分支9 配置一、 基本配置&#xff08;IP二层VLAN链路聚合&#xff09;ACC_SWSW-S1SW-S2SW-Ser1SW-CoreSW8SW9DHCPISPGW 二、 单臂路由GW 三、 vlanifSW8SW9 四、 OSPFSW8SW9GW 五、 DHCPDHCPGW 六、 NAT缺省路由GW 七、 HTTPGW 综合实…

使用 Grafana 监控 Spring Boot 应用

随着软件开发领域的不断发展&#xff0c;监控和可观测性已成为确保系统可靠性和性能的关键实践。Grafana 是一个功能强大的开源工具&#xff0c;能够为来自各种来源的监控数据提供丰富的可视化功能。在本篇博客中&#xff0c;我们将探讨如何将 Grafana 与 Spring Boot 应用程序…

LeetCode 热题 100 73. 矩阵置零

LeetCode 热题 100 | 73. 矩阵置零 大家好&#xff0c;今天我们来解决一道经典的算法题——矩阵置零。这道题在LeetCode上被标记为中等难度&#xff0c;要求我们将矩阵中为0的元素所在的行和列全部置为0。下面我将分别给出非原地算法和原地算法的Python代码实现&#xff0c;并…

共筑金融数字化新生态!YashanDB与恒生电子完成兼容互认证

近日&#xff0c;深圳计算科学研究院的崖山数据库系统YashanDB与恒生电子股份有限公司HUNDSUN资产估值与会计核算软件V6.0成功完成了兼容性互认证。结果显示&#xff0c;双方产品完全兼容&#xff0c;稳定运行&#xff0c;可共同为银行、证券、基金、保险、信托等金融机构提供稳…

21.《SpringBoot 异步编程@Async与CompletableFuture》

SpringBoot 异步编程 文章导读 本文系统讲解 Spring Boot 异步编程的核心技术与实践方案&#xff0c;涵盖从基础使用到高级优化的全链路知识。通过深入剖析 Async 注解原理、线程池配置策略、异步异常处理机制等关键技术点&#xff0c;结合典型业务场景的代码示例&#xff0c…

【网络安全 | 漏洞挖掘】账户接管+PII+原漏洞绕过

文章目录 前言正文前言 本文涉及的所有漏洞测试共耗时约三周,成果如下: 访问管理面板,成功接管目标列出的3000多家公司。 获取所有员工的真实指纹、机密文件及个人身份信息(PII)。 绕过KYC认证,成功接管电话号码。 绕过此前发现的漏洞。 正文 在测试目标时,我发现了一…

chrome扩展程序如何实现国际化

先来看一个 manifest.json 文件的内容例子&#xff1a; { "update_url": "https://clients2.google.com/service/update2/crx ","default_locale": "en","name": "__MSG_appName__","short_name": &q…

lattice hdl实现spi接口

在lattice工具链中实现SPI接口通常涉及以下步骤: 定义硬件SPI接口的管脚。配置SPI时钟和模式。编写SPI主机或从机的控制逻辑。 展示了如何在Lattice工具链中使用HDL语言(例如Verilog)来配置SPI接口: lattice工程 顶层:spi_slave_top.v `timescale 1ns/ 1ps module spi_…