亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.430618.com 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

RPC全稱Remote Procedure Call,即遠(yuǎn)程過程調(diào)用,對(duì)于調(diào)用者無感知這是一個(gè)遠(yuǎn)程調(diào)用功能。目前流行的開源RPC 框架有阿里的Dubbo、google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的設(shè)計(jì)主要參考的是阿里的Dubbo,這里Netty 基本上是作為架構(gòu)的技術(shù)底層而存在的,主要完成高性能的網(wǎng)絡(luò)通信,從而實(shí)現(xiàn)高效的遠(yuǎn)程調(diào)用。

Dubbo的架構(gòu)與Spring

其實(shí)在之前的文章中《談?wù)劸〇|的服務(wù)框架》,探討過Dubbo的組成和架構(gòu)。

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 


真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

另外使用Dubbo最方便的地方在于它可以和Spring非常方便的集成,Dubbo對(duì)于配置的優(yōu)化也是隨著Spring一脈相承的,從最早的XML形式到后來的注解方式以及自動(dòng)裝配,都是在不斷地簡(jiǎn)化開發(fā)過程來提高開發(fā)效率。

Dubbo在Spring框架中的工作流程:

1、Spring的IOC容器啟動(dòng)

2、把服務(wù)注冊(cè)到注冊(cè)中心(zookeeper軟件)中

3、消費(fèi)者啟動(dòng)時(shí)會(huì)把它需要用到的服務(wù)從注冊(cè)中心拉取下來

4、提供者的地址發(fā)生改變時(shí),注冊(cè)中心會(huì)馬上通知消費(fèi)者

5、根據(jù)注冊(cè)中心中的服務(wù)地址直接就可以調(diào)用提供者了,如果調(diào)用了提供者,就會(huì)把提供者的地址主動(dòng)緩存起來

6、監(jiān)控消費(fèi)者調(diào)用提供者的次數(shù)

RPC實(shí)現(xiàn)的關(guān)鍵

1、序列化與反序列化

在遠(yuǎn)程過程調(diào)用時(shí),客戶端跟服務(wù)端是不同的進(jìn)程,甚至有時(shí)候客戶端用JAVA,服務(wù)端用C++。這時(shí)候就需要客戶端把參數(shù)先轉(zhuǎn)成一個(gè)字節(jié)流,傳給服務(wù)端后,再把字節(jié)流轉(zhuǎn)成自己能讀取的格式,這個(gè)過程叫序列化和反序列化,同理,從服務(wù)端返回的值也需要序列化反序列化的過程。在序列化的時(shí)候,我們選擇Netty自身的對(duì)象序列化器。

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

2、數(shù)據(jù)網(wǎng)絡(luò)傳輸

解決了序列化的問題,那么剩下的就是如何把數(shù)據(jù)參數(shù)傳到生產(chǎn)者,網(wǎng)絡(luò)傳輸層需要把序列化后的參數(shù)字節(jié)流傳給服務(wù)端,然后再把序列化后的調(diào)用結(jié)果傳回客戶端,雖然大部分RPC框架都采用了TCP作為傳輸協(xié)議,其實(shí)UDP也可以作為傳輸協(xié)議的,基于TCP和UDP我們可以自定義任意規(guī)則的協(xié)議,加之我們要使用NIO通信方式作為高性能網(wǎng)絡(luò)服務(wù)的前提,于是Netty似乎更符合我們Java程序員的口味,Netty真香!

3、告訴注冊(cè)中心我要調(diào)誰

現(xiàn)在調(diào)用參數(shù)的序列化和網(wǎng)絡(luò)傳輸都已經(jīng)具備,但是還有個(gè)問題,那就是消費(fèi)者要調(diào)用誰的問題,一個(gè)函數(shù)或者方法,我們可以理解為一個(gè)服務(wù),這些服務(wù)注冊(cè)在注冊(cè)中心上面,只有當(dāng)消費(fèi)者告訴注冊(cè)中心要調(diào)用誰,才可以進(jìn)行遠(yuǎn)程調(diào)用。所以不但要把將要調(diào)用的服務(wù)的參數(shù)傳過去,也要把要調(diào)用的服務(wù)信息傳過去。

簡(jiǎn)易R(shí)PC框架的架構(gòu)

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

