我们前面手写了RPC,但是终究只是个demo,学习一下专业的RPC框架

学习目标:

  • 了解应用架构演进过程

  • 了解RPC远程调用方式

  • 掌握Dubbo框架的架构【重点】

  • 掌握Zookeeper注册中心的基本使用

  • 掌握Dubbo生产者和消费者的开发【重点】

  • 了解Dubbo的管理控制台的使用

  • 了解Dubbo的相关配置

  • 了解Dubbo的负载均衡(4种)

  • 了解Dubbo的配置中心(难点 代码 Watch)

  • 能够掌握Dubbo的架构体系

  • 能够掌握Dubbo的源码环境搭建及控制台使用

  • 能够掌握Dubbo与SpringBoot的集成整合

  • 能够掌握Dubbo的高阶配置运用

前置知识

应用架构的演进过程

主流的互联网技术特点

分布式 、高并发、集群、负载均衡、高可用(故障转移)。

分布式:一件事情拆开来做。

集群:一件事情大家一起做。

负载均衡:将请求平均分配到不同的服务器中,达到均衡的目的。


高并发:同一时刻,处理同一件事情的处理能力(解决方案:分布式、集群、负载均衡)

高可用:系统都是可用的。实现故障转移

image-20230922105822161

架构演变的过程

单一应用架构(all in one)

当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,用于简化增删改查工作量的数据访问框架(ORM)是关键。

image-20230922105830271

  • 架构优点:

    架构简单,前期开发成本低、开发周期短,适合小型项目(OA、CRM、ERP 企业内部应用 jsp)。

  • 架构缺点:

    全部功能集成在一个工程中

    (1)业务代码耦合度高,不易维护。

    (2)维护成本高,不易拓展

    (3)并发量大,不易解决

    (4)技术栈受限,只能使用一种语言开发。

垂直应用架构

当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。此时,用于加速前端页面开发的Web框架(MVC)是关键。

image-20230922105835220

  • 架构优点:

    (1)业务代码相对解耦

    (2)维护成本相对易于拓展(修改一个功能,可以直接修改一个项目,单独部署)

    (3)并发量大相对易于解决(搭建集群)

    (4)技术栈可扩展(不同的系统可以用不同的编程语言编写)。

  • 架构缺点:

    功能集中在一个项目中,不利于开发、扩展、维护。

    代码之间存在数据、方法的冗余

分布式服务架构

当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。

image-20230922105840933

  • 架构优点:

    (1)业务代码完全解耦,并可实现通用

    (2)维护成本易于拓展(修改一个功能,可以直接修改一个项目,单独部署)

    (3)并发量大易于解决(搭建集群)

    (4)技术栈完全扩展(不同的系统可以用不同的编程语言编写)。

  • 架构缺点:

    缺少统一管理资源调度的框架

流动计算架构(SOA)

当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。

image-20230922105845399

​ 资源调度和治理中心的框架:dubbo、spring cloudy

  • 架构优点:

    (1)业务代码完全解耦,并可实现通用

    (2)维护成本易于拓展(修改一个功能,可以直接修改一个项目,单独部署)

    (3)并发量大易于解决(搭建集群)

    (4)技术栈完全扩展(不同的系统可以用不同的编程语言编写)。

    ( 5 ) 框架实现了服务治理,不去担心集群的使用情况(失败会尝试其它服务….)

小结

1:单体架构

全部功能集中在一个项目内(All in one)。 打一个war, 一个tomcat

2:垂直架构

按照业务进行切割,形成小的单体项目。 每个业务模块就是一个war,一个tomcat

3 : 分布式:应用调用服务(ip写死),服务挂了就不能用了,缺少服务治理。把业务服务(service+dao)拆分成一个war包,一个tomcat里。

4:SOA架构(项目一)

服务的提供者(以服务为主 service调用dao->数据库),消费者。

服务提供者与消费都注册到中心,由中心统一管理分配,失败重试,负载均衡。。。有服务治理

可以使用dubbo作为调度的工具(RPC协议), Zookeeper作为注册中心

5:微服务架构(项目二 畅购 Springboot+Spring Cloud)

将系统服务层完全独立出来,抽取为一个一个的微服务。 以功能为主(controller->service-dao->数据库 独立)

特点一:抽取的粒度更细,遵循单一原则,数据可以在服务之间完成数据传输(一般使用restful请求调用资源)。

特点二: 采用轻量级框架协议传输。(可以使用spring cloud)(http协议 restful json)

特点三: 每个服务都使用不同的数据库,完全独立和解耦 (分库分表)。

RPC(远程过程调用)

RPC介绍

​ Remote Procedure Call 远程过程调用,是分布式架构的核心,按响应方式分如下两种:

​ 同步调用:客户端调用服务方方法,等待直到服务方返回结果或者超时,再继续自己的操作。

​ 异步调用:客户端把消息发送给中间件,不再等待服务端返回,直接继续自己的操作。

  • 是一种进程间的通信方式

  • 它允许应用程序调用网络上的另一个应用程序中的方法

  • 对于服务的消费者而言,无需了解远程调用的底层细节,是透明的

    需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。

    RPC是一个泛化的概念,严格来说一切远程过程调用手段都属于RPC范畴。各种开发语言都有自己的RPC框架。Java中的RPC框架比较多,广泛使用的有RMI、Hessian、Dubbo、spring Cloud(restapi http)等。

一台电脑调用另外一台脑上的方法

RPC组件【重点】

简单来说一个RPC架构里包含如下4个组件:

1、 客户端(Client):服务调用者

2、 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数打包成网络消息,再通过网络发送给服务方

3、 服务端存根(Server Stub):接受客户端发送过来的消息并解包,再调用本地服务

4、 服务端(Server):服务提供者。

image-20230922105850750

RPC调用

image-20230922105855402

1、 服务调用方(client)调用以本地调用方式调用服务;

2、 client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体(在Java里就是序列化的过程)

3、 client stub找到服务地址,并将消息通过网络发送到服务端;

4、 server stub收到消息后进行解码,在Java里就是反序列化的过程;

5、 server stub根据解码结果调用本地的服务;

6、 本地服务执行处理逻辑;

7、 本地服务将结果返回给server stub;

8、 server stub将返回结果打包成消息,Java里的序列化;

9、 server stub将打包后的消息通过网络并发送至消费方;

10、 client stub接收到消息,并进行解码, Java里的反序列化;

11、 服务调用方(client)得到最终结果。

小结

1:RPC 远程过程调用 调用另一个应用的方法

2:RPC组件及调用过程

客户端->客户端存根->服务端存根->服务端->服务端存根->客户端存根->客户端

3:调用方式 接口调用

  • Web Service 接口 http+soap <xml>
  • Dubbo 接口

什么是长连接?

image-20230922105811318

Dubbo基础使用

概述

Apache Dubbo是一款高性能的Java RPC框架。其前身是阿里巴巴公司开源的一个高性能、轻量级的开源Java RPC 框架,可以和Spring框架无缝集成。

RPC

​ RPC全称为remote procedure call,即远程过程调用。比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。

​ 需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。

​ RPC是一个泛化的概念,严格来说一切远程过程调用手段都属于RPC范畴。各种开发语言都有自己的RPC框架。

Java中的RPC框架比较多,广泛使用的有RMI、Hessian、Dubbo等。

Dubbo官网地址:http://dubbo.apache.org

Dubbo提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现(dubbo帮你注册在zookeeper创建节点数据)和发现(订阅 watch, dubbo帮我们做了)。

架构

image-20211129103230584

image-20220607045601676

节点角色说明:

节点 角色名称
Provider 暴露服务的服务提供方 服务方
Consumer 调用远程服务的服务消费方 调用方
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器
  • 虚线都是异步访问,实线都是同步访问
  • 蓝色虚线:在启动时完成的功能
  • 红色虚线(实线)都是程序运行过程中执行的功能

调用关系说明:

  1. 服务容器负责启动,加载,运行服务提供者。
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从注册中心拉取服务提供者列表,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

安装Zookeeper

通过前面的Dubbo架构图可以看到,Registry(服务注册中心)在其中起着至关重要的作用。Dubbo官方推荐使用Zookeeper作为服务注册中心。

Zookeeper介绍

Zookeeper 是 Apache Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用 。

为了便于理解Zookeeper的树型目录服务,我们先来看一下我们电脑的文件系统(也是一个树型目录结构):

image-20211129103910211

​ 我的电脑可以分为多个盘符(例如C、D、E等),每个盘符下可以创建多个目录,每个目录下面可以创建文件,也可以创建子目录,最终构成了一个树型结构。通过这种树型结构的目录,我们可以将文件分门别类的进行存放,方便我们后期查找。而且磁盘上的每个文件都有一个唯一的访问路径,例如:C:\Windows\itcast\hello.txt。

Zookeeper树型目录服务

image-20211129103934434

流程说明:

  • 服务提供者(Provider)启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
  • 服务消费者(Consumer)启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
  • 监控中心(Monitor)启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址

安装Zookeeper

下载地址:http://archive.apache.org/dist/zookeeper/

本课程使用的Zookeeper版本为3.4.6,下载完成后可以获得名称为zookeeper-3.4.6.tar.gz的压缩文件。

安装步骤:

1
2
3
4
5
6
7
8
9
# 第一步:安装 jdk(略) 
# 第二步:把 zookeeper 的压缩包(zookeeper-3.4.6.tar.gz)上传到 linux 系统
# 第三步: 解压缩压缩包
tar -zxvf zookeeper-3.4.6.tar.gz
# 第四步:进入zookeeper-3.4.6目录,创建data目录 mkdir data
# 第五步:进入conf目录 ,把zoo_sample.cfg 改名为zoo.cfg
cd conf
mv zoo_sample.cfg zoo.cfg
# 第六步:打开 zoo.cfg文件, 修改data属性:dataDir=/root/zookeeper-3.4.6/data

启动、停止Zookeeper

1
2
3
4
5
6
# 进入Zookeeper的bin目录,启动服务命令 
./zkServer.sh start
# 停止服务命令
./zkServer.sh stop
# 查看服务状态:
./zkServer.sh status

Dubbo快速入门(服务者+消费者)

Dubbo作为一个RPC框架,其最核心的功能就是要实现跨网络的远程调用。本小节就是要创建两个应用,一个作为服务的提供方,一个作为服务的消费方。通过Dubbo来实现服务消费方远程调用服务提供方的方法。

服务提供方开发

pom配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.itheima</groupId>
<artifactId>dubbodemo_provider</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>

<name>dubbodemo_provider Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.version>5.0.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- dubbo相关 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.7</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.12.1.GA</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<!-- 如果报错,加版本试试 -->
<version>2.1</version>
<configuration>
<!-- 指定端口 -->
<port>8083</port>
<!-- 请求路径 -->
<path>/</path>
</configuration>
</plugin>
</plugins>
</build>
</project>

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
<display-name>Archetype Created Web Application</display-name>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext*.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
</web-app>

log4j

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
### direct log messages to stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.err
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

### direct messages to file mylog.log ###
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=c:\\mylog.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

### set log levels - for more verbose logging change 'info' to 'debug' ###

log4j.rootLogger=debug, stdout

applciationContext-service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

<!--每个dubbo应用(服务提供方和服务消费方)都必须指定一个唯一的名称-->
<dubbo:application name="dubbodemo_provider"></dubbo:application>
<!--指定服务的注册中心-->
<dubbo:registry address="zookeeper://39.97.237.124:2181"></dubbo:registry>
<!--配置协议和端口-->
<dubbo:protocol name="dubbo" port="20881"></dubbo:protocol>
<!--指定包扫描,用于发布dubbo服务-->
<dubbo:annotation package="com.itheima.service.impl"></dubbo:annotation>

<!--数据源-->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="username" value="root" />
<property name="password" value="root" />
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/test" />
</bean>
<!-- 事务管理器 -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!--开启事务控制的注解支持-->
<tx:annotation-driven transaction-manager="transactionManager" proxy-target-class="true"/>
</beans>

端口默认20880,这里是环境是20881

接口使用

image-20211129105838812

启动服务 => tomcat7:run

服务消费方

pom配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.itheima</groupId>
<artifactId>dubbodemo_consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>

<name>dubbodemo_consumer Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.version>5.0.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- dubbo相关 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.7</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.12.1.GA</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.1</version>
<configuration>
<!-- 指定端口 -->
<port>8082</port>
<!-- 请求路径 -->
<path>/</path>
</configuration>
</plugin>
</plugins>
</build>
</project>

dubbo配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<dubbo:application name="dubbodemo_consumer"></dubbo:application>
<dubbo:registry address="zookeeper://39.97.237.124:2181"></dubbo:registry>
<dubbo:annotation package="com.itheima.controller"></dubbo:annotation>
<dubbo:consumer check="false"></dubbo:consumer>
</beans>

web.mxl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<display-name>Archetype Created Web Application</display-name>
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<!-- 指定加载的配置文件 ,通过参数contextConfigLocation加载 -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext-web.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>*.do</url-pattern>
</servlet-mapping>
</web-app>

消费者使用dubbo去zookeeper注册中心查找服务

image-20211129111308374

在浏览器输入http://localhost:8082/demo/hello.do?name=Jack,查看浏览器输出结果

  • 思考一:上面的Dubbo入门案例中我们是将HelloService接口从服务提供者工程(dubbodemo_provider)复制到服务消费者工程(dubbodemo_consumer)中,这种做法是否合适?还有没有更好的方式?

答:这种做法显然是不好的,同一个接口被复制了两份,不利于后期维护。更好的方式是单独创建一个maven工程,将此接口创建在这个maven工程中。需要依赖此接口的工程只需要在自己工程的pom.xml文件中引入maven坐标即可。

  • 思考二:在服务消费者工程(dubbodemo_consumer)中只是引用了HelloService接口,并没有提供实现类,Dubbo是如何做到远程调用的?

答:Dubbo底层是基于代理技术为HelloService接口创建代理对象,远程调用是通过此代理对象完成的。可以通过开发工具的debug功能查看此代理对象的内部结构。另外,Dubbo实现网络传输底层是基于Netty框架完成的。

  • 思考三:上面的Dubbo入门案例中我们使用Zookeeper作为服务注册中心,服务提供者需要将自己的服务信息注册到Zookeeper,服务消费者需要从Zookeeper订阅自己所需要的服务,此时Zookeeper服务就变得非常重要了,那如何防止Zookeeper单点故障呢?

答:Zookeeper其实是支持集群模式的,可以配置Zookeeper集群来达到Zookeeper服务的高可用,防止出现单点故障。

Dubbo与Spring整合

​ Dubbo作为一个RPC框架,其最核心的功能就是要实现跨网络的远程调用,服务提供者、服务消费者会使用共同的接口,故本小节先创建一个父工程,父工程下有4个子模块,一个是公共子模块,一个是接口模块,一个是服务提供者模块,一个是服务消费者模块。通过Dubbo来实现服务消费方远程调用服务提供方的方法。

​ 实现的业务为:根据id查询用户对象

​ 业务描述:页面发送请求:user/findById.do?id=1 根据id从数据库获取用户对象

实现步骤:

 1. 环境准备:创建数据库,创建t_user表
 2. 创建父工程,基于maven,打包方式为pom,工程名:dubbo_parent
 3. 创建公共子模块,创建user实体类,打包方式为jar,工程名:dubbo_common
 4. 创建接口子模块,在父工程的基础上,打包方式为jar,模块名:dubbo_interface
 5. 创建服务提供者子模块,在父工程的基础上,打包方式为war,模块名:dubbo_provider
 6. 创建服务消费者模子块,在父工程的基础上,打包方式为war,模块名:dubbo_consumer

image-20220607045639262

环境准备

创建数据库表

1
2
3
4
5
6
7
8
9
10
11
12
create database itcastdubbo;