Dubbo 核心模塊主要有四個(gè):Registry 注冊(cè)中心、Provider 服務(wù)提供者、Consumer 服務(wù)消費(fèi)者、Monitor監(jiān)控,為了方便直接砍掉了監(jiān)控模塊,同時(shí)把服務(wù)提供者模塊與注冊(cè)中心模塊寫在一起,通過實(shí)現(xiàn)自己的簡(jiǎn)易IOC容器,完成對(duì)服務(wù)提供者的實(shí)例化。

關(guān)于使用Netty進(jìn)行Socket編程的部分可以參考Netty的官網(wǎng) 或者我之前的博客《Netty編碼實(shí)戰(zhàn)與Channel生命周期》,在這里Netty的編碼技巧和方式不作為本文的重點(diǎn)。

RPC框架編碼實(shí)現(xiàn)

首先需要引入的依賴如下(Netty + Lombok):

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.8</version>
</dependency>

1、Registry與Provider

目錄結(jié)構(gòu)如下:

───src
    └─main
        ├─java
        │  └─edu
        │      └─xpu
        │          └─rpc
        │              ├─api
        │              │      IRpcCalc.java
        │              │      IRpcHello.java
        │              │
        │              ├─core
        │              │      InvokerMessage.java
        │              │
        │              ├─provider
        │              │      RpcCalcProvider.java
        │              │      RpcHelloProvider.java
        │              │
        │              └─registry
        │                      MyRegistryHandler.java
        │                      RpcRegistry.java
        │
        └─resources
───pom.xml

IRpcCalc.java與IRpcHello.java是兩個(gè)Service接口。IRpcCalc.java內(nèi)容如下,完成模擬業(yè)務(wù)加、減、乘、除運(yùn)算

public interface IRpcCalc {
    // 加
    int add(int a, int b);

    // 減
    int sub(int a, int b);

    // 乘
    int mul(int a, int b);

    // 除
    int div(int a, int b);
}

IRpcHello.java,測(cè)試服務(wù)是否可用:

public interface IRpcHello {
    String hello(String name);
}

至此API 模塊就定義完成了,非常簡(jiǎn)單的兩個(gè)接口。接下來,我們要確定傳輸規(guī)則,也就是傳輸協(xié)議,協(xié)議內(nèi)容當(dāng)然要自定義,才能體現(xiàn)出Netty 的優(yōu)勢(shì)。

設(shè)計(jì)一個(gè)InvokerMessage類,里面包含了服務(wù)名稱、調(diào)用方法、參數(shù)列表、參數(shù)值,這就是我們自定義協(xié)議的協(xié)議包:

@Data
public class InvokerMessage implements Serializable {
    private String className; // 服務(wù)名稱
    private String methodName; // 調(diào)用哪個(gè)方法
    private Class<?>[] params; // 參數(shù)列表
    private Object[] values; // 參數(shù)值
}

通過定義這樣的協(xié)議類,就能知道我們需要調(diào)用哪個(gè)服務(wù),服務(wù)中的哪個(gè)方法,方法需要傳遞的參數(shù)列表(參數(shù)類型+參數(shù)值),這些信息正確傳遞過去了才能拿到正確的調(diào)用返回值。

接下來創(chuàng)建這兩個(gè)服務(wù)的具體實(shí)現(xiàn)類,IRpcHello的實(shí)現(xiàn)類如下:

public class RpcHelloProvider implements IRpcHello {
    public String hello(String name) {
        return "Hello, " + name + "!";
    }
}

IRpcCalc的實(shí)現(xiàn)類如下:

public class RpcCalcProvider implements IRpcCalc {
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public int sub(int a, int b) {
        return a - b;
    }

    @Override
    public int mul(int a, int b) {
        return a * b;
    }

    @Override
    public int div(int a, int b) {
        return a / b;
    }
}

Registry 注冊(cè)中心主要功能就是負(fù)責(zé)將所有Provider的服務(wù)名稱和服務(wù)引用地址注冊(cè)到一個(gè)容器中(這里為了方便直接使用接口類名作為服務(wù)名稱,前提是假定我們每個(gè)服務(wù)只有一個(gè)實(shí)現(xiàn)類),并對(duì)外發(fā)布。Registry 應(yīng)該要啟動(dòng)一個(gè)對(duì)外的服務(wù),很顯然應(yīng)該作為服務(wù)端,并提供一個(gè)對(duì)外可以訪問的端口。先啟動(dòng)一個(gè)Netty服務(wù),創(chuàng)建RpcRegistry 類,RpcRegistry.java的具體代碼如下:

public class RpcRegistry {
    private final int port;
    public RpcRegistry(int port){
        this.port = port;
    }

    public void start(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NIOServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 處理拆包、粘包的編解碼器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // 處理序列化的編解碼器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // 自己的業(yè)務(wù)邏輯
                            pipeline.addLast(new MyRegistryHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // 設(shè)置長連接

            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            System.out.println("RPC Registry start listen at " + this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new RpcRegistry(8080).start();
    }
}

接下來只需要實(shí)現(xiàn)我們自己的Handler即可,創(chuàng)建MyRegistryHandler.java,內(nèi)容如下:

public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
    // 在注冊(cè)中心注冊(cè)服務(wù)需要有容器存放
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();

    // 類名的緩存位置
    private static final List<String> classCache = new ArrayList<>();

    // 約定,只要是寫在provider下所有的類都認(rèn)為是一個(gè)可以對(duì)完提供服務(wù)的實(shí)現(xiàn)類
    // edu.xpu.rpc.provider

    public MyRegistryHandler(){
        scanClass("edu.xpu.rpc.provider");
        doRegister();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        // 客戶端傳過來的調(diào)用信息
        InvokerMessage request = (InvokerMessage)msg;
        // 先判斷有沒有這個(gè)服務(wù)
        String serverClassName = request.getClassName();
        if(registryMap.containsKey(serverClassName)){
            // 獲取服務(wù)對(duì)象
            Object clazz = registryMap.get(serverClassName);
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
            result = method.invoke(clazz, request.getValues());
            System.out.println("request=" + request);
            System.out.println("result=" + result);
        }
        ctx.writeAndFlush(result);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


    // 實(shí)現(xiàn)簡(jiǎn)易IOC容器
    // 掃描出包里面所有的Class
    private void scanClass(String packageName){
        ClassLoader classLoader = this.getClass().getClassLoader();
        URL url = classLoader.getResource(packageName.replaceAll("\.", "/"));
        File dir = new File(url.getFile());
        File[] files = dir.listFiles();
        for (File file: files){
            if(file.isDirectory()){
                scanClass(packageName + "." + file.getName());
            }else{
                // 拿出類名
                String className = packageName + "." + file.getName().replace(".class", "").trim();
                classCache.add(className);
            }
        }
    }

    // 把掃描到的Class實(shí)例化,放到Map中
    // 注冊(cè)的服務(wù)名稱就叫做接口的名字 [約定優(yōu)于配置]
    private void doRegister(){
        if(classCache.size() == 0) return;
        for (String className: classCache){
            try {
                Class<?> clazz = Class.forName(className);
                // 服務(wù)名稱
                Class<?> anInterface = clazz.getInterfaces()[0];
                registryMap.put(anInterface.getName(), clazz.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在這里還通過反射實(shí)現(xiàn)了簡(jiǎn)易的IOC容器,先遞歸掃描provider包底下的類,把這些類的對(duì)象作為服務(wù)對(duì)象放到IOC容器中進(jìn)行管理,由于IOC是一個(gè)Map實(shí)現(xiàn)的,所以將類名作為服務(wù)名稱,也就是Key,服務(wù)對(duì)象作為Value。根據(jù)消費(fèi)者傳過來的服務(wù)名稱,就可以找到對(duì)應(yīng)的服務(wù),到此,Registry和Provider已經(jīng)全部寫完了。

2、consumer

目錄結(jié)構(gòu)如下:

└─src
    ├─main
    │  ├─java
    │  │  └─edu
    │  │      └─xpu
    │  │          └─rpc
    │  │              ├─api
    │  │              │      IRpcCalc.java
    │  │              │      IRpcHello.java
    │  │              │
    │  │              ├─consumer
    │  │              │  │  RpcConsumer.java
    │  │              │  │
    │  │              │  └─proxy
    │  │              │          RpcProxy.java
    │  │              │          RpcProxyHandler.java
    │  │              │
    │  │              └─core
    │  │                      InvokerMessage.java
    │  │
    │  └─resources
    └─test
        └─java
└─ pom.xml

在看客戶端的實(shí)現(xiàn)之前,先梳理一下RPC流程。API 模塊中的接口只在服務(wù)端實(shí)現(xiàn)了。因此,客戶端調(diào)用API 中定義的某一個(gè)接口方法時(shí),實(shí)際上是要發(fā)起一次網(wǎng)絡(luò)請(qǐng)求去調(diào)用服務(wù)端的某一個(gè)服務(wù)。而這個(gè)網(wǎng)絡(luò)請(qǐng)求首先被注冊(cè)中心接收,由注冊(cè)中心先確定需要調(diào)用的服務(wù)的位置,再將請(qǐng)求轉(zhuǎn)發(fā)至真實(shí)的服務(wù)實(shí)現(xiàn),最終調(diào)用服務(wù)端代碼,將返回值通過網(wǎng)絡(luò)傳輸給客戶端。整個(gè)過程對(duì)于客戶端而言是完全無感知的,就像調(diào)用本地方法一樣,所以必定要對(duì)客戶端的API接口做代理,隱藏網(wǎng)絡(luò)請(qǐng)求的細(xì)節(jié)。

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

由上圖的流程圖可知,要讓用戶調(diào)用無感知,必須創(chuàng)建出代理類來完成網(wǎng)絡(luò)請(qǐng)求的操作。

RpcProxy.java如下:

public class RpcProxy {
    public static <T> T create(Class<?> clazz) {
        //clazz傳進(jìn)來本身就是interface
        MethodProxy proxy = new MethodProxy(clazz);
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);
        return result;
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;

        public MethodProxy(Class<?> clazz) {
            this.clazz = clazz;
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 如果傳進(jìn)來是一個(gè)已實(shí)現(xiàn)的具體類
            if (Object.class.equals(method.getDeclaringClass())) {
                try {
                    return method.invoke(this, args);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
                // 如果傳進(jìn)來的是一個(gè)接口(核心)
            } else {
                return rpcInvoke(method, args);
            }
            return null;
        }

        // 實(shí)現(xiàn)接口的核心方法
        public Object rpcInvoke(Method method, Object[] args) {
            // 傳輸協(xié)議封裝
            InvokerMessage invokerMessage = new InvokerMessage();
            invokerMessage.setClassName(this.clazz.getName());
            invokerMessage.setMethodName(method.getName());
            invokerMessage.setValues(args);
            invokerMessage.setParams(method.getParameterTypes());

            final RpcProxyHandler consumerHandler = new RpcProxyHandler();
            EventLoopGroup group = new NioEventLoopGroup();

            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                //自定義協(xié)議編碼器
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                //對(duì)象參數(shù)類型編碼器
                                pipeline.addLast("encoder", new ObjectEncoder());
                                //對(duì)象參數(shù)類型解碼器
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", consumerHandler);
                            }
                        });
                ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(invokerMessage).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
            return consumerHandler.getResponse();
        }
    }
}

我們通過傳進(jìn)來的接口對(duì)象,獲得了要調(diào)用的服務(wù)名,服務(wù)方法名,參數(shù)類型列表,參數(shù)列表,這樣就把自定義的RPC協(xié)議包封裝好了,只需要把協(xié)議包發(fā)出去等待結(jié)果返回即可,所以為了接收返回值數(shù)據(jù)還需要自定義一個(gè)接收用的Handler,RpcProxyHandlerdiamante如下:

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
    private Object result;

    public Object getResponse() {
        return result;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception is general");
    }
}

這樣就算是完成了整個(gè)流程,下面開始測(cè)試一下吧,測(cè)試的RpcConsumer.java代碼如下:

public class RpcConsumer {
    public static void main(String[] args) {
        // 本機(jī)之間的正常調(diào)用
        // IRpcHello iRpcHello = new RpcHelloProvider();
        // iRpcHello.hello("Tom");

        // 肯定是用動(dòng)態(tài)代理來實(shí)現(xiàn)的
        // 傳給它接口,返回一個(gè)接口的實(shí)例,偽代理
        IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);
        System.out.println(rpcHello.hello("ZouChangLin"));

        int a = 10;
        int b = 5;
        IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);

        System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));
        System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b)));
        System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));
        System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));
    }
}

3、效果測(cè)試

先開啟Registry,運(yùn)行端口是8080:

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

開啟consumer開始調(diào)用

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

調(diào)用完成后可以看到調(diào)用結(jié)果正確,并且在Registry這邊也看到了日志:

真的夠可以的,基于Netty實(shí)現(xiàn)了RPC框架

 

可以發(fā)現(xiàn),簡(jiǎn)易R(shí)PC框架順利完工!

作者:zchanglin
鏈接:
https://juejin.cn/post/6948351262668636174
來源:掘金

分享到:
標(biāo)簽:框架 RPC
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定