CREATE TABLE `t_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(20) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

INSERT INTO t_user(username,age) VALUES("张三",22);
INSERT INTO t_user(username,age) VALUES("李四",20);
INSERT INTO t_user(username,age) VALUES("王五",25);

创建父工程

​ 父工程,不实现任何代码,主要是添加工程需要的库的依赖管理(DependencyManagement),依赖管理就是解决项目中多个模块间公共依赖的版本号、scope的控制范围。本项目需要使用spring-webmvc,使用dubbo(务必2.6.2以上版本)、zookeeper及其客户端(curator-framework)、Spring、Mybatis依赖库。

  • GroupID:com.itheima

  • ArtifactId:dubbo_parent

    1. 创建完工程后,如图所示:

      image-20230922105631255

    2. 修改pom.xml,抽取版本、增加赖库依赖管理,全部内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.itheima</groupId>
<artifactId>dubbo_parent</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>dubbo_common</module>
<module>dubbo_interface</module>
<module>dubbo_provider</module>
<module>dubbo_consumer</module>
</modules>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.webmvc.version>5.0.5.RELEASE</spring.webmvc.version>
<dubbo.version>2.6.2</dubbo.version>
<zookeeper.version>3.4.7</zookeeper.version>
<curator.verion>4.0.1</curator.verion>
<mybatis.version>3.4.5</mybatis.version>
<mysql.version>5.1.47</mysql.version>
<mybatis-spring.version>1.3.2</mybatis-spring.version>
<druid.version>1.1.9</druid.version>
<slf4j.version>1.6.6</slf4j.version>
</properties>
<dependencyManagement>
<dependencies>
<!--springmvc的环境-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.webmvc.version}</version>
</dependency>
<!--dubbo的环境-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>${dubbo.version}</version>
</dependency>
<!--zookeeper的环境-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.verion}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.verion}</version>
</dependency>
<!--mybatis的环境-->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>${mybatis-spring.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!--spring的环境-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.webmvc.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.webmvc.version}</version>
</dependency>
<!--日志的环境-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

创建公共子模块

  • 在当前父工程的基础上创建子模块

  • GroupID:com.itheima

  • ArtifactId:dubbo_common

    image-20230922105639128

1:pom.xml

1
<packaging>jar</packaging>

2:在com.itheima.pojo包中创建User实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itheima.pojo;

import java.io.Serializable;


public class User implements Serializable {
private Integer id;
private String username;
private Integer age;

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", username='" + username + '\'' +
", age=" + age +
'}';
}
}

创建接口子模块

此模块,主要放业务接口的定义,它是服务消费者模块和服务提供者模块的公共依赖模块。

  • 在当前父工程的基础上创建子模块
  • GroupID:com.itheima
  • ArtifactId:dubbo_interface
  1. 创建子模块:

    image-20230922105645847

  2. pom.xml,添加依赖

    1
    2
    3
    4
    5
    6
    7
    8
    <packaging>jar</packaging>
    <dependencies>
    <dependency>
    <groupId>com.itheima</groupId>
    <artifactId>dubbo_common</artifactId>
    <version>1.0-SNAPSHOT</version>
    </dependency>
    </dependencies>
  3. 在包com.itheima.service创建业务接口

1
2
3
4
5
6
7
8
9
package com.itheima.service;

import com.itheima.pojo.User;

public interface UserService {

User findById(Integer id);
}

如图所示:

image-20230922105651214

服务提供者模块

此模块是服务提供者模块,需要在容器启动时,把服务注册到zookeeper,故需要引入spring-webmvc,zookeeper及客户端依赖。

【路径】

实现步骤:

  1. 创建子模块dubbo_provider,使用war 需要依赖dubbo_interface

  2. 代码目录

  3. image-20200618110432078

  4. resources资源目录

    • 创建spring-dubbo.xml 配置文件:

      • 配置dubbo的应用名称

      • 配置dubbo注册中心Zookeeper地址

      • 配置需要暴露的业务接口及实例

    • 创建spring-dao.xml

      • 配置数据源
      • 配置sqlSessionFactory对象
      • 扫描dao包,创建dao接口的动态代理对象
    • 创建spring-service.xml

      • 配置事务(事务管理器,注解事务)
  5. 在web.xml文件中,只要加载spring的配置文件即可

  6. 配置tomcat启动服务提供者 或使用ClassPathXmlApplicationContext加载spring容器

实现过程:

创建子模块dubbo_provider

或者叫做dubbo_service

  • 在当前父工程的基础上创建子模块

  • 打包方式为war

  • 项目坐标如下

    ​ GroupID:com.itheima

    ​ ArtifactId:dubbo_provider

工程创建完成后,如图所示:

image-20230922105700451

修改pom.xml文件,打包方式为war,增加依赖库,新增编译插件tomcat7,端口81

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<packaging>war</packaging>
<dependencies>
<dependency>
<groupId>com.itheima</groupId>
<artifactId>dubbo_interface</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>

service与dao

1:在main下,创建子目录java(刷新maven工程),增加com.itheima.service.impl包及创建UserServiceImpl实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.itheima.service.impl;

import com.itheima.pojo.User;
import com.itheima.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;

public class UserServiceImpl implements UserService {

@Autowired
UserDao userDao;

@Override
public User findById(Integer id) {
return userDao.findById(id);
}
}

2:增加com.itheima.dao包,创建UserDao接口

1
2
3
4
5
6
7
8
package com.itheima.dao;

import com.itheima.pojo.User;

public interface UserDao {
User findById(Integer id);
}

3:在resource下创建com/itheima/dao目录,创建UserDao接口的映射文件,内容如下

1
2
3
4
5
6
7
8
<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.itheima.dao.UserDao">
<select id="findById" resultType="com.itheima.pojo.User" parameterType="int">
select * from t_user where id = #{id}
</select>
</mapper>

配置文件编写

在main下,创建子目录resources目录,在resources目录下

创建

  • spring-dao.xml
  • jdbc.properties
  • spring-service.xml
  • spring-provider.xml
  • log4j.properties
  • 启动项目
spring-dao.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<context:property-placeholder location="classpath:jdbc.properties"/>

<!-- 数据源 -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<property name="driverClassName" value="${jdbc.driver}"/>
<property name="url" value="${jdbc.url}"/>
<property name="password" value="${jdbc.password}"/>
<property name="username" value="${jdbc.user}"/>
</bean>
<!-- 工厂 -->
<bean class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="typeAliasesPackage" value="com.itheima.pojo"/>
</bean>
<!-- dao扫描 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.itheima.dao"/>
</bean>
</beans>
jdbc.properties
1
2
3
4
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/itcastdubbo
jdbc.user=root
jdbc.password=root
spring-service.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

<!-- 事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 事务注解 -->
<tx:annotation-driven/>
<!-- 注入dao -->
<import resource="classpath:spring-dao.xml"/>
</beans>
spring-provider.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<!-- 发布服务的名称 -->
<dubbo:application name="dubbo_provide"/>
<!-- 注册中心
zookeeper:
-->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- service: 注册上去服务
interface: 发布服务的接口
ref: spring容器的bean对象
将来通过这个interface调用服务时,就来调用spring容器中的对象的方法
-->
<dubbo:service interface="com.itheima.service.UserService" ref="userService"/>
<!-- 服务真正的执行者 -->
<bean id="userService" class="com.itheima.service.impl.UserServiceImpl"/>

<!-- 注入spring-service.xml -->
<import resource="classpath:spring-service.xml"/>
</beans>
log4j.properties

将资料的log4j.properties配置文件拷贝到resources目录下

效果如下

image-20230922105709229

启动项目

只要能启动spring的容器(加载spring的配置文件)就可以启动项目,因此有3种方式

ClassPathXmlApplication
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.itheima;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.io.IOException;

/**
* Description: No Description
* User: Eric
*/
public class ProviderApplication {
public static void main(String[] args) throws IOException {
new ClassPathXmlApplicationContext("classpath:spring-provider.xml");
System.in.read();
}
}

监听器DispatcherServlet

配置web.xml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">

<!-- 方式一: listener 启动spring容器
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-provider.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
-->
<!-- 方式二: 启动mvc的核心控制器 -->
<servlet>
<servlet-name>dispatcherServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-provider.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>dispatcherServlet</servlet-name>
<url-pattern>*.do</url-pattern>
</servlet-mapping>
</web-app>

配置tomcat启动项目

注册中心验证

检查是否注册到zookeeper

(1)启动zookeeper,作为dubbo的注册中心

(2)登录zookeeper客户端,直接查看ls /dubbo/com.itheima.service.UserService节点

image-20200618114311789

  • 如果 /dubbo下面没有这个节点,说明没有注册上,

  • 如果有,内容是空,说明已经掉线

  • ==【正常注册并连接在线】==,如图所示:

image-20200618114402220

注意:

  • 消费者与提供者应用名称不能相同

  • 如果有多个服务提供者,名称不能相同,通信端口也不能相同

  • 只有服务提供者才会配置服务发布的协议,默认是dubbo协议,端口号是20880

    可以在spring-dubbo.xml中配置协议的端口

1
2
<!--发布dubbo协议,默认端口20880-->
<dubbo:protocol name="dubbo" port="20881"></dubbo:protocol>

其实就spring与mybatis整合,多了一dubbo的配置文件(注册Zookeeper)。

服务消费者模块

此模块是服务消费者模块,此模块基于是Web应用,需要引入spring-webmvc,需要在容器启动时,去zookeeper注册中心订阅服务,需要引入dubbo、zookeeper及客户端依赖。

实现步骤:

  1. 创建子模块dubbo_consumer,打包方式为war包,dubbo_interface(用来调用接口方法)

  2. java源代码目录

image-20200618114512958

  1. resources资源目录

    ​ 创建spring-dubbo.xml 配置文件:

    ​ 配置dubbo的应用名称

    ​ 配置dubbo注册中心Zookeeper地址

    ​ 配置需要订阅的业务接口及引用

    ​ 创建spring-mvc.xml

    ​ 开启注解扫描

    ​ 开启mvc注解驱动

  2. 将资料的log4j.properties配置文件拷贝到resources目录下

  3. 在web.xml文件中(拦截*.do),配置SpringMVC

  4. 启动服务消费者,并测试访问

实现过程:

创建子模块dubbo_consumer

或者叫做dubbo_controller

  • 在当前父工程的基础上创建子模块

  • 坐标如下

    ​ GroupID:com.itheima

    ​ ArtifactId:dubbo_consumer

1:创建工程,如图所示:

image-20230922105723963

2:修改pom.xml文件,引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<packaging>war</packaging>

<dependencies>
<dependency>
<groupId>com.itheima</groupId>
<artifactId>dubbo_interface</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>

Controller

创建com.itheima.controller包及创建UserController控制类,该类调用远程业务UserService,实现调用findById方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.itheima.controller;

import com.itheima.pojo.User;
import com.itheima.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

//
@RestController
@RequestMapping("/user")
public class UserController {

@Autowired
private UserService userService;

@RequestMapping("/findById")
public User findById(int id){
// 调用服务
User user = userService.findById(id);
// 要返回json给前端,必须有@ResponseBody,或类上有@RestController
// 另外springmvc默认使用jaskson来解析java对象为json字符串,注意引入jaskson的依赖
return user;// 返回json数据
}
}

编写配置文件

在main下,创建子目录resources目录,在resources目录下

1:spring-dubbo.xml

2:spring-mvc.xml配置文件

3:log4j.properties

spring-dubbo.xml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

<!-- 发布的名称 -->
<dubbo:application name="dubbo_consumer"/>
<!-- 注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 服务订阅扫包
在controller的服务注入使用@Reference(dubbo)

<dubbo:annotation package="com.itheima"/> -->
<dubbo:reference interface="com.itheima.service.UserService" id="userService"/>

<!-- 启动时是否检查服务提供者是否存在,true: 则会检查【上线时】,没有则报错。false不检查
retries: 失败后的重试次数
-->
<dubbo:consumer check="false" timeout="2000" retries="2"/>

</beans>
spring-mvc.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!-- 扫controller-->
<context:component-scan base-package="com.itheima.controller"/>

<!-- 注解驱动 -->
<mvc:annotation-driven/>

<!-- 引入dubbo配置文件-->
<import resource="classpath:spring-dubbo.xml"/>

</beans>
导入log4j.properties

复制log4j.properteis到resources目录下

配置web.xml

配置springmvc的核心控制器,用来加载spring-mvc.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
id="WebApp_ID" version="3.0">
<!--springmvc的核心控制器-->
<servlet>
<servlet-name>springMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!--解决post请求的乱码过滤器-->
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>

启动服务消费者并测试

在浏览器输入http://localhost:80/user/findById?id=3,查看浏览器输出结果

image-20200618163351408

注意:因为是RPC的框架,要求传递的参数和实体类要实现序列化

参数:Integer类型(实现序列化接口java.io.Serializable)

返回值:User(实现序列化接口java.io.Serializable),如果不进行序列化,抛出异常

image-20230922105610974

Zookeeper中存放Dubbo服务结构

作为Dubbo运行的注册中心

Zookeeper树型目录服务:

image-20230922105616687

流程说明:

1:服务提供者(Provider)启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址

2:服务消费者(Consumer)启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址

3:监控中心(Monitor)启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址

小结

  1. 通过id查询用户信息

  2. 分模块工程构建 dubbo_parent 管理依赖的版本

    dubbo2.6.2 仓库中没有,只有引入真正的依赖时才会触发下载,dependencyManagement 是不会触发下载的, dependencies(与dependencyManagement 同级)里才会下载

  3. dubbo_common 实体类

  4. dubbo_interface 消费者与提供者都需要用到它,RPC远程调用 使用接口来调用

  5. dubbo_provider

    • 要查询数据库dao 接口与映射文件 , mybatis整合spring
    • 实现接口的方法UserServiceImpl 事务控制
    • 注册到dubbo上 application name=””, registry zookeeper://ip:端口, dubbo:service 接口名 ref=实现的bean对象 bean 实现类
    • 启动:3种方式:ClasspathXmlApplicationContext, ContextLoaderListener, DispatcherServlet
  6. dubbo_consumer

    • 接收请求响应结果controller, springmvc

    • springmvc

      • 扫controller,注解驱动
      • 导入dubbo的配置
    • dubbo的配置

      • 注册到dubbo上 application name=””, registry zookeeper://ip:端口, dubbo:reference 接口名 id=controller中注入的service属性名
    • web.xml

      • 乱码过滤器
      • 核心控制器

Dobbo管理控制台

我们在开发时,需要知道Zookeeper注册中心都注册了哪些服务,有哪些消费者来消费这些服务。我们可以通过部署一个管理中心来实现。其实管理中心就是一个web应用,部署到tomcat即可。

安装

安装步骤:

(1)将资料中的dubbo-admin-2.6.0.war文件复制到tomcat的webapps目录下

(2)启动tomcat,此war文件会自动解压

(3)修改WEB-INF下的dubbo.properties文件,注意dubbo.registry.address对应的值需要对应当前使用的 Zookeeper的ip

1
2
3
4
地址和端口号 
dubbo.registry.address=zookeeper://192.168.134.129:2181
dubbo.admin.root.password=root
dubbo.admin.guest.password=guest

(4)重启tomcat

image-20211129112547122

  • 如果报错:需要切换到jdk8
1
org.springframework.beans.factory.BeanDefinitionStoreException: Unexpected exception parsing XML document from ServletContext resource [/WEB-INF/webx.xml]; nested exception is java.lang.IllegalArgumentException: Unknown flag 0x1000

使用

启动服务提供者工程和服务消费者工程,可以在查看到对应的信息

访问http://localhost:8080/dubbo-admin-2.6.0/,输入用户名(root)和密码(root)

image-20211129111949714

Dubbo配置

包扫描

1
2
<!--【注意】约束头文件用的是appache的dubbo的约束-->
<dubbo:annotation package="com.itheima.service" />

服务提供者和服务消费者都需要配置,表示包扫描,作用是扫描指定包(包括子包)下的类。

服务提供者和消费者使用xml配置

如果不使用包扫描,也可以通过如下配置的方式来发布服务:

1
2
<bean id="helloService" class="com.itheima.service.impl.HelloServiceImpl" /> 
<dubbo:service interface="com.itheima.api.HelloService" ref="helloService" />

作为服务消费者,可以通过如下配置来引用服务

1
2
<!‐‐ 生成远程服务代理,可以和本地bean一样使用helloService ‐‐> 
<dubbo:reference id="helloService" interface="com.itheima.api.HelloService" />

上面这种方式发布和引用服务,一个配置项(dubbo:service、dubbo:reference)只能发布或者引用一个服务,如果有多个服务,这种方式就比较繁琐了。推荐使用包扫描方式。

如果使用包扫描,可以使用注解方式实现,推荐使用这种方式。

  1. 当扫到 @Service注解时,即会调用dubbo去zookeeper上注册服务。
  2. 当扫到 @Reference时,则会调用dubbo去zookeeper上订阅相应的接口服务

服务提供者,使用注解实现

第一步:在spring-dubbo.xml中配置

1
2
3
4
<!--【注意】约束头文件用的是appache的dubbo的约束
当扫到 @Service注解时,即会调用dubbo去zookeeper上注册服务
-->
<dubbo:annotation package="com.itheima.service" />

服务提供者和服务消费者都需要配置,表示包扫描,作用是扫描指定包(包括子包)下的类。

【注意】:==去掉==以下2项配置:

1
2
3
4
<!--指定暴露的服务接口及实例-->
<dubbo:service interface="com.itheima.service.UserService" ref="userSerivce"/>
<!--配置业务类实例-->
<bean id="userSerivce" class="com.itheima.service.impl.UserServiceImpl"/>

第二步:在HelloServiceImpl的类上使用注解: ==注意包名是dubbo的==

image-20200618163542686

【注意】:其中@Service是dubbo包下(com.alibaba.dubbo.config.annotation.Service)的注解。表示提供服务

服务消费者,使用注解实现

第一步:在spring-dubbo.xml中配置

1
2
<!--当扫到 @Reference时,则会调用dubbo去zookeeper上订阅相应的接口服务-->
<dubbo:annotation package="com.itheima.controller" />

这里的dubbo注解扫描

【注意】==去掉==:

1
2
<!--订阅远程服务对象,id的名称和Controller类中的UserService接口名称要一致-->
<dubbo:reference id="userService" interface="com.itheima.service.UserService"/>

第二步:在Controller类中使用

@Reference注解

image-20200618163705093

==【注意】==:其中@Reference是dubbo包下(com.alibaba.dubbo.config.annotation.Reference)的注解。表示订阅服务

重启服务测试使用

服务接口访问协议【提供方修改】

1
<dubbo:protocol name="dubbo" port="20880"/>

一般在服务提供者一方配置,可以指定使用的协议名称和端口号。

其中Dubbo支持的协议有:dubbo、rmi、hessian、http、webservice、rest、redis等。

推荐使用的是dubbo协议,默认端口号:20880。

dubbo 协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于 服务提供者机器数的情况。不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。 也可以在同一个工程中配置多个协议,不同服务可以使用不同的协议,例如

1
2
3
4
5
6
7
<!‐‐ 多协议配置 ‐‐> 
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:protocol name="rmi" port="1099" />
<!‐‐ 使用dubbo协议暴露服务 ‐‐>
<dubbo:service interface="com.itheima.api.HelloService" ref="helloService" protocol="dubbo" />
<!‐‐ 使用rmi协议暴露服务 ‐‐>
<dubbo:service interface="com.itheima.api.DemoService" ref="demoService" protocol="rmi" />

dubbo协议:

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO异步传输
  • 序列化:Hessian二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
  • 适用场景:常规远程服务方法调用

rmi协议:

  • 连接个数:多连接
  • 连接方式:短连接
  • 传输协议:TCP
  • 传输方式:同步传输
  • 序列化:Java标准二进制序列化
  • 适用范围:传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。
  • 适用场景:常规远程服务方法调用,与原生RMI服务互操作

详情使用可通过博客文章:https://www.cnblogs.com/duanxz/p/3555876.html了解

启动时检查

这个配置需要配置在服务消费者一方,如果不配置默认check值为true。Dubbo 缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring 初始化完成,以便上线时,能及早发现问题。可以通过将check值 改为false来关闭检查。

1
<dubbo:consumer check="false"/>

建议在开发阶段将check值设置为false,在生产环境下改为true

如果设置为true,启动服务消费者,会抛出异常,表示没有服务提供者

image-20220305183708736

超时调用

默认的情况下,dubbo调用的时间为一秒钟,如果超过一秒钟就会报错,所以我们可以设置超时时间长些,保证调用不出问题,这个时间需要根据业务来进行确定。

image-20230922105744556

(1)修改消费者 配置文件,增加如下配置:

1
2
<!--超时时间为10秒钟-->
<dubbo:consumer timeout="10000"></dubbo:consumer>

(2)修改提供者配置文件,增加如下配置

1
2
<!--超时时间设置为10秒钟-->
<dubbo:provider timeout="10000"></dubbo:provider>

负载均衡

负载均衡(Load Balance):其实就是将请求分摊到多个操作单元上进行执行,从而共同完成工作任务。

​ 在集群负载均衡时,Dubbo 提供了多种均衡策略(包括随机random、轮询roundrobin、最少活跃调用数leastactive, 一致性哈希consistent hash),缺省【默认】为random随机调用。

​ 配置负载均衡策略,既可以在服务提供者一方配置(@Service(loadbalance = “roundrobin”)),也可以在服务消费者一方配置(@Reference(loadbalance = “roundrobin”)),两者取一。

  • 如下在服务消费者指定负载均衡策略,可在@Reference添加@Reference(loadbalance = “roundrobin”)

image-20211129112820457

可以通过启动多个服务提供者来观察Dubbo负载均衡效果。

注意:因为我们是在一台机器上启动多个服务提供者,所以需要修改tomcat的端口号和Dubbo服务的端口号来防止端口冲突。

在实际生产环境中,多个服务提供者是分别部署在不同的机器上,所以不存在端口冲突问题

测试负载均衡效果

​ 增加一个提供者,提供相同的服务;

​ 正式生产环境中,最终会把服务端部署到多台机器上,故不需要修改任何代码,只需要部署到不同机器即可测试。下面我们通过启动 ProviderApplication 类来做测试

① 修改

image-20200618164707122

② 先启动1个ProviderApplication

③ 设置可以启动多个实例

image-20200618164249526

④ 修改spring-provider.xml 端口依次改为20881,20882,20883

image-20200618164422770

⑤ 启动ProviderApplication,最终有3个,分别端口为20881,20882,20883

image-20200618164931446

其中:

1
2
@Reference(loadbalance = "roundrobin") 		// 表示轮询
@Reference(loadbalance = "random") // 表示随机(默认)
  1. 访问测试

    启动消费者,访问:http://localhost:92/user/findById?id=3

小结

1:负载均衡: 把请求均匀分配到各服务提供者上

可以在@Reference(loadbalance=””) 也可@Service(loadbalance=””)

2:loadbalance:

  • random 随机 默认
  • roundrobin 轮循
  • leastactive 最少活跃数
  • consistenhash 一致哈希

解决Dubbo事务问题

​ 前面我们已经完成了Dubbo的入门案例,通过入门案例我们可以看到通过Dubbo提供的标签配置就可以进行包扫描,扫描到@Service注解的类就可以被发布为服务。

​ 但是我们如果在服务提供者类上加入@Transactional事务控制注解后,服务就发布不成功了。原因是事务控制的底层原理是为服务提供者类创建代理对象,而默认情况下Spring是基于JDK动态代理方式创建代理对象,而此代理对象的完整类名为com.sun.proxy.$Proxy42(最后两位数字不是固定的),导致Dubbo在发布服务前进行包匹配时无法完成匹配,进而没有进行服务的发布。

(1)在pom.xml文件中增加maven坐标

image-20211129113234706

(2)在applicationContext-service.xml配置文件中加入数据源、事务管理器、开启事务注解的相关配置

image-20211129113159865

上面连接的数据库可以自行创建

(3)在HelloServiceImpl类上加入@Transactional注解

(4)启动服务提供者和服务消费者,并访问

image-20211129112907757

查看dubbo管理控制台发现服务并没有发布,如下:

image-20211129113137188

可以通过断点调试的方式查看Dubbo执行过程,Dubbo通过AnnotationBean的postProcessAfterInitialization方法进行处理

image-20211129112925591

解决方案

​ 通过上面的断点调试可以看到,在HelloServiceImpl类上加入事务注解后,Spring会为此类基于JDK动态代理技术创建代理对象,创建的代理对象完整类名为com.sun.proxy.$Proxy35,导致Dubbo在进行包匹配时没有成功(因为我们在发布服务时扫描的包为com.itheima.service),所以后面真正发布服务的代码没有执行。

解决方式操作步骤:

(1)修改applicationContext-service.xml配置文件,开启事务控制注解支持时指定proxy-target-class属性,值为true。其作用是使用cglib代理方式为Service类创建代理对象

1
2
<!‐‐开启事务控制的注解支持‐‐> 
<tx:annotation‐driven transaction‐manager="transactionManager" proxy‐target‐class="true"/>

image-20211129113004215

(2)修改HelloServiceImpl类,在Service注解中加入interfaceClass属性,值为HelloService.class,作用是指定服务的接口类型

image-20211129113029857

此处也是必须要修改的,否则会导致发布的服务接口为SpringProxy,而不是HelloService接口,如下:

image-20211129113038464

环境介绍

​ 数据发布/订阅即所谓的配置中心:发布者将数据发布到ZooKeeper一系列节点上面,订阅者进行数据订阅,当数据有变化时,可以及时得到数据的变化通知,达到动态及时获取数据的目的。

image-20230922105907989

现在项目中有两个提供者,配置了相同的数据源,如果此时要修改数据源,必须同时修改两个才可以。

​ 我们可以将数据源中需要的配置信息配置存储在zookeeper中,如果修改数据源配置,使用zookeeper的watch机制,同时对提供者的数据源信息更新。如下图所示:

image-20230922105922808

实现配置中心

【路径】

1:在zookeeper中添加数据源所需配置

2:在dubbo_common中导入jar包

3:修改数据源,读取zookeeper中数据源所需配置数据

(1)在dubbo_common中创建工具类:SettingCenterUtil,继承PropertyPlaceholderConfigurer

(2)编写载入zookeeper中配置文件,传递到Properties属性中

(3)重写processProperties方法

(4)修改spring-dao.xml

4:watch机制

(1)添加监听

(2)获取容器对象,刷新spring容器:SettingCenterUtil,实现ApplicationContextAware

在zookeeper中添加数据源所需配置

image-20230922105929624

让我们的程序读取Zookeeper中的配置,而不再从程序的中的jdbc.properties文件中读取。

在dubbo_common中导入jar包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
</dependencies>

创建测试类,设置zookeeper上的数据库配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.itheima;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

/**
* Description: No Description
* User: Eric
*/
public class CreateJDBCPath {

@Test
public void createJDBCPath() throws Exception {
//1. 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,1);
// 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",30000,3000,retryPolicy);
// 启动
client.start();

// 操作
client.create().creatingParentsIfNeeded().forPath("/config/jdbc.driver","com.mysql.jdbc.Driver".getBytes());
client.create().creatingParentsIfNeeded().forPath("/config/jdbc.url","jdbc:mysql://localhost:3306/itcastdubbo".getBytes());
client.create().creatingParentsIfNeeded().forPath("/config/jdbc.user","root".getBytes());
client.create().creatingParentsIfNeeded().forPath("/config/jdbc.password","root".getBytes());
// 关闭
client.close();
}

}

修改数据源,读取zookeeper中数据

在dubbo_common中创建工具类:SettingCenterUtil,继承PropertyPlaceholderConfigurer

image-20230922105936961

编写载入zookeeper中配置文件,传递到Properties属性中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.itheima.utils;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
import org.springframework.util.StringValueResolver;

import java.util.Properties;

public class SettingCenterUtil extends PropertyPlaceholderConfigurer{

/**
* 读取zookeeper中数据库配置
*/
private void loadZk(Properties props) {
//connectString 连接字符串 host:port
String connectString = "127.0.0.1:2181";
//sessionTimeoutMs session timeout 会话超时时间
int sessionTimeoutMs = 1000;
//connectionTimeoutMs connection timeout 连接超时时间
int connectionTimeoutMs = 1000;
//retryPolicy retry policy to use 重试策略
// baseSleepTimeMs 每次重试间隔时间
// 1.创建重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
// 2. 创建客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
// 3. 启动
client.start();// 启动

try {
String driver = new String(client.getData().forPath("/config/jdbc.driver"));
String url = new String(client.getData().forPath("/config/jdbc.url"));
String user = new String(client.getData().forPath("/config/jdbc.user"));
String password = new String(client.getData().forPath("/config/jdbc.password"));

// 设置数据库的配置
props.setProperty("jdbc.driver", driver);
props.setProperty("jdbc.url", url);
props.setProperty("jdbc.user", user);
props.setProperty("jdbc.password", password);

} catch (Exception e) {
e.printStackTrace();
}
}

}

重写processProperties方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 处理properties内容,相当于此标签
* <context:property-placeholder location="classpath:jdbc.properties"></context:property-placeholder>
* @param beanFactoryToProcess
* @param props
* @throws BeansException
*/
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {
// 设置数据库的配置
// 读取zookeeper
loadFormZk(props);
super.processProperties(beanFactoryToProcess, props);
}

修改spring-dao.xml

注释掉:

1
<context:property-placeholder location="classpath:jdbc.properties"></context:property-placeholder>

添加:

1
<bean class="com.itheima.utils.SettingCenterUtil"></bean>

spring-dao.xml中配置

image-20230922105943945

watch机制

在SettingCenterUtil中添加监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 监听数据库配置信息变化时刷新spring容器,触发重新加载数据库连接池
*/
private void addWatch() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3, 3000);
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);
// 启动
client.start();

TreeCache treeCache = new TreeCache(client, "/config");
try {
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}

treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
String path = event.getData().getPath();
System.out.println("==================================修改路径:" + path);
if(path.startsWith("/config/jdbc")){
// 修改了数据库配置
// 刷新spring容器,触发重新加载数据库连接池
applicationContext.refresh();
}
}
}
});
}

注意:

1:不要关闭client,否则无法进行监控

2:修改完成后必须刷新spring容器的对象

获取容器对象,刷新spring容器

1:修改SettingCenterUtil实现ApplicationContextAware接口,重写setApplicationContext方法,获取applicationContext对象。

image-20200618165500049

2:AbstractApplicationContext容器对象父类,提供了refresh方法,可以刷新容器中的对象,故强制转换。

3:在processProperties的方法中,添加addWatch(props);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 标识是否需要监听
*/
private static boolean flag = true;

/**
* 处理properties内容,相当于此标签
* <context:property-placeholder location="classpath:jdbc.properties"></context:property-placeholder>
* @param beanFactoryToProcess
* @param props
* @throws BeansException
*/
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {
// 设置数据库的配置
// 读取zookeeper
loadFormZk(props);
// 订阅数据库配置
if(flag) {
// 已经订阅过了,不需要再次订阅
addWatch();
flag=false;
}
super.processProperties(beanFactoryToProcess, props);
}

4:修改Zookeeper的配置

image-20230922105758133

小结

1:配置中心环境介绍 回看zookeeper配置中心的图

2:实现配置中心

(1)在Zookeeper中添加数据源所需配置

(2)在dubbo-common中导入jar包

(3)修改数据源,读取Zookeeper中数据 继承ProperteisPlaceHolderConfigurer.processProperties, 读取zk中的数据库配置,设置进props参数

3:watch机制

(1)添加监听 TreeCache, event.type=node_update, 注意:判断path 不要关闭客户端,防止重复订阅

(2)获取容器对象(ApplicationContextAware)且刷新

Dubbo源码学习

Dubbo架构系统

框架介绍

概述

Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。

Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

运行架构

dubbo运行架构如下图示:

image-20220113190738999

节点 角色说明
provider 暴露服务的服务提供方
consumer 调用远程服务的服务消费方
register 服务注册与发现的注册中心
monitor 统计服务的调用次数和调用时间的监控中心
container 服务运行容器

调用关系说明:

  1. 服务容器负责启动,加载,运行服务提供者。

  2. 服务提供者在启动时,向注册中心注册自己提供的服务。

  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。

  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。

  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。

  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟

发送一次统计数据到监控中心。

关于dubbo 的特点分别有连通性、健壮性、伸缩性、以及向未来架构的升级性。特点的详细介绍也可以参考官方文档。

整体设计

image-20220113191202515

图例说明:

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明:

  • config 配置层:对外配置接口,以 ServiceConfig , ReferenceConfig 为中心,可以直接初始化配置类,也可以通过spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory , Registry , RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster , Directory , Router , LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory , Monitor , MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation , Result 为中心,扩展接口为 Protocol , Invoker , Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以Request , Response 为中心,扩展接口为 Exchanger , ExchangeChannel , ExchangeClient , ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以Message 为中心,扩展接口为 Channel , Transporter , Client , Server , Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为Serialization , ObjectInput , ObjectOutput , ThreadPool

环境搭建

接下来逐步对dubbo各个模块的源码以及原理进行解析,目前dubbo框架已经交由Apache基金会进行孵化,被在github开源。

Dubbo 社区目前主力维护的有 2.6.x 和 2.7.x 两大版本,其中,

  • 2.6.x 主要以 bugfix 和少量 enhancements 为主,因此能完全保证稳定性
  • 2.7.x 作为社区的主要开发版本,得到持续更新并增加了大量新 feature和优化,同时也带来了一些稳定性挑战

源码拉取

通过以下的这个命令签出最新的dubbo项目源码,并导入到IDEA中

1
git clone https://github.com/apache/dubbo.git dubbo

image-20220113191546644

可以看到Dubbo被拆分成很多的Maven项目,在后续课程中会介绍左边每个模块的大致作用。

下载源码导入工程后,进行编译,并跳过测试,这个过程中可能会遇到以下问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[ERROR] ---------- 
[ERROR] 1) com.google.protobuf:protoc:exe:windows- x86_64:3.7.1
[ERROR]
[ERROR] Try downloading the file manually from the project website.
[ERROR]
[ERROR] Then, install it using the command:
[ERROR] mvn install:install-file - DgroupId=com.google.protobuf -DartifactId=protoc - Dversion=3.7.1 -Dclassifier=windows-x86_64 - Dpackaging=exe -Dfile=/path/to/file
[ERROR]
[ERROR] Alternatively, if you host your own repository you can deploy the file there:
[ERROR] mvn deploy:deploy-file - DgroupId=com.google.protobuf -DartifactId=protoc - Dversion=3.7.1 -Dclassifier=windows-x86_64 - Dpackaging=exe -Dfile=/path/to/file -Durl=[url] - DrepositoryId=[id]
[ERROR]
[ERROR] Path to dependency:
[ERROR] 1) org.apache.dubbo:dubbo-serialization- protobuf:jar:2.7.8

[ERROR] 2) com.google.protobuf:protoc:exe:windows- x86_64:3.7.1
[ERROR]
[ERROR] ----------
[ERROR] 1 required artifact is missing.
[ERROR]
[ERROR] for artifact:
[ERROR] org.apache.dubbo:dubbo-serialization- protobuf:jar:2.7.8
[ERROR]
[ERROR] from the specified remote repositories:
[ERROR] apache.snapshots (https://repository.apache.org/snapshots, releases=false, snapshots=true),
[ERROR] alimaven (http://maven.aliyun.com/nexus/content/groups/public/, releases=true, snapshots=false)

手动下载,按照提示的命令执行

1
mvn install:install-file -DgroupId=com.google.protobuf -DartifactId=protoc -Dversion=3.7.1 -Dclassifier=windows-x86_64 -Dpackaging=exe -Dfile=D:\tools\dubbo\protoc-3.7.1-windows-x86_64.exe

源码结构

通过如下图形可以大致的了解到,dubbo源码各个模块的相关作用:

image-20220113192339274

模块说明:

  • dubbo-common 公共逻辑模块:包括 Util 类和通用模型。
  • dubbo-remoting 远程通讯模块:相当于 Dubbo 协议的实现,如果RPC 用 RMI协议则不需要使用此包。
  • dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
  • dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
  • dubbo-registry 注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
  • dubbo-monitor 监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
  • dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使 用Dubbo,隐藏 Dubbo 所有细节。
  • dubbo-container 容器模块:是一个 Standlone 的容器,以简单的Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web容器的特性,没必要用 Web 容器去加载服务。

环境导入

在本次课程中,不仅讲解dubbo源码还会涉及到相关的基础知识,为了方便学员快速理解并掌握各个内容,已经准备好了相关工程,只需导入到IDEA中即可。对于工程中代码的具体作用,在后续课程会依次讲解

测试

(1) 安装zookeeper

(2) 修改官网案例,配置zookeeper地址

(3) 启动服务提供者,启动服务消费者

管理控制台

  1. 下载管理控制台, GITHUB地址

    照着github指引初始化环境https://github.com/apache/dubbo-admin/blob/develop/README_ZH.md

  2. 进入源码目录, 进行打包编译

1
mvn clean package -Dmaven.test.skip=true 

构建成功提示:

image-20220113192512514

如果构建过程中出现nodejs安装包下载错误, 可以将安装包直接放置maven仓库内(资料中已提供安装包)。

1
2
https://nodejs.org/dist/v9.11.1/node-v9.11.1-win-x86.zip
修改为:node-9.11.1-win-x86.zip

实在安装不了直接去网上下载一个jar包

还是安装失败?可能是admin-ui下的包失败了。可以先npm install安装admin-ui,然后在mvn安装 => 成功

  1. 启动后台管理服务
1
2
3
4
java -jar dubbo-admin-0.2.0-SNAPSHOT.jar 
也可以编写成bat文件
@echo off
java -jar dubbo-admin-0.4.0.jar

image-20220113192541261

  1. 管理后台

地址: http://127.0.0.1:8080/

默认账户名和密码都为root

image-20220113192605445

todo: TMD服了,这个管理后台总是起不来

=> tomcat部署dubbo-admin 修改tomcat端口,修改dubbo-admin配置的zookeeper地址,

启动后访问 http://localhost:8081/dubbo-admin-2.6.0/ ,输入配置的用户名密码 , 成功

Dubbo实战运用

Dubbo与SpringBoot的整合

基于Zookeeper实现Dubbo与Spring Boot的集成整合

工程POM依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<properties>
<dubbo-version>2.7.8</dubbo-version>
<spring-boot.version>2.3.0.RELEASE</spring-boot.version>
</properties>
<dependencyManagement>
<dependencies>
<!--SpringBoot-->
<dependency>

<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--ApacheDubbo -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-
bom
</artifactId>
<version>${dubbo-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>${dubbo-version}</version>
<exclusions>
<exclusion>

<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-
api
</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!--DubboSpringBootStarter-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-
starter
</artifactId>
<version>${dubbo-version}</version>
</dependency>
<!--Dubbo核心组件-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<!--SpringBoot依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-
web
</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<!--Zookeeper客户端框架-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
<!--Zookeeperdependencies-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-
zookeeper
</artifactId>
<version>${dubbo-version}</version>
<type>pom</type>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Dubbo采用2.7.8版本, Spring Boot采用的是2.3.0.RELEASE版本。

如果依赖下载出现问题, 可以指定具体的仓库:

1
2
3
4
5
6
7
8
9
10
11
12
13
<repositories>
<repository>
<id>apache.snapshots.https</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/s napshots</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

公用RPC接口工程

为便于客户端与服务端的RPC接口引用, 这里对RPC接口做统一封装

image-20220113193710677

定义了一个订单服务接口, 用于测试验证。

1
2
3
4
public interface OrderService {
/*** 获取订单详情 * @param orderId * @return */
String getOrder(Long orderId);
}

服务端工程

  1. 工程结构

image-20220113193830098

  1. POM依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<!-- Dubbo Spring Boot Starter -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot- starter</artifactId>
<version>${dubbo-version}</version>
</dependency>
<!-- Dubbo 核心依赖 -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>
<!-- 公用RPC接口依赖 -->
<dependency>
<groupId>com.itheima</groupId>
<artifactId>dubbo-spring- interface</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
  1. RPC服务接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@DubboService(version = "${dubbo.spring.provider.version}")
public class OrderServiceImpl implements OrderService {
/*** 服务端口 */
@Value("${server.port}")
private String serverPort;
@Value("${dubbo.spring.provider.version}")
private String serviceVersion;

/*** 获取订单详情 * @param orderId * @return */
public String getOrder(Long orderId) {
String result = "get order detail ,orderId=" + orderId + ",serverPort=" + serverPort + ",serviceVersion=" + serviceVersion;
System.out.println(result);
return result;
}
}

通过DubboService注解, 声明为RPC服务,version可以标识具体的版本号, 消费端需匹配保持一致。

  1. 工程配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 服务端口
server:
port: 18081
# 应用程序名称
spring:
application:
name: dubbo-spring-provider


# dubbo相关配置
dubbo:
scan:
base-packages: com.itheima.service # 服务扫描路径
protocol:
name: dubbo # 通讯协议
port: 20880 # dubbo服务提供方的端口,该默认值 -1代表随机分配
dispatcher: "message"
threads: 300

registry:
# address: zookeeper://192.168.200.129:2181 # 注册中心地址
address: zookeeper://127.0.0.1:2181 # 注册中心地址
file: ${user.home}/dubbo-cache/${spring.application.name}/dubbo.cache
#password:
  1. Spring Boot启动程序
1
2
3
4
5
6
7
@SpringBootApplication
@ComponentScan(basePackages = {"com.itheima"})
public class DubboSpringProviderApplication {
public static void main(String[] args) {
SpringApplication.run(DubboSpringProviderApplicatio n.class, args);
}
}

消费端工程

  1. 工程结构

image-20220113194139740

  1. POM依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<!--DubboSpringBootStarter-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>${dubbo-version}</version>
</dependency>
<!--公用RPC接口依赖-->
<dependency>
<groupId>com.itheima</groupId>
<artifactId>dubbo-spring-interface</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
  1. 消费端调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Controller
@RequestMapping("/order")
public class OrderController {
private final Logger logger = LoggerFactory.getLogger(getClass());
/*** 订单服务接口 */
@DubboReference(version = "${dubbo.spring.provider.version}")
private OrderService orderService;

/*** 获取订单详情接口 * @param orderId * @return */
@RequestMapping("/getOrder")
@ResponseBody
public String getOrder(Long orderId) {
String result = null;
try {
result = orderService.getOrder(orderId);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return result;
}
}
  1. 工程配置
1
2
3
4
5
6
7
8
#服务端口 
server.port=18084
#服务名称
spring.application.name=dubbo-spring-consumer
#服务版本号
dubbo.spring.provider.version = 1.0.0
#消费端注册器配置信息
dubbo.registry.address=zookeeper://127.0.0.1:2181 dubbo.registry.file = ${user.home}/dubbo- cache/${spring.application.name}/dubbo.cache
  1. Spring Boot启动程序
1
2
3
4
5
6
7
@SpringBootApplication
@ComponentScan(basePackages = {"com.itheima"})
public class DubboSpringConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(DubboSpringConsumerApplicatio n.class, args);
}
}

工程调用验证

  1. 启动ZK注册中心

  2. 启动服务端, 运行DubboSpringProviderApplication

  3. 启动消费端, 运行DubboSpringConsumerApplication

  4. 请求获取订单接口, 地址: http://127.0.0.1:18084/order/getOrder?orderId=1001

调用成功:

image-20220113194435745

Dubbo高阶配置运用

不同配置覆盖关系

  1. 覆盖规则:

image-20220113194524711

配置规则:

  • 方法级优先,接口级次之,全局配置再次之。
  • 如果级别一样,则消费方优先,提供方次之。
  1. 服务端超时设定

增加配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class DubboCustomConfig {
/**
* 服务端
* @return
*/
@Bean
public ProviderConfig registryConfig() {
ProviderConfig providerConfig = new ProviderConfig();
// 设定超时时间为1S
providerConfig.setTimeout(1000);
return providerConfig;
}
}

修改服务接口实现:

设定模拟睡眠时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 获取订单详情
*
* @return
* @paramorderId
*/
publicStringgetOrder(LongorderId) {
try {
//休眠1.5秒
Thread.sleep(1500L);
} catch (InterruptedExceptione) {
e.printStackTrace();
}
return "GetOrderDetail,Id:" + orderId + ",serverPort:" + serverPort;
}
  1. 客户端调用验证
  • 服务端全局超时设为2秒(不触发超时), 消费端设定超时时间为1秒(触发超时)。

修改调用代码:

1
2
3
4
5
/**
* 订单服务接口
*/
@DubboReference(version = "${dubbo.spring.provider.version}", timeout = 1000)
private OrderService orderService;

调用结果, 触发超时:

image-20220113194804084

表明消费端配置优先。

  • 服务端全局超时设定为1秒(触发超时), 消费端去掉超时时间配置。

image-20220113194839117

触发超时, 表明服务提供方优先级次之。

属性配置优先级

  1. 优先级规则

image-20220113194904213

优先级从高到低:

  • JVM -D 参数;
1
-Ddubbo.protocol.port=20881
  • XML(application.yml/application.properties)配置会重写dubbo.properties 中的,一般配置项目特有的
1
2
3
4
5
dubbo:
protocol:
name: dubbo # 通讯协议
#port: 20880 # dubbo服务提供方的端口,该值 是默认值
port: 20882

Properties默认配置(dubbo.properties),仅仅作用于以上两者没有配置时,一般配置全局公共配置

1
dubbo.protocol.port=20883
  1. JVM参数优先级验证
  • application.properties里面配置了dubbo.protocol.port端口10888
  • JVM运行参数端口设定为-Ddubbo.protocol.port=10889

image-20220113195048586

  • 启动服务

服务启动完成, 可以看到端口优先以JVM参数为准

image-20220113195107993

  1. Properties配置文件验证
  • 注释掉application.properties里面配置的dubbo.protocol.name和dubbo.protocol.port配置
  • dubbo.properties里面配置dubbo.protocol.name和dubbo.protocol.port
  • 在启动参数里, 添加-Ddubbo.properties.file=dubbo.properties

image-20220113195232142

  • 启动服务

查看dubbo.properties配置的端口, 可以看到正常生效:

1
2
3
4
5
C:\Users\hxx68>netstat -ano |findstr 30889
TCP 0.0.0.0:30889 0.0.0.0:0
LISTENING 49816
TCP [::]:30889 [::]:0
LISTENING 49816

重试与容错处理机制

  1. 容错机制:
  • Failfast Cluster
    • 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
  • Failsafe Cluster
    • 失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
  • Failback Cluster
    • 失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
  • Forking Cluster
    • 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过forks=”2” 来设置最大并行数。
  • Broadcast Cluster
    • 广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

后面章节还会对其原理做详细讲解。

  1. 调整客户端重试次数
1
2
3
4
5
/**
* 订单服务接口
*/
@DubboReference(version = "${dubbo.spring.provider.version}", retries = 3)
private OrderService orderService;

这里的重试次数设定为3次。

  1. 修改服务端超时时间

模拟超时, 让客户端触发重试。

1
2
3
4
5
6
7
8
9
10
11
/**
* 服务端全局配置
*
* @return
*/
@Bean
public ProviderConfig registryConfig() {
ProviderConfig providerConfig = new ProviderConfig();
providerConfig.setTimeout(1000);
return providerConfig;
}

将超时时间设小一些为1秒。

  1. 客户端调用(单个服务)

http://127.0.0.1:18084/order/getOrder?orderId=123

查看服务端控制台,可以看到触发重试机制:

image-20220113195628056

加上第一次的调用和3次重试, 共4次。

  1. 客户端调用(多个,涉及到负载均衡机制,后面再测试)

允许多个实例运行, 开启多个服务

访问接口, http://127.0.0.1:18084/order/getOrder?orderId=123

image-20220113195644641

第一个服务实例,访问两次, 其他每个服务访问一次。

多版本控制

  1. 启动三个服务端

第一个服务端版本为1.0.0, 第二、三个服务端版本分别为:2.0.0 和3.0.0

主要是修改application.properties配置:

1
2
#服务版本号 
dubbo.spring.provider.version = 2.0.0
  1. 启动三个服务提供者,通过 -Ddubbo.spring.provider.version = 2.0.0 -Dserver.port=18082 来启动三个

相关的端口不能重复

  1. 消费端指定版本号

同样修改application.properties配置:

1
2
#服务版本号 
dubbo.spring.provider.version = 2.0.0

仍然通过 -Ddubbo.spring.provider.version = 2.0.0 来调用不同版本的服务

测试时,注释掉超时的代码

  1. 仍是采用超时配置, 通过重试测试验证

测试验证结果:

请求只会访问至版本号为2.0.0的服务节点上面。

本地存根调用

  1. 实现流程

image-20220113195819897

把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy。

  1. 客户端存根实现:

增加service接口

1
2
3
4
5
6
7
public class OrderServiceStub implements OrderService {
private final OrderService orderService;

// 构造函数传入真正的远程代理对象
public OrderServiceStub(OrderService orderService) {
this.orderService = orderService;
}
  1. 修改客户端调用配置
1
2
3
/*** 订单服务接口 */
@DubboReference(version = "${dubbo.spring.provider.version}", retries = 3, stub = "com.itheima.dubbo.spring.consumer.service.OrderServiceStub")
private OrderService orderService;

stub要配置存根接口的完整路径。

  1. 测试调用

访问不带参数的地址: http://127.0.0.1:18084/order/getOrder

image-20220113195957524

会进入存根接口的处理逻辑, 提示校验异常。

负载均衡机制

  1. 默认负载策略

Dubbo默认采用的是随机负载策略。

开启三个服务节点,通过消费端访问验证: http://127.0.0.1:18084/order/getOrder?orderId=123

通过控制后台日志输出, 可以看到每个服务节点呈现不规则的调用。

  1. Dubbo 支持的负载均衡策略,可用参看源码: AbstractLoadBalance
  • Random LoadBalance:默认随机,按权重设置随机概率。在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

  • RoundRobin LoadBalance : 加权轮询负载均衡,按公约后的权重设置轮询比率。存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

  • LeastActive LoadBalance : 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。活跃数其实就是在当前这个服务调用者中当前这个时刻某个invoker(某个服务提供者的某个接口)某个方法的调用并发数,在调用之前+1 调用之后-1的一个计数器,如果出现多个活跃数相等invoker的时候使用随机算法来选取一个

  • ConsistentHash LoadBalance: 一致性 Hash,相同参数的请求总是发到同一提供者。当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

    一致性Hash负载均衡涉及到两个主要的配置参数为hash.argumentshash.nodes

    • hash.arguments : 当进行调用时候根据调用方法的哪几个参数生成key,并根据key来通过一致性hash算法来选择调用结点

    • hash.nodes: 为结点的副本数

  • ShortestResponseLoadBalance(2.7.7 +新增):最短响应时间负载均衡

从多个服务提供者中选择出调用成功的且响应时间最短的服务提供者,由于满足这样条件的服务提供者有可能有多个。所以当选择出多个服务提供者后要根据他们的权重做分析,如果权重一样,则随机

源码实现:org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance

Dubbo提供了四种实现:

image-20220113200217674

  1. 四种配置方式

优先级从下至上:

  • 服务端的服务级别:
1
<dubbo:service interface="..." loadbalance="roundrobin" />	
  • 客户端的服务级别:
1
2
<dubbo:reference interface="..." loadbalance="roundrobin" />
# 注意:服务提供者配置后最终也只是同步到消费者端。故一般在消费端配置
  • 服务端方法级别:
1
2
3
<dubbo:service interface="..."> 
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:service>
  • 客户端方法级别:
1
2
3
<dubbo:reference interface="...">
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:reference>

当然这里还有全局配置(略)

  1. 调用验证

修改客户端的负载策略, 改为轮询策略:注意很多配置都有三个级别,针对方法的,针对接口的,全局的。

1
@DubboReference(version = "${dubbo.spring.provider.version}",retries = 3, loadbalance = "roundrobin", stub = "com.itheima.dubbo.spring.consumer.service.OrderServ iceStub")

开启三个服务节点, 进行访问验证: http://127.0.0.1:18084/order/getOrder?orderId=123会依次轮询进行调用。

注意:测试时将服务提供者的版本号都调成一致(1.0.0),超时配置注释掉!

  1. 动态权重调整验证

管理后台地址: http://127.0.0.1:8080/#

通过管理后台修改服务的权重配置:

image-20220113200506566

将两台节点的权重降低至20:

image-20220113200526919

调整后可以看到权重配置已经生效:

image-20220113200538612

通过消费者接口访问, 会发现第一台权重较大的节点, 访问次数会明显增多

服务降级运用

  1. 服务动态禁用

启动单个服务节点,进入Dubbo Admin, 创建动态配置规则:

1
2
3
4
5
6
7
8
9
configVersion: v2.7 
enabled: true
configs:
- side: provider
addresses:
- '0.0.0.0:20880'
parameters:
timeout: 3000
disabled: true

将disabled属性设为true, 服务禁用, 可以错误提示:

image-20220113200730243

将disabled属性改为false,

1
2
3
4
5
6
7
8
9
configVersion: v2.7 
enabled: true
configs:
- side: provider
addresses:
- '0.0.0.0:20880'
parameters:
timeout: 3000
disabled: false

恢复正常访问:

image-20220113200821559

  1. 服务降级
  • 配置规则
1
2
3
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFa ctory.class).getAdaptiveExtension(); 
Registry registry = registryFactory.getRegistry(URL.valueOf("zook eeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0 .0.0/com.foo.BarService? category=configurators&dynamic=false&applicat ion=foo&mock=force:return+null"));

mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。

还可以改为 mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。

  • 降级测试(force方式)

进入Dubbo Admin进行配置:

1
2
3
4
5
6
7
8
9
configVersion: v2.7 
enabled: true
configs:
- side: consumer
addresses:
- '0.0.0.0'
parameters:
timeout: 3000
mock: 'force:retrun null'

客户端调用, 会直接屏蔽, 并且服务端控制台不会有任何调用记录:

image-20220113200959511

  • 降级测试(fail方式)

进入Dubbo Admin配置:

1
2
3
4
5
6
7
8
9
configVersion: v2.7 
enabled: true
configs:
- side: consumer
addresses:
- '0.0.0.0'
parameters:
timeout: 1000
mock: 'fail:retrun null'

这里为了触发调用异常, 超时时间缩短为1秒。

注意这里的超时时间可能不会起作用,最终的超时时间还是得看项目中配置,故在服务提供方将线程休眠时间延长,造成调用超时。

客户端调用, 会出现降级显示为空:

image-20220113201033308

同时服务端会有调用记录显示(请求会进入服务端,但由于超时, 调用是失败):

image-20220113201044237

并发与连接控制

实际运用, 会碰到高并发与峰值场景, Dubbo是可以做到并发与连接数控制。

可使用jmeter进行测试!

并发数控制
  1. 服务端控制
  • 服务级别
1
<dubbo:service interface="com.foo.BarService" executes="10" />

服务器端并发执行(或占用线程池线程数)不能超过 10 个。

  • 方法级别
1
2
3
<dubbo:service interface="com.foo.BarService"> 
<dubbo:method name="sayHello" executes="3" />
</dubbo:service>

限制具体的方法,服务器端并发执行(或占用线程池线程数)不能超过3 个。

  1. 客户端控制
  • 调用的服务控制
1
<dubbo:reference interface="com.foo.BarService" actives="10" />

每客户端并发执行(或占用连接的请求数)不能超过 10 个。

  • 调用的服务方法控制
1
2
3
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="10" />
</dubbo:service>

dubbo:reference比dubbo:service优先。

  1. 客户端负载配置
1
<dubbo:reference interface="com.foo.BarService" loadbalance="leastactive" />

负载策略为最小连接数时, Loadbalance 会调用并发数最小的Provider。

连接数控制
  1. 服务端连接控制
1
<dubbo:provider protocol="dubbo" accepts="10" />

限制服务器端接受的连接不能超过 10 个

  1. 客户端连接控制
1
<dubbo:reference interface="com.foo.BarService" connections="10" />

限制客户端服务使用连接不能超过 10 个

如果 dubbo:service 和 dubbo:reference 都配了 connections,dubbo:reference 优先

Dubbo SPI机制

在 Dubbo 中,SPI 是一个非常重要的模块。基于 SPI,我们可以很容易的对 Dubbo 进行拓展。如果大家想要学习 Dubbo 的源码,SPI 机制务必弄懂。接下来,我们先来了解一下 Java SPI 与 Dubbo SPI 的用法,然后再来分析Dubbo SPI 的源码。

spi的概述

SPI的主要作用

SPI 全称为 Service Provider Interface,是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过 SPI 机制为我们的程序提供拓展功能

在面向的对象的设计里,不同模块之间推崇面向接口编程,不建议在模块中对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。SPI使得程序能在ClassPath路径下的META-INF/services文件夹查找接口的实现类,自动加载文件里所定义的实现类。

场景

数据库驱动java.sql.Driver,Mysql,Oracle都有自己的驱动实现jar包,只需引入不同的jar包即可加载不同的数据库驱动,例如mysql-connector-java中,其META-INF/services下制定了mysql驱动的实现类

image-20220117202641866

java中DriverManager.getConnection,实际上实现类都是各大数据库厂商提供的,比如mysql-conector-java.

然后想让java发现你的驱动类,java spi可以在META-INF下查找 以接口为命名的 配置实现类的 文件

这就是为什么java中只有Driver接口,但是引入驱动包后就可以正常调用数据库了

image-20220117202647009

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。

入门案例

首先,我们定义一个接口,名称为 Robot。

1
2
3
public interface Robot { 
void sayHello();
}

接下来定义两个实现类,分别为 OptimusPrime 和 Bumblebee。

1
2
3
4
5
6
7
8
9
10
11
12
public class OptimusPrime implements Robot { 
@Override
public void sayHello() {
System.out.println("Hello, I am Optimus Prime.");
}
}
public class Bumblebee implements Robot {
@Override
public void sayHello() {
System.out.println("Hello, I am Bumblebee.");
}
}

接下来 META-INF/services 文件夹下创建一个文件,名称为 Robot 的全限定名 com.itheima.java.spi.Robot。文件内容为实现类的全限定的类名,如下:

1
2
com.itheima.java.spi.impl.Bumblebee 
com.itheima.java.spi.impl.OptimusPrime

做好所需的准备工作,接下来编写代码进行测试

1
2
3
4
5
6
7
8
public class JavaSPITest {
@Test
public void sayHello() throws Exception {
ServiceLoader<Robot> serviceLoader = ServiceLoader.load(Robot.class);
System.out.println("Java SPI");
serviceLoader.forEach(Robot::sayHello);
}
}

最后来看一下测试结果,如下:

image-20220117202900601

从测试结果可以看出,我们的两个实现类被成功的加载,并输出了相应的内容。

总结

调用过程

  • 应用程序调用ServiceLoader.load方法,创建一个新的ServiceLoader,并实例化该类中的成员变量
  • 应用程序通过迭代器接口获取对象实例,ServiceLoader先判断成员变量providers对象中(LinkedHashMap<String,S>类型)是否有缓存实例对象,如果有缓存,直接返回。如果没有缓存,执行类的装载,

优点

  • 使用 Java SPI 机制的优势是实现解耦,使得接口的定义与具体业务实现分离,而不是耦合在一起。应用进程可以根据实际业务情况启用或替换具体组件。

缺点

  • 不能按需加载。虽然 ServiceLoader 做了延迟载入,但是基本只能通过遍历全部获取,也就是接口的实现类得全部载入并实例化一遍。如果你并不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。
  • 获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。
  • 多个并发多线程使用 ServiceLoader 类的实例是不安全的。
  • 加载不到实现类时抛出并不是真正原因的异常,错误很难定位。

Dubbo 中的spi

概述

Dubbo 并未使用 Java SPI,而是重新实现了一套功能更强的 SPI 机制。Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过ExtensionLoader,我们可以加载指定的实现类。

入门案例

与 Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置,这样我们可以按需加载指定的实现类。下面来演示 Dubbo SPI 的用法:

1、在使用Dubbo SPI 时,需要在接口上标注 @SPI 注解

1
2
3
4
@SPI
public interface Robot {
void sayHello();
}

2、Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下,与Java SPI 实现类配置不同,Dubbo SPI 是通过键值对的方式进行配置(key自己指定,value为实现类的全路径),配置内容如下。

注意:每一对key=value,我们把它称之为dubbo的扩展点

1
2
optimusPrime = org.apache.spi.OptimusPrime 
bumblebee = org.apache.spi.Bumblebee

3、通过 ExtensionLoader,我们可以加载指定的实现类,下面来演示Dubbo SPI :

1
2
3
4
5
6
7
8
9
10
11
12
public class DubboSPITest {
@Test
public void sayHello() throws Exception {
//1、获得接口的ExtentionLoader
ExtensionLoader<Robot> extensionLoader = ExtensionLoader.getExtensionLoader(Robot.class);
//2、根据指定的名字获(key)取对应的实例
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();
Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}

这样就可以通过kv结构按需获取了,而这里的按需是可以通过配置文件来调整的!就相当灵活而且不用加载所有类了。

测试结果如下:

image-20220117203137972

Dubbo SPI优点:

  1. 能够实现按需加载,JDK SPI仅仅通过接口类名获取所有实现,在通过迭代器获取指定实现,而ExtensionLoader则通过接口类名和key值获取一个实现

  2. 支持AOP(将实现类包装在Wrapper中,Wrapper中实现公共增强逻辑)

  3. 支持IOC(能够通过set方法注入其他扩展点)

IOC 和 AOP 等特性,这些特性将会在接下来的源码分析章节中一一进行介绍。

源码分析

上一章简单演示了 Dubbo SPI 的使用方法,首先通过 ExtensionLoader 的getExtensionLoader 方法获取一个ExtensionLoader 实例,然后再通过ExtensionLoader 的 getExtension 方法获取拓展类对象。下面我们从ExtensionLoader 的 getExtension 方法作为入口,对拓展类对象的获取过程进行详细的分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
// 获取默认的拓展实现类
return getDefaultExtension();
}
// Holder,顾名思义,用于持有目标对象
Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
// 双重检查
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 创建拓展实例
instance = createExtension(name);
// 设置实例到
holder 中 holder.set(instance);
}
}
} return (T) instance;
}

上面代码的逻辑比较简单,首先检查缓存,缓存未命中则创建拓展对象。下面我们来看一下创建拓展对象的过程是怎样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private T createExtension(String name) {
// 从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置 类”的映射关系表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建实例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向实例中注入依赖
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// 循环创建 Wrapper 实例
for (Class<?> wrapperClass : wrapperClasses) {
// 将当前 instance 作为参数传给 Wrapper 的构 造方法,并通过反射创建 Wrapper 实例。
// 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("...");
}
}

createExtension 方法的逻辑稍复杂一下,包含了如下的步骤:

  1. 通过 getExtensionClasses 获取所有的拓展类

  2. 通过反射创建拓展对象

  3. 向拓展对象中注入依赖

  4. 将拓展对象包裹在相应的 Wrapper 对象中

以上步骤中,第一个步骤是加载拓展类的关键,第三和第四个步骤是Dubbo IOC 与 AOP 的具体实现。由于此类设计源码较多,这里简单的总结下ExtensionLoader整个执行逻辑:

1
2
3
4
5
6
7
8
getExtension(String name) #根据key获取拓展对象
-->createExtension(String name) #创建拓展实例
-->getExtensionClasses #根据路径获取所有的拓展类
-->loadExtensionClasses #加载拓展类
-->cacheDefaultExtensionName #解析@SPI注解
-->loadDirectory #方法加载指定文件夹配置文件
->loadResource #加载资源
-->loadClass #加载类,并通过 loadClass 方法对类进行缓存

spi中的ioc 和 aop

依赖注入

Dubbo IOC 是通过 setter 方法注入依赖。Dubbo 首先会通过反射获取到实例的所有方法,然后再遍历方法列表,检测方法名是否具有 setter 方法特征。若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。整个过程对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
// 遍历目标类的所有方法
for (Method method : instance.getClass().getMethods()) {
// 检测方法是否以 set 开头,且方法仅有一个参 数,且方法访问级别为public
if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) {
// 获取 setter 方法参数类型
Class<?> pt = method.getParameterTypes()[0];
try {
// 获取属性名,比如 setName 方法对应属性名 name
String property = getSetterProperty(method);
/* getSetterProperty : method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; */
// 从 ObjectFactory 中获取依赖对象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
// 通过反射调用 setter 方法设置依赖
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method...");
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} return instance;
}

在上面代码中,objectFactory 变量的类型为 AdaptiveExtensionFactory,AdaptiveExtensionFactory 内部维护了一个 ExtensionFactory 列表,用于存储其他类型的 ExtensionFactory。Dubbo 目前提供了两种 ExtensionFactory,分别是 SpiExtensionFactory 和 SpringExtensionFactory。前者用于创建自适应的拓展,后者是用于从 Spring 的 IOC 容器中获取所需的拓展。这两个类的类的代码不是很复杂,这里就不一一分析了。

Dubbo IOC 目前仅支持 setter 方式注入,总的来说,逻辑比较简单易懂。

动态编译

在用Spring的时候,我们经常会用到AOP功能。在目标类的方法前后插入其他逻辑。比如通常使用Spring AOP来实现日志,监控和鉴权等功能。 Dubbo的扩展机制,是否也支持类似的功能呢?答案是yes。在Dubbo中,有一种特殊的类,被称为Wrapper类。通过装饰者模式,使用包装类包装原始的扩展点实例。在原始扩展点实现前后插入其他逻辑,实现AOP功能。

装饰者模式

装饰者模式:在不改变原类文件以及不使用继承的情况下,动态地将责任附加到对象上,从而实现动态拓展一个对象的功能。它是通过创建一个包装对象,也就是装饰来包裹真实的对象。

image-20220117203847829

一般来说装饰者模式有下面几个参与者:

  • Component:装饰者和被装饰者共同的父类,是一个接口或者抽象类,用来定义基本行为
  • ConcreteComponent:定义具体对象,即被装饰者
  • Decorator:抽象装饰者,继承自Component,从外类来扩展
  • ConcreteComponent。对于ConcreteComponent来说,不需要知道
  • Decorator的存在,Decorator是一个接口或抽象类
  • ConcreteDecorator:具体装饰者,用于扩展ConcreteComponent

注:装饰者和被装饰者对象有相同的超类型,因为装饰者和被装饰者必须是一样的类型,这里利用继承是为了达到类型匹配,而不是利用继承获得行为。

dubbo中的AOP

Dubbo AOP 是通过装饰者模式完成的,接下来通过一个简单的案例来学习dubbo中AOP的实现方式。

首先定义一个接口

1
2
3
4
5
6
7
package com.itheima.dubbo;
import org.apache.dubbo.common.extension.SPI;

@SPI
public interface Phone {
void call();
}

定义接口的实现类,也就是被装饰者

1
2
3
4
5
6
7
8
package com.itheima.dubbo;

public class IphoneX implements Phone {
@Override
public void call() {
System.out.println("iphone正在拨打电话");
}
}

为了简单,这里省略了装饰者接口。仅仅定义一个装饰者,实现phone接口,内部配置增强逻辑方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.itheima.dubbo;

public class MusicPhone implements Phone {
private Phone phone;

public MusicPhone(Phone phone) {
this.phone = phone;
}

@Override
public void call() {
System.out.println("播放彩铃");
this.phone.call();
}
}

添加拓展点配置文件META-INF/dubbo/com.itheima.dubbo.Phone,内容如下

1
2
iphone=com.itheima.dubbo.IphoneX 
wrapper=com.itheima.dubbo.MusicPhone

配置测试方法

1
2
3
4
5
public static void main(String[] args) {
ExtensionLoader<Phone> extensionLoader = ExtensionLoader.getExtensionLoader(Phone.class);
Phone phone = extensionLoader.getExtension("iphone");
phone.call();
}

具体执行效果如下

image-20220117204049952

先调用装饰者增强,再调用目标方法完成业务逻辑。

通过测试案例,可以看到在Dubbo SPI中具有增强AOP的功能,我们只需要关注dubbo源码中这样一行代码就够了

1
2
3
4
5
6
//检查是否具有装饰者类,如果有调用装饰者类的构造方法,并返回实例对 象
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}

动态编译

SPI中的自适应

我们知道在 Dubbo 中,很多拓展都是通过 SPI 机制 进行加载的,比如Protocol、Cluster、LoadBalance、ProxyFactory 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载,即根据参数动态加载实现类。如下所示:

根据参数动态选择的意思是:

通过ExtensionLoader.getExtensionLoader(XXXClass).getExtension(key)的形式来获取接口的某个实现类。

​ 但这种形式本质上还是通过硬编码的形式在代码中固定的获取了接口的一个实现,诸如Protocol(实现有Dubbo、Redis、Thrift等),或者Transporter(实现有Netty、Mina等)这些接口,我们是可以在Dubbo服务声明时指定具体实现的

image-20220117204219021

这种在运行时,根据方法参数才动态决定使用具体的拓展,在dubbo中就叫做扩展点自适应实例。其实是一个扩展点的代理,将扩展的选择从Dubbo启动时,延迟到RPC调用时。Dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。

自适应拓展机制的实现逻辑是这样的

  1. 首先 Dubbo 会为拓展接口生成具有代理功能的代码;

  2. 通过 javassist 或 jdk 编译这段代码,得到 Class 类;

  3. 通过反射创建代理类;

  4. 在代理类中,通过URL对象的参数来确定到底调用哪个实现类;

javassist入门

Javassist是一个开源的分析、编辑和创建Java字节码的类库。是由东京工业大学的数学和计算机科学系的 Shigeru Chiba (千叶滋)所创建的。它已加入了开放源代码JBoss 应用服务器项目,通过使用Javassist对字节码操作为JBoss实现动态AOP框架。javassist是jboss的一个子项目,其主要的优点,在于简单,而且快速。直接使用java编码的形式,而不需要了解虚拟机指令,就能动态改变类的结构,或者动态生成类。为了方便更好的理解dubbo中的自适应,这里通过案例的形式来熟悉下Javassist的基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.itheima.compiler;
import java.io.File;
import java.io.FileOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtConstructor;
import javassist.CtField;
import javassist.CtMethod;
import javassist.CtNewMethod;

/*** Javassist是一个开源的分析、编辑和创建Java字节码的类库 * 能动态改变类的结构,或者动态生成类 */
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool:class对象容器
ClassPool pool = ClassPool.getDefault();
// 通过ClassPool生成一个User类
CtClass ctClass = pool.makeClass("com.itheima.domain.User");
// 添加属性 -- private String username
CtField enameField = new CtField(pool.getCtClass("java.lang.String"), "username", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// 添加属性 -- private int age
CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);

//添加方法
ctClass.addMethod(CtNewMethod.getter("getUsername", enameField));
ctClass.addMethod(CtNewMethod.setter("setUsername", enameField));
ctClass.addMethod(CtNewMethod.getter("getAge", enoField));
ctClass.addMethod(CtNewMethod.setter("setAge", enoField));

// 无参构造器
CtConstructor constructor = new CtConstructor(null, ctClass);
constructor.setBody("{}");
ctClass.addConstructor(constructor);

// 添加构造函数

//ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass));
CtConstructor ctConstructor = new CtConstructor(new CtClass[]{pool.get(String.class.getName()), CtClass.intType}, ctClass);
ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}");
ctClass.addConstructor(ctConstructor);

// 添加自定义方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser", new CtClass[]{}, ctClass);

// 为自定义方法设置修饰符
ctMethod.setModifiers(Modifier.PUBLIC);

// 为自定义方法设置函数体
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"用户信息如 下\");\n").append("System.out.println(\"用户名 =\"+username);\n").append("System.out.println(\"年龄 =\"+age);\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//生成一个class
Class<?> clazz = ctClass.toClass();
Constructor cons2 = clazz.getDeclaredConstructor(String.class, Integer.TYPE);
Object obj = cons2.newInstance("itheima", 20);
//反射 执行方法
obj.getClass().getMethod("printUser", new Class[]{}).invoke(obj, new Object[]{});
// 把生成的class文件写入文件
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("C://User.class"));
fos.write(byteArr);
fos.close();
}
}

通过以上代码,我们可以知道使用javassist可以方便的在运行时,按需动态的创建java对象,并执行内部方法。而这也是dubbo中动态编译的核心

源码分析

Adaptive注解

在开始之前,我们有必要先看一下与自适应拓展息息相关的一个注解,即Adaptive 注解。

1
2
3
4
5
6
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}

从上面的代码中可知,Adaptive 可注解在类或方法上。

  • 标注在类上:Dubbo 不会为该类生成代理类。Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。更多时候,Adaptive 是注解在接口方法上的,表示拓展的加载逻辑需由框架自动生成
  • 标注在方法上:Dubbo 则会为该方法生成代理逻辑,表示当前方法需要根据 参数URL 调用对应的扩展点实现。例如 Protocol的SPI类有injvm dubbo registry filter listener等等 很多扩展未知类,它设计了Protocol$Adaptive的类,通过ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(spi类);来提取对象

获取自适应拓展类

dubbo中每一个扩展点都有一个自适应类,如果没有显式提供,Dubbo会自动为我们创建一个,默认使用Javaassist。 先来看下创建自适应扩展类的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 精简代码如下
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
}
}
}
return (T) instance;
}

继续看createAdaptiveExtension方法

1
2
3
private T createAdaptiveExtension() {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
}

继续看getAdaptiveExtensionClass方法

1
2
3
4
5
6
7
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

继续看createAdaptiveExtensionClass方法,绕了一大圈,终于来到了具体的实现了。看这个createAdaptiveExtensionClass方法,它首先会生成自适应类的Java源码,然后再将源码编译成Java的字节码,加载到JVM中。

1
2
3
4
5
6
7
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

Compiler的代码,默认实现是javassist。

1
2
3
4
@SPI("javassist")
public interface Compiler {
Class<?> compile(String code, ClassLoader classLoader);
}

createAdaptiveExtensionClassCode()方法中使用一个StringBuilder来构建自适应类的Java源码。方法实现比较长,这里就不贴代码了。这种生成字节码的方式也挺有意思的,先生成Java源代码,然后编译,加载到jvm中。通过这种方式,可以更好的控制生成的Java类。而且这样也不用care各个字节码生成框架的api等。因为xxx.java文件是Java通用的,也是我们最熟悉的。只是代码的可读性不强,需要一点一点构建xx.java的内容。

示例:以 Protocol 接口为例,Protocol接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

void destroy();

default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}

下面给大家展示一下生成的 Protocol$Adaptive 的源码,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URLurl = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failedtogetextension(org.apache.dubbo.rpc.Protocol) namefromurl(" + url.toString() + ")usekeys([protocol])");
org.apache.dubbo.rpc.Protocolextension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}

public java.util.List getServers() {
throw new UnsupportedOperationException("The methodpublicdefaultjava.util.List org.apache.dubbo.rpc.Protocol.getServers()ofinterface org.apache.dubbo.rpc.Protocolisnotadaptivemethod !");
}

public org.apache.dubbo.rpc.Exporter
export(org.apache.dubbo.rpc.Invokerarg0) throws
org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argumentgetUrl() == null");
org.apache.dubbo.common.URLurl = arg0.getUrl();
String extName = (url.getProtocol() == null ?
"dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failedtogetextension(org.apache.dubbo.rpc.Protocol) namefromurl(" + url.toString() + ")usekeys([protocol])");
org.apache.dubbo.rpc.Protocolextension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

public void destroy() {
throw new UnsupportedOperationException("The methodpublicabstractvoid org.apache.dubbo.rpc.Protocol.destroy()ofinterface org.apache.dubbo.rpc.Protocolisnotadaptivemethod !");
}

public int getDefaultPort() {
throw new UnsupportedOperationException("The methodpublicabstractint org.apache.dubbo.rpc.Protocol.getDefaultPort()of interfaceorg.apache.dubbo.rpc.Protocolisnotadaptive method !");
}
}

以调用 Protocol 的接口的 refer方法为例,下面给大家看一下自适应拓展的整个过程:

1)先通过 Protocol REF_PROTOCOL =ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 生成了 Protocol$Adaptive 代理类

2)传参 Url 为:dubbo://192.168.1.247:20887/org.apache.dubbo.config.spring.api.DemoService,调用 Protocol 的 refer 方法,此时直接调用是 Protocol$Adaptive 代理类的 refer 方法

3)在 Protocol$Adaptive 的 refer 方法中先调用 url 中的 getProtocol()方法获取拓展类名称,赋值给 extName 变量

4)然后调用 org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); 语句获取到具体的实现类的实例

5)最后执行 extension.refer(arg0, arg1) 语句,调用 4 中获取到的具体实现类的 refer 方法,最终返回结果

服务暴露与发现

概述

dubbo是一个简单易用的RPC框架,通过简单的提供者,消费者配置就能完成无感的网络调用。那么在dubbo中是如何将提供者的服务暴露出去,消费者又是如何获取到提供者相关信息的呢?这就是本章我们要讨论的内容。

Spring 中自定义Schema

在了解dubbo的服务注册和服务发现之前,我们首先需要掌握一个知识点:Spring中自定义Schema。

dubbo Provider 在容器启动后开始暴露服务,并准备接受请求处理。所以可以理解为容器的启动带动dubbo Provider开始工作。

Dubbo 现在的设计是完全无侵入,也就是使用者只依赖于配置契约。在Dubbo 中,可以使用 XML 配置相关信息来引入服务或者导出服务(也可以使用注解)。配置完成,启动工程,Spring 会读取配置文件,生成注入相关Bean。那 Dubbo 如何实现自定义 XML 被 Spring 加载读取呢?(涉及到dubbo和spring的集成)

从 Spring 2.0 开始,Spring 开始提供了一种基于 XML Schema 格式扩展机制,用于自定义和配置 bean,从而被spring框架所解析。

案例使用

学习和使用Spring XML Schema 扩展机制并不难,需要下面几个步骤:

  1. 创建配置属性的JavaBean对象
  2. 创建一个 XML Schema 文件,描述自定义的合法构建模块,也就是xsd文件。
  3. 自定义处理器类,并实现 NamespaceHandler 接口。
  4. 自定义解析器,实现 BeanDefinitionParser 接口(最关键的部分)。
  5. 编写Spring.handlers和spring.schemas文件配置所有部件
    定义JavaBean对象,在spring中此对象会根据配置自动创建
1
2
3
4
5
6
public class User { 
private String id;
private String name;
private Integer age;
//省略getter setter方法
}

在META-INF下定义 user.xsd 文件,使用xsd用于描述标签的规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="UTF-8"?>  
<xsd:schema
xmlns="http://www.itheima.com/schema/user"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans"
targetNamespace="http://www.itheima.com/schema/user"
elementFormDefault="qualified"
attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/beans" />
<xsd:element name="user">
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="beans:identifiedType">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="age" type="xsd:int" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
</xsd:schema>

Spring读取xml文件时,会根据标签的命名空间找到其对应的NamespaceHandler,我们在NamespaceHandler内会注册标签对应的解析器BeanDefinitionParser。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.itheima.schema;


import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class UserNamespaceHandler extends NamespaceHandlerSupport {

public void init() {
/***
* user.xsd文件中 name="user"
* 解析user节点
*/
registerBeanDefinitionParser("user", new UserBeanDefinitionParser());
}
}

BeanDefinitionParser是标签对应的解析器,Spring读取到对应标签时会使用该类进行解析;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.itheima.schema;

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;

public class UserBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {

protected Class getBeanClass(Element element) {
return User.class;
}

protected void doParse(Element element, BeanDefinitionBuilder bean) {
// element 在配置文件中配置的 <itheima:user id="user" name="zhangsan" age="12"></itheima:user>
String name = element.getAttribute("name");
String age = element.getAttribute("age");
String id = element.getAttribute("id");

if (StringUtils.hasText(id)) {
bean.addPropertyValue("id", id);
}
if (StringUtils.hasText(name)) {
bean.addPropertyValue("name", name);
}
if (StringUtils.hasText(age)) {
bean.addPropertyValue("age", Integer.valueOf(age));
}
}
}

定义spring.handlers文件,内部保存命名空间与NamespaceHandler类的对应关系;必须放在classpath下的META-INF文件夹中。

1
http\://www.itheima.com/schema/user=com.itheima.schema.UserNamespaceHandler

定义spring.schemas文件,内部保存命名空间对应的xsd文件位置;必须放在classpath下的META-INF文件夹中。

1
http\://www.itheima.com/schema/user.xsd=META-INF/user.xsd

代码准备好了之后,就可以在spring工程中进行使用和测试,定义spring配置文件,导入对应约束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:itheima="http://www.itheima.com/schema/user"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.itheima.com/schema/user http://www.itheima.com/schema/user.xsd">

<!-- 创建了一个对象,并交给SpringIOC容器管理-->
<itheima:user id="user" name="zhangsan" age="12"></itheima:user>

</beans>

编写测试类,通过spring容器获取对象user

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.itheima.schema.test;

import com.itheima.schema.User;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SchemaDemoTest {

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/applicationContext.xml");
User user = (User)ctx.getBean("user");
System.out.println(user);

//for (String beanDefinitionName : ctx.getBeanDefinitionNames()) {
// System.out.println(beanDefinitionName);
//}
}
}

dubbo中的相关对象

Dubbo是运行在spring容器中,dubbo的配置文件也是通过spring的配置文件applicationContext.xml来加载,所以dubbo的自定义配置标签实现,其实同样依赖spring的xml schema机制

1、在 dubbo-demo-xml 模块中可以查看dubbo如何在spring的配置文件中进行配置

image-20220416085047564

2、spring如何来解析这些配置并注册对应的bean呢?通过前面的知识我们知道了是通过spring 的 Schema机制

找到 dubbo-config/dubbo-config-spring 模块,

image-20220416085101716

核心在 DubboNamespaceHandler 中

image-20220416085112875

image-20220416085119009

可以看出Dubbo所有的组件都是由 DubboBeanDefinitionParser 解析,

并通过registerBeanDefinitionParser方法来注册到spring中最后解析对应的对象。这些对象中我们重点关注的有以下两个:

  • ServiceBean:服务提供者暴露服务的核心对象
  • ReferenceBean:服务消费者发现服务的核心对象
  • RegistryConfig:定义注册中心的核心配置对象

image-20230922105958364

服务暴露机制

前面主要探讨了 Dubbo 中 schema 、 XML 的相关原理 , 这些内容对理解框架整体至关重要 , 在此基础上我们继续探讨服务是如何依靠前面的配置进行服务暴露

术语解释

在 Dubbo 的核心领域模型中:

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用。

由于 Invoker 是 Dubbo 领域模型中非常重要的一个概念,很多设计思路都是向它靠拢。这就使得 Invoker 渗透在整个实现代码里,对于刚开始接触 Dubbo 的人,确实容易给搞混了。 下面我们用一个精简的图来说明最重要的两种 Invoker :服务提供 Invoker 和服务消费Invoker

image-20220416085439978

Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责Invoker 的生命周期管理。

  • export:暴露远程服务
  • refer:引用远程服务

proxyFactory:获取一个接口的代理类

  • getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象
  • getProxy:针对client端,创建接口的代理对象,例如DemoService接口的代理实现。

Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等

流程机制

在详细探讨服务暴露细节之前 , 我们先看一下整体dubbo的服务暴露原理

image-20220416085630547

在整体上看,Dubbo 框架做服务暴露分为两大部分 ,

1、第一步将持有的服务实例通过代理转换成 Invoker,

2、第二步会把 Invoker 通过具体的协议 ( 比如 Dubbo ) 转换成Exporter, 框架做了这层抽象也大大方便了功能扩展 。

服务提供方暴露服务的蓝色初始化链,时序图如下:

image-20220416085653889

图解

ServiceConfig.doExportUrlsFor1Protocol—->RegistryProtocol .export

image-20220418092253510

RegistryProtocol .export—>DubboProtocol.export

image-20220418092333232

1、DubboProtocol.export—->HeaderExchanger.bind

image-20220418092349053

HeaderExchanger.bind—->NettyTransporter.bind

image-20220418092426348

NettyTransporter.bind—->NettyServer.doOpen

image-20220418092452768

2、RegistryProtocol.register—>FailbackRegistry.register—>ZookeeperRegistry.doRegister

image-20220418092516302

源码分析

(1) 导出入口

服务导出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作。方法代码如下:

1
2
3
4
5
6
7
public void onApplicationEvent(ContextRefreshedEvent event) { 
// 是否有延迟导出 && 是否已导出 && 是不是已被取消导出
if (isDelay() && !isExported() && !isUnexported()) {
// 导出服务
export();
}
}

onApplicationEvent 方法在经过一些判断后,会决定是否调用 export 方法导出服务。在export 根据配置执行相应的动作。最终进入到ServiceConfig.doExportUrls 导出服务方法

1
2
3
4
5
6
7
8
9
10
11
private void doExportUrls() { 
// 加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
// 遍历 protocols,并在每个协议下导出服务
for (ProtocolConfig protocolConfig : protocols) {
.....
//核心,其他略
doExportUrlsFor1Protocol(protocolConfig,
registryURLs);
}
}

关于多协议多注册中心导出服务首先是根据配置,以及其他一些信息组装URL。前面说过,URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//服务暴露
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//协议名称
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
//装配配置参数信息--------------start--------------
// 添加 side、版本、时间戳以及进程号等信息到 map 中
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);

// 通过反射将对象的字段信息添加到 map 中
appendParameters(map, metrics);
appendParameters(map, application); //<dubbo:application 标签配置
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, provider); // <dubbo:provider 标签配置
appendParameters(map, protocolConfig); //装配协议配置信息
appendParameters(map, this);

// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}

}
}
} // end of methods for
}

// 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}

//装配配置参数信息--------------end--------------


// export service 获取 host 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 组装 URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

// SPI 自适应加载
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
/*<dubbo:provider export="true" scope="remote" />*/
String scope = url.getParameter(SCOPE_KEY);//服务导出范围
// don't export when none is configured 如果 scope = none,则什么都不做
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

// export to local if the config is not remote (export to remote only when config is remote) cope != remote,导出到本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
/**
* 导出服务到本地,injvm
*/
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local) scope != local,导出到远程
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) { //有注册中心的配置信息
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
/**
* Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,
* 它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现
*
* 在服务提供方,Invoker用于调用服务提供类。在服务消费方,Invoker用于执行远程调用
*
* Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory
*/
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

/**
* 导出服务,并生成 Exporter 与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程
*
* debug此处查看应该走哪个protocol的export
* 走的是 RegistryProtocol
*/
Exporter<?> exporter = protocol.export(wrapperInvoker);
/**
* 将服务暴露返回的Exporter实例存储到ServiceConfig的 List<Exporter<?>> exporters 中
*/
exporters.add(exporter);
}
} else { //无注册中心的配置信息 仅导出服务 不注册
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}

上面的代码首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。前置工作做完,接下来就可以进行服务导出了。服务导出分为导出到本地 (JVM),和导出到远程。在深入分析服务导出的源码前,我们先来从宏观层面上看一下服务导出逻辑。如下:

上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:

  • scope = none,不导出服务
  • scope != remote,导出到本地
  • scope != local,导出到远程

不管是导出到本地,还是远程。进行服务导出之前,均需要先创建Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索Invoker 的创建过程。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 为目标类创建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

如上,JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。

(2) 导出服务到本地

Invoke创建成功之后,接下来我们来看本地导出

1
2
3
4
5
6
7
8
9
10
11
12
13
private void exportLocal(URL url) { 
// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再 次导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 创建 Invoker,并导出服务,这里的 protocol 会在运行 时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}

exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面我们来看一下 InjvmProtocol 的 export 方法都做了哪些事情。

1
2
3
4
5
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 
// 创建 InjvmExporter
return new InjvmExporter<T>(invoker,
invoker.getUrl().getServiceKey(), exporterMap);
}

如上,InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter,无其他逻辑。到此导出服务到本地就分析完了。

(3) 导出服务到远程(重点)

接下来,我们继续分析导出服务到远程的过程。导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑。我们把目光移动到RegistryProtocol 的 export 方法上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
// 1. 导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 2. 获取注册中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
// 3. 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 4. 获取已注册的服务提供者 URL
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 5. 获取 register 参数
boolean register = registeredProviderUrl.getParameter("register", true);
// 6. 向服务提供者与消费者注册表中注册服务提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
//7. 根据 register 的值决定是否注册服务
if (register) {
// 向注册中心注册服务(dubbo协议, zookeeper)
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 8. 获取订阅 URL,比如:
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 9. 创建监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 10. 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 11. 创建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl,
registeredProviderUrl);
}

上面代码看起来比较复杂,主要做如下一些操作:

  1. 调用 doLocalExport 导出服务

  2. 向注册中心注册服务

  3. 向注册中心进行订阅 override 数据

  4. 创建并返回 DestroyableExporter

下面先来分析 doLocalExport 方法的逻辑,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { 
String key = getCacheKey(originInvoker);
// 1. 访问缓存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 创建 Invoker 为委托类对象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 调用 protocol 的 export 方法导出服务
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)
protocol.export(invokerDelegete), originInvoker);
// 写缓存
bounds.put(key, exporter);
}
}
}
return exporter;
}

接下来,我们把重点放在 Protocol 的 export 方法上。假设运行时协议为dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用DubboProtocol 的 export 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 
URL url = invoker.getUrl();
// 1. 获取服务标识,理解成服务坐标也行。由服务组名,服务名, 服务版本号以及端口组成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 创建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T> (invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
//省略无关代码
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}

(4) 开启Netty服务

如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。下面分析 openServer 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void openServer(URL url) { 
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前 的服务器实例
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 创建服务器实例
serverMap.put(key, createServer(url));
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}

接下来分析服务器实例的创建过程。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private ExchangeServer createServer(URL url) { 
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
// 1. 添加心跳检测配置到 url 中
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 2. 获取 server 参数,默认为 netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加编码解码器参数
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server...");
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes =
ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type...");
}
}
return server;
}

如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。

第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操作所对应的代码比较直白了,无需多说。但创建服务器的操作目前还不是很清晰,我们继续往下看。

1
2
3
4
5
6
7
8
9
10
11
12
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}

上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind方法。

1
2
3
4
5
6
7
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分 别如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心Transporters 的 bind 方法逻辑即可。该方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, handler);
}

如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。

TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的Transporter,默认为 NettyTransporter

调用 NettyTransporter.bind(URL, ChannelHandler) 方法。创建一个NettyServer 实例。调用 NettyServer.doOPen() 方法,服务器被开启,服务也被暴露出来了。

(5) 服务注册

本节内容以 Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册的入口方法开始分析,我们把目光再次移到RegistryProtocol 的 export 方法上。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 
// ${导出服务}
// 省略其他代码
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代码
}

RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,相关代码如下:

1
2
3
4
5
6
public void register(URL registryUrl, URL registedProviderUrl) { 
// 获取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注册服务
registry.register(registedProviderUrl);
}

register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。

这里以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry方法的源码,这个方法由 AbstractRegistryFactory 实现。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Registry getRegistry(URL url) { 
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 访问缓存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 缓存未命中,创建 Registry 实例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry...");
}
// 写入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);

如上,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创 建 Registry。在此方法中就是通过 new ZookeeperRegistry(url, zookeeperTransporter) 实例化一个注册中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为
CuratorZookeeperTransporter zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}

在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper客户端的创建过程。

1
2
3
4
public ZookeeperClient connect(URL url) { 
// 创建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}

继续向下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { 
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest",
authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
//省略无关代码
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e); } } }

CuratorZookeeperClient 构造方法主要用于创建和启动CuratorFramework 实例。至此Zookeeper客户端就已经启动了

下面我们将 Dubbo 的 demo 跑起来,然后通过 Zookeeper 可视化客户端ZooInspector 查看节点数据。如下:

image-20220416092126061

从上图中可以看到DemoService 这个服务对应的配置信息最终被注册到了zookeeper节点下。搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。

1
2
3
4
5
6
7
8
9
10
11
protected void doRegister(URL url) { 
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}

如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void create(String path, boolean ephemeral) { 
if (!ephemeral) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上一级路径
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}

好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。

总结

  1. 在有注册中心,需要注册提供者地址的情况下,ServiceConfig 解析出的 URL 格式为: registry:// registry- host/org.apache.dubbo.registry.RegistryService?export=URL.encode(“dubbo://service-host/{服务名}/{版本号}”)

  2. 基于 Dubbo SPI 的自适应机制,通过 URL registry:// 协议头识别,就调用 RegistryProtocol#export() 方法

    1. 将具体的服务类名,比如 DubboServiceRegistryImpl ,通过 ProxyFactory 包装成 Invoker 实例
    1. 调用 doLocalExport 方法,使用 DubboProtocol 将 Invoker转化为 Exporter 实例,并打开 Netty 服务端监听客户请求
    1. 创建 Registry 实例,连接 Zookeeper,并在服务节点下写入提供者的 URL 地址,注册服务
    1. 向注册中心订阅 override 数据,并返回一个 Exporter 实例
  1. 根据 URL 格式中的 “dubbo://service-host/{服务名}/{版本号}” 中协议头 dubbo:// 识别,调用 DubboProtocol#export() 方法,开发服务端口

  2. RegistryProtocol#export() 返回的 Exporter 实例存放到ServiceConfig 的 List<Exporter> exporters 中

服务发现

在学习了服务暴露原理之后 , 接下来重点探讨服务是如何消费的 。 这里主要讲解如何通过注册中心进行服务发现进行远程服务调用等细节 。

服务发现流程

在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务消费原理

image-20220418081038913

在整体上看 , Dubbo 框架做服务消费也分为两大部分 , 第一步通过持有远程服务实例生成Invoker, 这个 Invoker 在客户端是核心的远程代理对象 。 第二步会把 Invoker 通过动态代理转换成实现用户接口的动态代理引用 。服务消费方引用服务的蓝色初始化链,时序图如下:

image-20220418081057003

图解

ReferenceBean.getObject—>RegistryRrotocol.refer

image-20220418092555248

1、引用服务实例得到Invoker,RegistryRrotocol.refer

image-20220418092618364

RegistryDirectory.notify—>DubboProtocol.protocolBindingRefer

image-20220418092637382

HeaderExchanger.connect—>

image-20220418092651921

HeaderExchanger.connect—>NettyClient.doOpen&.doConnect

image-20220418092704175

2、根据invoker创建代理

image-20220418092723779

源码分析

(1) 引用入口
服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
public Object getObject() throws Exception { 
return get();
}
public synchronized T get() {
// 检测 ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// init 方法主要用于处理配置,以及调用 createProxy 生
成代理类
init();
}
return ref;
}

Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。

1
2
3
4
private void init() { 
// 创建代理类
ref = createProxy(map);
}

此方法代码很长,主要完成的配置加载,检查,以及创建引用的代理对象。这里要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。

1

上面代码很多,不过逻辑比较清晰。

1、如果是本地调用,直接jvm 协议从内存中获取实例

2、如果只有一个注册中心,直接通过 Protocol 自适应拓展类构建 Invoker实例接口

3、如果有多个注册中心,此时先根据 url 构建 Invoker。然后再通过Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类

(2) 创建客户端
在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,这里分析DubboProtocol
1、在 AbstractProtocol.refer 调用 protocolBindingRefer

1
2
3
4
@Override 
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<> (protocolBindingRefer(type, url));
}

查看 DubboInvoker 中的 refer 方法

1
2
3
4
5
6
7
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { 
optimizeSerialization(url);
// 创建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T> (serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

上面方法看起来比较简单,创建一个DubboInvoker。通过构造方法传入远程调用的client对象。默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private ExchangeClient[] getClients(URL url) { 
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,则共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}

这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ExchangeClient initClient(URL url) { 
// 获取客户端类型,默认为 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//省略无关代码
ExchangeClient client;
try {
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service...");
}
return client;
}

initClient 方法首先获取用户配置的客户端类型,默认为 netty。下面我们分析一下 Exchangers 的 connect 方法。

1
2
3
4
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}

如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现

1
2
3
4
5
6
7
8
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { 
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { 
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 数量大于1,则创建一个 ChannelHandler分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取 Transporter 自适应拓展类,并调用 connect 方法生成Client 实例
return getTransporter().connect(url, handler);
}

如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:

1
2
3
4
public Client connect(URL url, ChannelHandler listener) throws RemotingException { 
// 创建 NettyClient 对象
return new NettyClient(url, listener);
}

(3) 注册
这里就已经创建好了NettyClient对象。关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 
// 取 registry 参数值,并将其设置为协议头
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY,
Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 获取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用
doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}

上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然

后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的

重点是 doRefer 方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, 1type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下新节点
if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。

(4)创建代理对象
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的getProxy,接下来进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public <T> T getProxy(Invoker<T> invoker) throws RpcException { 
// 调用重载方法
return getProxy(invoker, false);
}

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 获取接口列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 设置服务接口类和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 加载接口类
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
}
// 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 创建新的 interfaces 数组
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 设置 GenericService.class 到数组中
interfaces[len] = GenericService.class;
}
// 调用重载方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。

getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到JavassistProxyFactory 类中看一下该方法的实现代码。

1
2
3
4
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { 
// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。下面以org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.apache.dubbo.common.bytecode; 
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}

总结

好了,到这里代理类生成逻辑就分析完了。整个过程比较复杂,大家需要耐心看一下。

  1. 从注册中心发现引用服务:在有注册中心,通过注册中心发现提供者地址的情况下,ReferenceConfig 解析出的 URL 格式为:

registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode(“conumer-host/com.foo.FooService?version=1.0.0”) 。

  1. 通过 URL 的registry://协议头识别,就会调用RegistryProtocol#refer()方法

  2. 查询提供者 URL,如 dubbo://service-host/com.foo.FooService?version=1.0.0 ,来获取注册中心

  3. 创建一个 RegistryDirectory 实例并设置注册中心和协议

  4. 生成 conusmer 连接,在 consumer 目录下创建节点,向注册中心注册

  5. 注册完毕后,订阅 providers,configurators,routers 等节点的数据

  6. 通过 URL 的 dubbo:// 协议头识别,调用DubboProtocol#refer() 方法,创建一个 ExchangeClient

客户端并返回 DubboInvoker 实例

  1. 由于一个服务可能会部署在多台服务器上,这样就会在 providers 产生多个节点,这样也就会得到多个 DubboInvoker 实例,就需要RegistryProtocol 调用 Cluster 将多个服务提供者节点伪装成一个节点,并返回一个 Invoker

  2. Invoker 创建完毕后,调用 ProxyFactory 为服务接口生成代理对象,返回提供者引用

Dubbo高可用集群

服务集群的概述

概述

为了避免单点故障,现在的应用通常至少会部署在两台服务器上,这样就组成了集群。集群就是单机的多实例,在多个服务器上部署多个服务,每个服务就是一个节点,部署N个节点,处理业务的能力就提升 N倍(大约),这些节点的集合就叫做集群。

image-20220418133007571

调用过程

在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance等。

image-20220418133029754

集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的merge 操作。第二个阶段是在服务消费者进行远程调用时。以FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

组件介绍

Directory:它代表多个Invoker,从methodInvokerMap提取,但是他的值是动态,例如注册中心的变更。
Router:负责从多个Invoker中按路由规则选出子集,例如应用隔离或读写分离或灰度发布等等
Cluster:将Directory中的多个Invoker伪装成一个Invoker,来容错,调用失败重试。
LoadBalance:从多个Invoker选取一个做本次调用,具体包含很多种负载均衡算法。
Invoker:Provider中的一个可调用接口。例如DemoService

集群容错机制

在分布式系统中,集群某个某些节点出现问题是大概率事件,因此在设计分布式RPC框架的过程中,必须要把失败作为设计的一等公民来对待。一次调用失败之后,应该如何选择对失败的选择策略,这是一个见仁见智的问题,每种策略可能都有自己独特的应用场景。因此,作为框架来说,应当针对不同场景提供多种策略,供用户进行选择。
在Dubbo设计中,通过Cluster这个接口的抽象,把一组可供调用的Provider信息组合成为一个统一的 Invoker 供调用方进行调用。经过路由规则过滤,负载均衡选址后,选中一个具体地址进行调用,如果调用失败,则会按照集群配置的容错策略进行容错处理。

内置集群容错策略

Dubbo默认内置了若干容错策略,如果不能满足用户需求,则可以通过自定义容错策略进行配置
Dubbo主要内置了如下几种策略:

  • Failover(失败自动切换)
  • Failsafe(失败安全)
  • Failfast(快速失败)
  • Failback(失败自动恢复)
  • Forking(并行调用)
  • Broadcast(广播调用)

这些名称比较相似,概念也比较容易混淆,下面逐一进行解释。

  1. Failover(失败自动切换) Failover 是高可用系统中的一个常用概念,服务器通常拥有主备两套机器配置,如果主服务器出现故障,则自动切换到备服务器中,从而保证了整体的高可用性。

Dubbo也借鉴了这个思想,并且把它作为Dubbo 默认的容错策略 。当调用出现失败的时候,根据配置的重试次数,会自动从其他可用地址中重新选择一个可用的地址进行调用,直到调用成功,或者是达到重试的上限位置

Dubbo里默认配置的重试次数是2,也就是说,算上第一次调用,最多会调用3次。

其配置方法,容错策略既可以在服务提供方配置,也可以服务调用方进行配置。而重试次数的配置则更为灵活,既可以在服务级别进行配置,也可以在方法级别进行配置。具体优先顺序为:

1
服务调用方方法级配置 > 服务调用方服务级配置 > 服务提供方方法级配置 > 服务提供方服务级配置

以XML方式为例,具体配置方法如下:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failover" retries="2" />

服务提供方,方法级配置

1
2
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"cluster="failover"> <dubbo:method name="sayHello" retries="2" /> 
</dubbo:service>

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failover" retries="1"/>

服务调用方,方法级配置

1
2
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failover"><dubbo:method name="sayHello" retries="3" /> 
</dubbo:reference>

Failover可以自动对失败进行重试,对调用者屏蔽了失败的细节,但是Failover策略也会带来一些副作用:

  • 重试会额外增加一下开销,例如增加资源的使用,在高负载系统下,额外的重试可能让系统雪上加霜。
  • 重试会增加调用的响应时间。
  • 某些情况下,重试甚至会造成资源的浪费。考虑一个调用场景,A->B->C,如果A处设置了超时100ms,再B->C的第一次调用完成时已经超过了100ms,但很不幸B->C失败,这时候会进行重试,但其实这时候重试已经没有意义,因此在A看来这次调用已经超时,A可能已经开始执行其他逻辑。

2. Failsafe(失败安全)

失败安全策略的核心是即使失败了也不会影响整个调用流程。通常情况下用于旁路系统或流程中,它的失败不影响核心业务的正确性。在实现上,当出现调用失败时,会忽略错误,并记录一条日志,同时返回一个空结果,在上游看来调用是成功的。

应用场景,可以用于写入审计日志等操作。

具体配置方法:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failsafe" />

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failsafe"/>

其中服务调用方配置优先于服务提供方配置。

3. Failfast(快速失败)

某些业务场景中,某些操作可能是非幂等的,如果重复发起调用,可能会导致出现脏数据等。例如调用某个服务,其中包含一个数据库的写操作,如果写操作完成,但是在发送结果给调用方的过程中出错了,那么在调用方看来这次调用失败了,但其实数据写入已经完成。这种情况下,重试可能并不是一个好策略,这时候就需要使用到 Failfast 策略,调用失败立即报错。让调用方来决定下一步的操作并保证业务的幂等性。

具体配置方法:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failfast" />

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failfast"/>

其中服务调用方配置优先于服务提供方配置。

4. Failback(失败自动恢复)

Failback 通常和 Failover 两个概念联系在一起。在高可用系统中,当主机发生故障,通过 Failover 进行主备切换后,待故障恢复后,系统应该具备自动恢复原始配置的能力。

Dubbo中的 Failback 策略中,如果调用失败,则此次失败相当于Failsafe ,将返回一个空结果。而与 Failsafe 不同的是,Failback策略会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,即使重试调用成功,原来的调用方也感知不到了。因此它通常适合于,对于实时性要求不高,且不需要返回值的一些异步操作

具体配置方法:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failback" />

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failback"/>

其中服务调用方配置优先于服务提供方配置。

按照目前的实现,Failback策略还有一些局限,例如内存中的失败调用列表没有上限,可能导致堆积(2.7.X版本默认是3次),异步重试的执行间隔无法调整,默认是5秒,可参看配置

org.apache.dubbo.rpc.cluster.Constants

5. Forking(并行调用)

上述几种策略中,主要都是针对调用失败发生后如何进行弥补的角度去考虑的,而 Forking 策略则跟上述几种策略不同,是一种典型的用成本换时间的思路。即第一次调用的时候就同时发起多个调用,只要其中一个调用成功,就认为成功。在资源充足,且对于失败的容忍度较低的场景下,可以采用此策略;

具体配置方法:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="forking" />

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="forking"/>

其中服务调用方配置优先于服务提供方配置。

6. Broadcast(广播调用)

在某些场景下,可能需要对服务的所有提供者进行操作,此时可以使用广播调用策略。此策略会逐个调用所有提供者,只要任意有一个提供者出错,则认为此次调用出错。通常用于通知所有提供者更新缓存或日志等本地资源信息

具体配置方法:

服务提供方,服务级配置

1
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="broadcast" />

服务调用方,服务级配置

1
<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="broadcast"/>

其中服务调用方配置优先于服务提供方配置。

集群容错调优

下表对各种策略做一个简单对比,

image-20220418133823910

综上我们得知,不同的容错策略往往对应不同的业务处理,这里做一个总结如下:

Failover :通常用于对调用retry不敏感的场景,如读操作;但重试会带来更长延迟

Failfast :通常用于非幂等性操作,需要快速感知失败的场景;比如新增记录

Failsafe :通常用于旁路系统,失败不影响核心流程正确性的场景;如日志记录

Failback :通常用于对于实时性要求不高,且不需要返回值的一些异步操作的场景

Forking :通常用于资源充足,且对于失败的容忍度较低,实时性要求高的读操作,但需要浪费更多服务资源

Broadcast:如通知所有提供者更新缓存或日志等本地资源信息

源码分析

我们在上一章看到了两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。

Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。

那么 Cluster 接口和相关实现类有什么用呢?用途比较简单,仅用于生成Cluster Invoker。下面我们来看一下源码。

1
2
3
4
5
6
7
8
public class FailoverCluster implements Cluster { 
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 创建并返回 FailoverClusterInvoker 对象
return new FailoverClusterInvoker<T>(directory);
}
}

如上,FailoverCluster 总共就包含这几行代码,用于创建FailoverClusterInvoker 对象,很简单。下面再看一个。

1
2
3
4
5
6
7
8
public class FailbackCluster implements Cluster { 
public final static String NAME = "failback";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 创建并返回 FailbackClusterInvoker 对象
return new FailbackClusterInvoker<T>(directory);
}
}

如上,FailbackCluster 的逻辑也是很简单,无需解释了。所以接下来,我们把重点放在各种 Cluster Invoker 上

1. AbstractClusterInvoker

我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。

前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。

第二个阶段是在服务消费者进行远程调用时,此时AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Result invoke(final Invocation invocation) throws RpcException { 
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// 绑定 attachments 到 invocation 中.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 加载 LoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getE
xtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 doInvoke 进行后续操作
return doInvoke(invocation, invokers, loadbalance);
}
// 抽象方法,由子类实现
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker 的 invoke 方法主要用于列举 Invoker,以及加载LoadBalance。最后再调用模板方法 doInvoke 进行后续操作。下面我们来看一下 Invoker 列举方法 list(Invocation) 的逻辑,如下:

1
2
3
4
5
protected List<Invoker<T>> list(Invocation invocation)  throws RpcException { 
// 调用 Directory 的 list 方法列举 Invoker
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}

如上,AbstractClusterInvoker 中的 list 方法做的事情很简单,只是简单的调用了 Directory 的 list 方法,没有其他更多的逻辑了。Directory 即相关实现类在前文已经分析过,这里就不多说了。接下来,我们把目光转移到AbstractClusterInvoker 的各种实现类上,来看一下这些实现类是如何实现doInvoke 方法逻辑的。

2. FailoverClusterInvoker

FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。下面来看一下该类的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { 
// 省略部分代码
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(..., "Failed to invoke the method ...");
}
}

如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list方法列举 Invoker。整个流程大致如此,不是很难理解。下面我们看一下 select方法的逻辑。

1
2
3
4
5
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { 
if (invokers == null || invokers.isEmpty())
return null;
// 获取调用方法名
================ 补全 =========

如上,select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker,如果

不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,

那自然而然的认为 stickyInvoker 挂了,所以置空。如果 stickyInvoker 存在于invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含

stickyInvoker。如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期

间内再次被调用,因此这个时候不会返回该 stickyInvoker。如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通

性等。当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值

给 stickyInvoker。

以上就是 select 方法的逻辑,这段逻辑看起来不是很复杂,但是信息量比较大。不搞懂 invokers 和 selected 两个入参的含义,以及粘滞连接特性,这段

代码是不容易看懂的。所以大家在阅读这段代码时,不要忽略了对背景知识的理解。关于 select 方法先分析这么多,继续向下分析。

1
2
3
4
5
6
7
8
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
if (loadbalance == null) {
// 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
======= 补全 ==========

doSelect 主要做了两件事,第一是通过负载均衡组件选择 Invoker。第二

是,如果选出来的 Invoker 不稳定,或不可用,此时需要调用 reselect 方法进

行重选。若 reselect 选出来的 Invoker 为空,此时定位 invoker 在 invokers 列

表中的位置 index,然后获取 index + 1 处的 invoker,这也可以看做是重选逻

辑的一部分。下面我们来看一下 reselect 方法的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private Invoker<T> reselect(LoadBalance loadbalance, 
Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected,
boolean availablecheck) throws RpcException {
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下
// 根据 availablecheck 进行不同的处理
if (availablecheck) {
// 遍历 invokers 列表
for (Invoker<T> invoker : invokers) {
// 检测可用性
if (invoker.isAvailable()) {
// 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
// reselectInvokers 不为空,此时通过负载均衡组件进行选择
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
// 不检查 Invoker 可用性
} else {
for (Invoker<T> invoker : invokers) {
// 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
// 通过负载均衡组件进行选择
return loadbalance.select(reselectInvokers,getUrl(), invocation);
}
}
{
// 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。
// 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) && !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
// 再次进行选择,并返回选择结果
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}

reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到 reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。其中第一件事情又可进行细分,一开始,reselect 从 invokers 列表中查找有效可用的 Invoker,若未能找到,此时再到 selected 列表中继续查找。关于 reselect 方法就先分析到这,继续分析其他的 Cluster Invoker。

3. FailbackClusterInvoker

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。下面来看一下它的实现逻辑。