Nacos实战应用

能力目标:

  • 能够实现Nacos安装
  • 基于Nacos能实现应用负载均衡
  • 能基于Nacos实现配置管理
    • 配置管理
    • 负载均衡
    • 多环境切换
    • 配置共享
    • 配置刷新
    • 灰度发布
  • 掌握Nacos集群部署

Nacos概要

Nacos是Alibaba微服务生态组件中的重要组件之一,主要用它实现应用的动态服务发现、配置管理、服务管理。

https://github.com/alibaba/spring-cloud-alibaba/wiki/Nacos-discovery

Nacos是什么

image-20220424133028764

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

Nacos 支持几乎所有主流类型的“服务”的发现、配置和管理

  • 1:Kubernetes Service
  • 2:gRPC & Dubbo RPC Service
  • 3:Spring Cloud RESTful Service

Nacos特性

服务发现和服务健康监测

Nacos 支持基于 DNS 和基于 RPC 的服务发现。服务提供者使用 原生 SDK、OpenAPI、或一个独立的Agent TODO注册 Service 后,服务消费者 可以使用DNS TODO 或HTTP&API查找和发现服务。

Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送 请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、 用户自定义)的健康检查。 对于复杂的云环境和网络拓扑环境中(如 VPC、 边缘网络等)服务的健康检查,Nacos 提供了 agent 上报模式和服务端主 动检测2种健康检查模式。Nacos 还提供了统一的健康检查仪表盘,帮助您根 据健康状态管理服务的可用性及流量。

动态配置服务

动态配置服务可以让您以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。

动态配置消除了配置变更时重新部署应用和服务的需要,让配置管理变得更加高效和敏捷。

配置中心化管理让实现无状态服务变得更简单,让服务按需弹性扩展变得更容易。

Nacos 提供了一个简洁易用的UI (控制台样例 Demo) 帮助您管理所有的服 务和应用的配置。Nacos 还提供包括配置版本跟踪、金丝雀发布、一键回滚配置以及客户端配置更新状态跟踪在内的一系列开箱即用的配置管理特性,帮助您更安全地在生产环境中管理配置变更和降低配置变更带来的风险。

动态DNS服务

动态 DNS 服务支持权重路由,让您更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以 DNS 协议为基础的服务发现,以帮助您消除耦合到厂商私有服务发现 API 上的风险。

Nacos 提供了一些简单的 DNS APIs TODO 帮助您管理服务的关联域名和可用的 IP:PORT 列表.

服务及其元数据管理

Nacos 能让您从微服务平台建设的视角管理数据中心的所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的 SLA 以及最首要的 metrics 统计数据

Nacos版图

image-20220424133333390

  • 特性大图:要从功能特性,非功能特性,全面介绍我们要解的问题域的特性诉求
  • 架构大图:通过清晰架构,让您快速进入 Nacos 世界
  • 业务大图:利用当前特性可以支持的业务场景,及其最佳实践
  • 生态大图:系统梳理 Nacos 和主流技术生态的关系
  • 优势大图:展示 Nacos 核心竞争力
  • 战略大图:要从战略到战术层面讲 Nacos 的宏观优势

Nacos生态图

image-20220424133403706

Nacos 无缝支持一些主流的开源生态:

  • 1:Spring Cloud
  • 2:Apache Dubbo and Dubbo Mesh
  • 3:Kubernetes and CNCF。

使用 Nacos 简化服务发现、配置管理、服务治理及管理的解决方案,让微服务的发现、管理、共享、组合更加容易。

Nacos架构

参考官方文档:https://nacos.io/zh-cn/docs/architecture.html

Nacos安装

关于Nacos安装,可以直接参考官网安装 https://nacos.io/zh-cn/docs/quick-start.html,我们接下来学习基于Docker实现Nacos单机安装和基于Docker实现Nacos集群安装。

采用Docker-Compose安装Nacos要更方便,所以大家可以先学习一下Docker-Compose

Nacos安装模式有多种:

  • 单机模式 Derby:
    • 这种模式是极简模式,数据没法持久化存储,适合开发环境。
  • 单机模式 MySQL:(支持MySQL5.7和MySQL8.0,我们这里学习MySQL5.7安装模式,因为当前主流还是MySQL5.7)
    • 这种模式支持数据持久化,数据会存储到MySQL中,适合生产环境。
  • 集群模式:
    • 这种模式适合生产环境并且服务节点个数较多,不存在单点故障问题。

克隆项目:

1
2
3
4
5
6
7
8
9
10
11
12
13
#克隆项目 

git clone https://github.com/nacos-group/nacos-docker.git

#进入nacos-docker目录

cd nacos-docker

#查看文件列表

ce example

ll

example 中文件列表如下:

image-20220424133618992

Nacos Derby安装

derby是java的内存数据库,还是很方便的,可以直接一件启动nacos,但是是单机版,配置也无法存储,只是用来浅尝一番

安装Nacos生产环境会结合prometheus和grafana实现对Nacos的监控,我们这里不做它们的监控操作,需要将docker-compose的配置注释掉,修改example/standalone-derby.yaml ,配置如下:

image-20220424133652995

进入到 example 目录下执行如下命令:

1
2
3
4
5
6
# 启动
docker-compose -f standalone-derby.yaml up -d
# 停止
docker-compose -f standalone-derby.yaml stop
# 删除
docker-compose -f standalone-derby.yaml down

安装完成后,我们可以直接访问它的控制台 http://192.168.200.129:8848/nacos,账号密码都是nacos,效果如下:

image-20220424133730599

关于控制台的使用,我们在后面详细讲解。

Nacos MySQL版安装

1、我们先停掉之前安装的服务,并删掉之前的容器

1
2
3
4
5
6
#停掉容器 
docker stop nacos-standalone
#删掉容器
docker rm nacos-standalone
#或者一步到位
docker-compose -f standalone-derby.yaml down

2、使用 example/standalone-mysql-5.7.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
version: "2"
services:
nacos:
image: nacos/nacos-server:${NACOS_VERSION}
container_name: nacos-standalone-mysql
env_file:
- ../env/nacos-standlone-mysql.env
volumes:
- ./standalone-logs/:/home/nacos/logs
- ./init.d/custom.properties:/home/nacos/init.d/custom.properties
ports:
- "8848:8848"
- "9848:9848"
- "9555:9555"
depends_on:
- mysql
restart: on-failure
mysql:
container_name: mysql
image: nacos/nacos-mysql:5.7
env_file:
- ../env/mysql.env
volumes:
- ./mysql:/var/lib/mysql
ports:
- "3306:3306"
# prometheus:
# container_name: prometheus
# image: prom/prometheus:latest
# volumes:
# - ./prometheus/prometheus-standalone.yaml:/etc/prometheus/prometheus.yml
# ports:
# - "9090:9090"
# depends_on:
# - nacos
# restart: on-failure
# grafana:
# container_name: grafana
# image: grafana/grafana:latest
# ports:
# - 3000:3000
# restart: on-failure

3、在 example 目录下使用 docker-compose 命令启动

1
docker-compose -f standalone-mysql-5.7.yaml up -d

数据库脚本在nacos的 nacos\config\src\main\resources\META-INF 工程中有对应脚本,也可以通过下面的网址获取SQL:

https://github.com/alibaba/nacos/blob/develop/config/src/main/resources/META-INF/nacos-db.sql

创建数据库 nacos_config ,并执行初始化操作,初始化脚本后,数据库数据如下:

image-20220424134059927

4、新建配置,查看存储

此时访问后台 http://192.168.200.129:8848/nacos ,并创建一个配置信息,如下图:

image-20220424134112488

此时我们可以随意填写些配置,如下图:

image-20220424134122725

配置填写后,数据会添加到数据库中的 config_info 表中,如下表:

image-20220424134131286

Docker 安装Nacos

获取nacos,下载到~/nacos

1
2
3
4
5
mkdir ~/nacos
cd ~/nacos
wget https://github.com/nacos-group/nacos-docker/archive/refs/tags/v2.0.3.zip
unzip v2.0.3.zip
mv nacos-docker-2.0.3 nacos-docker

我们也可以直接采用Docker的方式安装Nacos,这样安装不要克隆项目也不需要修改配置文件,会更方便,安装命令如下:

1
2
3
4
5
6
7
8
9
10
11
docker run -d \ 
-e MODE=standalone \
-e SPRING_DATASOURCE_PLATFORM=mysql \
-e MYSQL_SERVICE_HOST=192.168.200.129 \
-e MYSQL_SERVICE_PORT=3306 \
-e MYSQL_SERVICE_USER=root \
-e MYSQL_SERVICE_PASSWORD=root \
-e MYSQL_SERVICE_DB_NAME=nacos_config \
-p 8848:8848 \
--restart=always \
--name nacos nacos/nacos-server

注意:需要提取准备好msyql数据库

访问 http://192.168.200.129:8848/nacos 效果如下:

image-20220424134629493

Nacos功能应用

Nacos服务注册与发现

服务发现是微服务架构体系中最关键的组件之一。如果尝试着用手动的方式来给每一个客户端来配置所有服务提供者的服务列表是一件非常困难的事,而且也不利于 服务的动态扩缩容。Nacos Discovery Starter 可以帮助您将服务自动注册到 Nacos 服务端并且能够动态感知和刷新某个服务实例的服务列表。除此之外,Nacos Discovery Starter 也将服务实例自身的一些元数据信息-例如host,port,健康检查URL,主页等-注册到 Nacos 。

接下来我们学习一下如何使用Nacos作为服务的注册中心,并实现服务注册和服务发现。当前项目开发主流技术是SpringBoot,我们就讲解基于SpringBoot如何使用Nacos实现服务注册与发现。

image-20220424180551470

如上图,我们以打车项目为例,当用户打车成功的时候,会调用 hailtaxi- order , hailtaxi-order 会下订单,同时修改司机状态,修改状态需要调用hailtaxi-driver ,我们把 hailtaxi-order 服务和 hailtaxi-driver 服务都注册到Nacos中,并实现服务调用,如果整个调用都没有问题,就说明服务注册发现没问题。

关于SpringCloud Alibaba和SpringBoot的版本,我们可以通过 https://start.spring.io/actuator/info 查看。

image-20220424180622691

SpringBoot我们修改成2.2.10版本,SpringCloud版本改成Hoxton.SR11版本。要使用Nacos需要引入依赖包,并且将Consul依赖移除。

1、注释掉 hailtaxi-api 和 hailtaxi-gateway中的consul依赖包:

1
2
3
4
5
6
7
8
9
10
 <!--consul-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-config</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>

另外在 hailtaxi-order 和 hailtaxi-pay 中也注意要注释掉consul的相关依赖坐标

2、在 hailtaxi-api 和 hailtaxi-gateway 中引入如下依赖:

1
2
3
4
5
6
7
8
9
10
11
<!--更换为nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>

3、配置 bootstrap.yml

​ bootstrap.yml ( bootstrap.properties )用来在程序引导时执行,应用于更加早期配置信息读取,如可以使用来配置 application.yml 中使用到参数等。

​ application.yml ( application.properties ) 应用程序特有配置信息,可以用来配置后续各个模块中需使用的公共参数等。

​ bootstrap.yml 先于 application.yml 加载。

​ 项目中如果使用Nacos,需要使用 bootstrap.yml ,因此我们需要将项目中的 application.yml 换成 bootstrap.yml 。

​ 在 hailtaxi-gateway 中创建 bootstrap.yml 配置文件,添加如下配置:

1
2
3
4
5
6
7
8
9
10
11
spring:
application:
name: hailtaxi-gateway
cloud:
# Nacos配置
nacos:
discovery:
# nacos 服务注册地址
server-addr: 192.168.213.130:8848
config:
server-addr: 192.168.213.130:8848

4、在项目启动类上添加开启服务发现的注解

1
2
3
4
5
6
7
8
@EnableDiscoveryClient
@SpringBootApplication
public class GatewayApplication implements CommandLineRunner {

public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class,args);
}
}

此时我们运行 hailtaxi-gateway ,可以发现在Nacos中已经注册了相关服务,如下图:

image-20220424181829787

5、按相同方式配置 hailtaxi-driver , hailtaxi-order , hailtaxi-pay 三个项目

6、启动各个服务,查看服务列表

注意:启动前,将 hailtaxi-driver 中的异常错误处理掉!!

image-20220424181905608

7、测试

使用postman测试:

http://localhost:8001/driver/info/1

http://localhost:8001/order

此时服务调用没有任何问题,说明服务注册和服务发现正常

负载均衡

image-20220424185451863

如上图,如果此时用户打车成功,会调用订单服务,订单服务会修改司机状态,此时会调用 hailtaxi-driver ,如果是生产环境,每个节点一定是集群状态,比如有2个 hailtaxi-driver 节点,此时如何实现负载均衡?

image-20220424185503628

我们可以发现服务注册到Nacos中,有一个权重属性,这个权重属性就是Nacos的负载均衡机制,此时需要用到Nacos的负载均衡策略 NacosRule ,我们可以在程序中先初始化负载均衡算法,再到bootstrap.yml中配置权重。

1、初始化负载均衡算法

在 hailtaxi-order 中初始化负载均衡算法:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class LoadBalanceConfiguration {
/**
* Nacos负载均衡算法
* @return
*/
@Bean
@Scope(value = "prototype")
public IRule nacosRule() {
return new NacosRule();
}
}

2、权重配置,因为是基于权重的

为了演示集群效果,我们启动多个 hailtaxi-driver ,并在bootstrap.yml 中配置权重

1
2
3
4
5
6
7
8
9
10
11
12
spring:
application:
name: hailtaxi-driver
cloud:
# Nacos配置
nacos:
discovery:
# nacos 服务注册地址
server-addr: 192.168.213.130:8848
weight: 1
config:
server-addr: 192.168.213.130:8848

这是默认18082的,对于18085和18086的配置如下

1
2
3
# 默认是1,另起2个服务
-Dspring.cloud.nacos.discovery.weight=2 -Dserver.port=18085
-Dspring.cloud.nacos.discovery.weight=3 -Dserver.port=18086

image-20220424185616474

image-20220424185621715

最终可以在nacos控制台查看服务的权重信息

image-20220424185639297

为了方便查看调用了哪个节点,我们把每个节点的端口号输出,我们请求打车测试

http://localhost:8001/order ,执行6次,应该会按1:2:3 的比例来调用

注意:如果 hailtaxi-order 模块通过 openfeign 调用超时,可以设置它的超时时间

由于 openfeign 底层默认使用的是 ribbon ,故可以采用如下配置:

1
2
3
4
5
ribbon: 
# 处理请求的超时时间
ReadTimeout: 5000
# 连接建立的超时时长
ConnectTimeout: 5000

如果我们把算法 NacosRule 注释,默认就是和 Ribbon 集成,和 Ribbon 默认开启,可以通过如下配置实现关闭或开启:

1
2
3
ribbon: 
nacos:
enabled: true

配置中心

Nacos 提供用于存储配置和其他元数据的 key/value 存储,为分布式系统中的外部化配置提供服务器端和客户端支持。使用 Spring Cloud AlibabaNacos Config,您可以在 Nacos Server 集中管理你 Spring Cloud 应用的外部属性配置。

Spring Cloud Alibaba Nacos Config 是 Config Server 和 Client 的替代方案,客户端和服务器上的概念与 Spring Environment 和 PropertySource 有着一致的抽象,在特殊的 bootstrap 阶段,配置被加载到 Spring 环境中。当应用程序通过部署管道从开发到测试再到生产时,您可以管理这些环境之间的配置,并确保应用程序具有迁移时需要运行的所有内容。

配置管理

1、我们可以在Nacos控制台配置项目的配置数据,先打开Nacos控制台,

在 命名空间 中点击 新建命名空间 ,如下图:

命名空间可以用于数据隔离,默认在public 命名空间中!!!

image-20220424185814983

2、修改 hailtaxi-driver 中的 bootstrap.yml 配置文件,指定命名空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
application:
name: hailtaxi-driver
cloud:
# Nacos配置
nacos:
discovery:
# nacos 服务注册地址
server-addr: 192.168.213.130:8848
weight: 1
# 指定命名空间的id
namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3
config:
server-addr: 192.168.213.130:8848
# 指定命名空间的id
namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3
# 如果将配置信息保存到nacos,指定配置文件扩展名
file-extension: yaml
# nacos config dataid name 默认加载 ${spring.application.name}.${file-extension},当然也可指定
#name: hailtaxi-driver.yaml

# 最佳实践就是我们nacos上的文件铭文应用名.yaml,最省事了

image-20220424185926035

3、在nacos中添加配置

在 配置管理>配置列表 中添加,如下图:

image-20220424185937239

将hailtaxi-driver原来在 application.yml 中的配置全部填写到下面表单中,然后将 application.yml 配置文件删除,或者删除所有配置,如下图:

Data ID:默认加载${spring.application.name}.${file-extension}另外对于web服务的端口 server.port 一般留在 application.yml 中

image-20220424190006867

注意 Data ID 的配置。

4、启动测试:

我们启动 hailtaxi-driver 服务,默认加载${spring.application.name}.${file-extension:properties} 配置,加载完成后,配置数据会生效,并访问

http://localhost:18081/driver/info/1 测试,效果如下:

image-20220424190024427

5、配置 hailtaxi-gateway , hailtaxi-order , hailtaxi-pay 等几个项目

按照相同的方式,将各个项目中 application.yml 中的配置配置到nacos中,并注释/删除掉本地 application.yml 中的配置(端口的配置一般留在项目中)

最后启动所有项目,测试走网关的API。

多环境切换

spring-cloud-starter-alibaba-nacos-config 在加载配置的时候,不仅仅加载以 dataid 为 ${spring.application.name}.${file-extension:properties} 为前缀的基础配置,还加载dataid为 ${spring.application.name}-${profile}.${file-extension:properties} 的基础配置。在日常开发中如果遇到多套环境下的不同配置,可以通过Spring 提供的 ${spring.profiles.active} 这个配置项来配置。

1、在nacos中创建 hailtaxi-driver-dev.yaml 作为开发环节的配置,创建 hailtaxi-driver-test.yaml 作为测试环境的配置文件,创建如下:

hailtaxi-driver-test.yaml

image-20220424190122519

hailtaxi-driver-dev.yaml

image-20220424190134688

这样多出来两份配置信息如下:

image-20220424190143527

2、在 hailtaxi-driver 项目的 bootstrap.yml 中激活配置,如下:

image-20220424190153209

启动项目,看是否能正常启动!!!

image-20220424190203817

访问:http://localhost:18081/driver/info/1

image-20220424190215854

3、将 active 换成 test ,启动后访问:http://localhost:18081/driver/info/1

image-20220424190236970

共享/扩展 配置

在实际的业务场景中应用和共享配置间的关系可能, Spring Cloud Alibaba Nacos Config 从 0.2.1 版本后,可支持自定义 Data Id 的配置,通过它可以解决配置共享问题

共享配置:

每个服务与每个服务直接共享的信息

1、创建一个Data ID 为: datasource.yaml 的配置,用于配置数据库连接,如下图:

image-20220424190300515

2、在 hailtaxi-driver-dev.yaml 中将数据库的配置信息删除,如下图:

image-20220424190309796

3、在 bootstrap.yml 中引入共享配置需要使用 shared-configs 属性,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
# Nacos配置
nacos:
discovery:
# nacos 服务注册地址
server-addr: 192.168.213.130:8848
namespace: b24962bb-2277-40b2-a08d-a9da0053d135
config:
server-addr: 192.168.213.130:8848
namespace: b24962bb-2277-40b2-a08d-a9da0053d135
# 如果将配置信息保存到nacos,指定配置文件扩展名
file-extension: yaml
# nacos config dataid name 默认加载 ${spring.application.name}.${file-extension},当然也可指定
#name: hailtaxi-driver.yaml
# 加载共享配置信息
shared-configs:
- dataId: datasource.yaml
refresh: true

image-20220424194101840

配置信息的加载由NacosConfigProperties完成

image-20220424194123318

image-20220424194128348

启动测试访问 http://localhost:18081/driver/info/1,此时能访问数据库,

同时也能获取 hailtaxi-driver-dev.yaml 中的配置,效果如下:

image-20220424194142558

扩展配置:

每个服务与每个服务特有的信息,好像没什么用,完全可以放到主配置文件里,应用名-环境.yaml

nacos除了支持读取以上所支持的的配置信息外,用户还可以自定义扩展配置

1、在nacos中 创建一个Data ID 为: custom.yaml 的配置,配置信息如下图:

1
2
app:
version: v1.0

image-20220424194331280

2、在 hailtaxi-driver 模块的 bootstrap.yml 中加载该配置,使用extension-configs 属性,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
cloud:
# Nacos配置
nacos:
discovery:
# nacos 服务注册地址
server-addr: 192.168.213.130:8848
namespace: b24962bb-2277-40b2-a08d-a9da0053d135
config:
server-addr: 192.168.213.130:8848
namespace: b24962bb-2277-40b2-a08d-a9da0053d135
# 如果将配置信息保存到nacos,指定配置文件扩展名
file-extension: yaml
# nacos config dataid name 默认加载 ${spring.application.name}.${file-extension},当然也可指定
#name: hailtaxi-driver.yaml
# 加载共享配置信息
shared-configs:
- dataId: datasource.yaml
refresh: true
# 加载扩展配置
extension-configs:
- dataId: custom.yaml

3、在应用程序中去读该配置,在 DriverController 中添加如下代码

1
2
3
4
5
6
@Value("${app.version}") 
private String version;
@GetMapping("/appinfo")
public String getAppInfo() {
return version;
}

4、测试访问:http://localhost:18081/driver/appinfo 查看结果

配置刷新

配置自动刷新对程序来说非常重要,Nacos支持配置自动刷新,并且提供了多种刷新机制。

Environment自动刷新spring-cloud-starter-alibaba-nacos-config 支持配置的动态更新,Environment能实时更新到最新的配置信息,启动 Spring Boot 应用测试的代码如下:

1、在 hailtaxi-driver-dev.yaml 中添加一个配置项,如下

1
2
app:
name: itheima

image-20220424194536780

2、在 hailtaxi-driver 的启动类中添加一段测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SpringBootApplication 
@EnableDiscoveryClient
@MapperScan(basePackages = "com.itheima.driver.mapper")
public class DriverApplication {
public static void main(String[] args) {
ApplicationContext applicationContext = SpringApplication.run(DriverApplication.class,args);
while(true) {
//当动态配置刷新时,会更新到 Enviroment中,
String name = applicationContext.getEnvironment().getProperty("app.name");
String version = applicationContext.getEnvironment().getProperty("app.version");
System.out.println("app.name="+name+";app.version=" + version);
try {
TimeUnit.SECONDS.sleep(5); // 每隔5秒中从 Enviroment中获取一下
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

3、启动查看控制台输出,

4、在nacos中修改 app.name 和 app.version ,再次查看控制台输出

5、默认情况下, shared-configs 和 extension-configs 是不自动刷新的,【其他配置可以】,如果要支持刷新,需要添加 refresh 属性,如下

1
2
3
4
5
6
7
8
# 加载共享配置信息  
shared-configs[0]:
dataId: datasource.yaml
refresh: true
# 加载扩展配置
extension-configs:
- dataId: custom.yaml
refresh: true

@Value刷新

注意,如果知识配置了refresh: true,nacos会推送到程序后台,但是springboot不会刷新,还要配置RefreshScope才能达到自动刷新

程序中如果写了 @Value 注解,可以采用 @RefreshScope 实现刷新,只需要在指定类上添加该注解即可,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RestController
@RequestMapping(value = "/driver")
@RefreshScope
public class DriverController {
@Value("${app.version}")
private String version;
@Value("${app.name}")
private String appname;
@GetMapping("/appinfo")
public String getAppInfo() {
return appname + ";"+version;
}
}
# 或者通过springboot的,也是可以拿到最新的配置信息的
@Autowired
Environment environment;
environment.getProperty("app.version")

# 用配置类的方式,也能自动更新
@Data
@Configuration
@ConfigurationProperties(prefix="app)
public class AppProperteis {
private String version;
}

3种方式

启动测试: http://localhost:18081/driver/appinfo

修改完配置信息后再次测试,查看是否能动态刷新!

灰度发布

灰度配置指的是指定部分客户端IP进行新配置的下发,其余客户端配置保持不变,用以验证新配置对客户端的影响,保证配置的平稳发布。灰度配置是生产环境中一个比较重要的功能,对于保证生产环境的稳定性非常重要。在1.1.0中,Nacos支持了以I为粒度的灰度配置,具体使用步骤如下:

1、将 hailtaxi-driver 项目打包,上传到服务器,并启动

1
java -jar hailtaxi-driver.jar

2、本地也启动

查看 nacos 中的服务列表

image-20220424194751072

image-20220424194756787

可以看到有一个实例是来自远程服务器上的

3、在nacos中,找到 hailtaxi-driver-dev.yaml 中,编辑配置,勾选“Beta发布”,在文本框里填入要下发配置配置的IP,多个IP用逗号分隔,操作如下:

image-20220424194809305

配置Beta发布,针对192.168.200.129生效

修改配置内容,点击“发布Beta”按钮,即可完成灰度配置的发布,点击“发 布Beta”后,“发布Beta”按钮变灰,此时可以选择“停止Beta”或者“发布”。“停止Beta”表示取消停止灰度发布,当前灰度发布配置的IP列表和配置内容都会删除,页面回到正常发布的样式。“发布”表示将灰度配置在所有客户端生效,之前的配置也会被覆盖,同时页面回到正常发布的样式:

image-20220424194823372

4、测试:访问:

http://localhost:18081/driver/appinfo

http://192.168.200.129:18081/driver/appinfo

访问192.168.200.129就会访问到Beta中配置的结果

Nacos集群

在生产环境Nacos一般都不是单节点存在,如果是单节点,很容易存在单点故障,因此生产环境一般都以集群形式存在

集群架构

Nacos集群模式有多种,但其实无论哪种都是将3个Nacos服务进行集群发布。

集群需要采用数据共享模式进行配置信息共享,也就是要将数据存入到同一个数据库中,我们对每种集群模式进行说明:

1)直连模式

http://ip1:port/openAPI 直连ip模式,机器挂则需要修改ip才可以使用。

比如我现在有3个Nacos,每次操作数据的时候,都需要使用IP:端口的模式,这种模式效率极低,并且一旦节点故障无法识别,因此官方不推荐这种模式。

2)VIP模式

http://VIP:port/openAPI 挂载VIP模式,直连vip即可,下面挂server真实ip,可读性不好。

3)域名模式

http://nacos.com:port/openAPI 域名 + VIP模式,可读性好,而且换ip方便,因此官方推荐该模式,该模式的结构图如下:

image-20220424202007644

Nacos集群部署

我们搭建Nacos集群环境,集群环境配置如下:

image-20220424202030228

1)服务下载

https://github.com/alibaba/nacos/releases/ 下载需要的服务,

当前使用的是1.4.1, 我们可以选择下载1.4.1版本,版本如下:

image-20220424202045291

解压压缩包后,包结构如下:

image-20220424202057248

2)配置数据库

修改 conf/application.properties 配置数据库,配置如下:

image-20220424202108285

3)集群配置

修改 conf/cluster.conf 配置集群:

1
2
3
192.168.211.145:8848 
192.168.211.146:8848
192.168.211.147:8848

真集群ip不同,端口相同;伪集群ip相同,端口不同

4)节点同步

将修改好的服务分别上传到 192.168.211.146 、 192.168.211.147 服务:

1
2
scp -r nacos 192.168.211.146:/usr/local/server/alibaba/ 
scp -r nacos 192.168.211.147:/usr/local/server/alibaba/

5)启动每个节点

进入到每个节点 nacos/bin 目录下,执行启动:sh startup.sh

访问任何一个单节点,信息如下:

image-20220424202205863

注:

学习阶段,为了节省资源,可以直接使用 nacos-docker 中已配置好的集群启动 yml 进行启动

image-20220424203511375

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
version: "2"
services:
nacos1:
image: nacos/nacos-server:${NACOS_VERSION}
container_name: nacos1
networks:
nacos_net:
ipv4_address: 172.16.238.10
volumes:
- ./cluster-logs/nacos1:/home/nacos/logs
ports:
- "8848:8848"
- "9848:9848"
- "9555:9555"
env_file:
- ../env/nacos-ip.env
restart: on-failure
depends_on:
- mysql

nacos2:
image: nacos/nacos-server:${NACOS_VERSION}
container_name: nacos2
networks:
nacos_net:
ipv4_address: 172.16.238.11
volumes:
- ./cluster-logs/nacos2:/home/nacos/logs
ports:
- "8849:8848"
- "9849:9848"
env_file:
- ../env/nacos-ip.env
restart: always
depends_on:
- mysql
nacos3:
image: nacos/nacos-server:${NACOS_VERSION}
container_name: nacos3
networks:
nacos_net:
ipv4_address: 172.16.238.12
volumes:
- ./cluster-logs/nacos2:/home/nacos/logs
ports:
- "8850:8848"
- "9850:9848"
env_file:
- ../env/nacos-ip.env
restart: always
depends_on:
- mysql
mysql:
container_name: mysql
image: nacos/nacos-mysql:5.7

172.16.238.12是nacos 分配的固定ip,不用改

image-20220424203723696

1
docker-compose -f cluster-ip.yaml up -d

访问ip:8848/49/50,就可以访问了

客户端接入Nacos集群

客户端接入,不建议写多个节点的IP:Port,建议以域名的方式连接

Nacos,因此需要配置Nacos域名,在 192.168.211.145 节点中配置域名nacos.hailtaxi.com , nginx 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
#负载均衡池配置 
upstream hailtaxi-nacos{
server 192.168.211.145:8848;
server 192.168.211.146:8848;
server 192.168.211.147:8848;
}
server {
listen 80;
server_name nacos.hailtaxi.com;
location / {
proxy_pass http://hailtaxi-nacos;
}
}

配置 nacos.hailtaxi.com 域名映射:

1
2
3
4
#修改hosts文件 
vi /etc/hots
#添加如下映射关系
192.168.211.145 hailtaxinacos.com

保存Nginx配置,并启动Nginx,修改本地

C:\Windows\System32\drivers\hosts ,添加如下配置:

1
192.168.211.145 hailtaxinacos.com

访问 http://hailtaxinacos.com/nacos ,效果如下:

image-20220424202351106

项目中使用:

方式1:

项目中Nacos地址可以把多个地址写到一起,用逗号隔开,如下代码:

image-20220424202403533

Nacos效果如下:

image-20220424202413433

ip万一更换,也是增加维护成本

方式2:

通过Nginx访问即可,配置如下:

192.168.211.145:80

image-20220424205339036

访问域名,域名到hosts中的配置(本地),走负载均衡(本地nginx)

nacos源码

  • 掌握Nacos服务通信源码原理
  • 掌握Nacos数据一致性原理
  • 掌握Nacos数据全量同步/增量同步源码

源码环境搭建

Nacos源码有很多值得我们学习的地方,为了深入理解Nacos,我们剖析源码,分析如下2个知识点:

  • 1:Nacos对注册中心的访问原理

  • 2:Nacos注册服务处理流程

1、从官方项目上克隆下来,并且检出 1.4.1 版本,导入idea。nacos源码环境搭建起来比较轻松,几乎不会报什么错误,导入后编译安装到本地环境即可

image-20220219093648098

编译之后可能找不到类,要将consistency下的java和grpc-java设置为生成目录

image-20220425210928193

2、找到 config 模块中找到 \resources\META-INF\nacos-db.sql ,在本地mysql中创建数据库 nacos-config ,将该脚本导入执行创建表。

3、找到 console 模块下的配置文件 application.properties ,修改数据库相关配置

1
2
3
4
5
6
7
8
9
#*************** Config Module Related Configurations ***************# 
### If use MySQL as datasource:
spring.datasource.platform=mysql
### Count of DB:
db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos-config? characterEncoding=utf8&connectTimeout=1000&socketTimeout=3 000&autoReconnect=true&useUnicode=true&useSSL=false&server Timezone=UTC
db.user.0=nacos
db.password.0=nacos

4、找到 console 模块下的启动类,启动nacos的服务端,启动时添加启动参数,指定启动模式为非集群启动

1
-Dnacos.standalone=true

image-20220219093816493

5、访问本地的nacos:http://localhost:8848/nacos

至此,源码环境搭建成功!

nacos 客户端

首先要搞清楚:nacos的客户端其实在我们自己的服务里,我们引入了nacos的相关坐标依赖,nacos客户端以jar包的形式在我们的服务中工作

image-20220219093835919

对于nacos的客户端,它都要帮我们实现什么功能呢?

它的主要工作有:服务注册、服务发现、服务下线操作、服务订阅操作等相关操作。

客户端与注册中心服务端的交互,主要集中在服务注册、服务下线、服务发现、订阅某个服务,其实使用最多的就是服务注册和服务发现,下面我会从源码的角度分析一下这四个功能。

在Nacos源码中 nacos-example 中 com.alibaba.nacos.example.NamingExample 类分别演示了这4个功能的操作,我们可以把它当做入口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.example;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.NamingEvent;

import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Nacos naming example.
*
* @author nkorange
*/
public class NamingExample {

/*
nacso client 完成服务的注册,发现,取消注册,订阅请按照:step1,step2,step3,step4依次来看
集群模式下Distro协议数据同步
1、集群节点的互相发现:com.alibaba.nacos.core.cluster.ServerMemberManager.ServerMemberManager
2、启动后开始分区数据定时校验和启动加载全量快照数据:com.alibaba.nacos.core.distributed.distro.DistroProtocol.DistroProtocol
3、运行过程中的增量数据同步:节点数据发生变更后进行同步
比如:有服务注册后 在nacos server端会发生数据同步,入口:com.alibaba.nacos.naming.controllers.InstanceController.register
*/
public static void main(String[] args) throws NacosException {

Properties properties = new Properties();
/*properties.setProperty("serverAddr", System.getProperty("serverAddr"));
properties.setProperty("namespace", System.getProperty("namespace"));*/

properties.setProperty("serverAddr", "127.0.0.1:8848");
properties.setProperty("namespace", "demo");//在控制台创建好命名空间
/**
* 获取命名服务对象: NacosNamingService 每个命名空间对应一个 命名服务对象
*/
NamingService naming = NamingFactory.createNamingService(properties);
/**
* (step1:)注册服务实例 registerInstance方法的定义有很多
*/
naming.registerInstance("nacos.test.1", "192.168.200.10", 8888);
naming.registerInstance("nacos.test.1", "192.168.200.10", 7777);
naming.registerInstance("nacos.test.2", "DEFAULT_GROUP","192.168.200.10", 9999);
/**
* 获取/发现服务实例
* (step2 )
*/
System.out.println(naming.getAllInstances("nacos.test.1"));

/**
* 服务注销(下线)
* (step3)
*/
naming.deregisterInstance("nacos.test.1", "192.168.200.10", 7777);

System.out.println(naming.getAllInstances("nacos.test.1"));


Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-thread");
return thread;
}
});

/**
* 服务订阅
* (step4)
*/
naming.subscribe("nacos.test.3", new AbstractEventListener() {

//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}

@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});
}
}

这里要在本地nacos创建一个命名空间demo

image-20220425211543059

服务注册

我们沿着案例中的服务注册方法调用找到 nacos-api 中的NamingService.registerInstance() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService ,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 完整的服务注册方法
* @param serviceName name of service 服务名称
* @param groupName group of service 分组名称
* @param ip instance ip 实例ip
* @param port instance port 实例端口
* @param clusterName instance cluster name 集群名称
* @throws NacosException
*/
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
// 创建一个服务实例对象,填充相关属性
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
//注册服务实例
registerInstance(serviceName, groupName, instance);
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
// 将groupName和serviceName组装到一起变成:groupName@serviceName
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
//创建心跳消息 并添加心跳
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
/**
* 对每个服务 添加心跳任务 默认5s 一次心跳
* 心跳如果发现 nacos server 端不存在服务资源则自动重新注册
*/
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 注册该服务实例
serverProxy.registerService(groupedServiceName, groupName, instance);
}

注册主要做了两件事,第一件事:为注册的服务设置一个定时心跳任务。

第二件事:将服务注册到服务端。

1:启动一个定时心跳任务,时间间隔为5s,如果服务正常,不做处理,如果不正常,重新注册

2:发送http请求给注册中心服务端,调用服务注册接口,注册服务

上面代码我们可以看到定时任务添加,但并未完全看到远程请求,serverProxy.registerService() 方法如下,会先封装请求参数,接下来调用 reqApi() 而 reqApi() 最后会调用 callServer() ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* register a instance to service with specified instance properties.
* 向nacos server 注册一个服务实例
* @param serviceName name of service 服务名称
* @param groupName group of service 分组名称
* @param instance instance to register 实例对象(包含了实例诸多信息)
* @throws NacosException nacos exception
*/
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 组装一些参数信息,用以向 nacos server 注册
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp()); // 服务实例IP
params.put("port", String.valueOf(instance.getPort())); // 服务实例端口
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
/**
* 向nacos server 进行注册
* 所谓的向 nacos server 注册,其实就是nacos server 提供了一个 REST API,
* 客户端通过调用这些API,将参数信息传递给它,nacos server 保存这些参数信息,这样就完成了注册
*
* 这里nacos server 提供的用以服务注册的API是
* POST /nacos/v1/ns/instance
*
* 下一个入口:nacos server 接收注册请求完成注册
* 在`naming`模块中找到:com.alibaba.nacos.naming.controllers.InstanceController#register(javax.servlet.http.HttpServletRequest)
*/
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

往下跟reqApi最后掉到callServer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Call server.
* 调用 nacos server api
* @param api api
* @param params parameters
* @param body body
* @param curServer ?
* @param method http method
* @return result
* @throws NacosException nacos exception
*/
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
// 基于http 协议发起调用
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
/**
* 开始真正发起调用 接下来此处的代码就无需再往下跟了
*/
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();

MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);

if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}

执行远程Http请求的对象是 NacosRestTemplate ,该对象就是封装了普通的Http请求,大家可以自己查阅一下。

服务发现

两个入口:

1、 NamingService.getAllInstances(serviceName)

我们沿着案例中的服务发现方法调用找到 nacos-api 中的NamingService.getAllInstances() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances() ,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 获取服务下的实例列表信息
* @param serviceName name of service 服务名称
* @param groupName group of service 分组名称
* @param clusters list of cluster 集群信息
* @param subscribe if subscribe the service 是否订阅(缓存)
* @return
* @throws NacosException
*/
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
// 获取 服务信息 走缓存
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
// 直接从nacos server 获取服务信息 不走缓存
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}

上面的代码调用了 hostReactor.getServiceInfo() 方法,该方法会先调用 getServiceInfo0() 方法从本地缓存获取数据,缓存没有数据,就构建实例更新到Nacos,并从Nacos中获取最新数据, getServiceInfo0() 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 获取并订阅服务信息
* @param serviceName
* @param clusters
* @return
*/
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
// service key 服务key
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
// failover-mode
return failoverReactor.getService(key);
}
// 从本地缓存:serviceInfoMap 中获取服务信息
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 本地缓存中没有服务信息 则从远程更新服务信息
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);

serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 从nacos server 获取服务信息 并更新到本地缓存 ******重点看这*******
updateServiceNow(serviceName, clusters);

updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {

if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}

// 添加定时更新服务信息的任务
scheduleUpdateIfAbsent(serviceName, clusters);

return serviceInfoMap.get(serviceObj.getKey());
}

updateServiceNow(serviceName, clusters); 主从从远程服务器获取更新数据,最终会调用 updateService() 方法,在该方法中完成远程请求和数据处理,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Update service now.
* 更新服务信息
* @param serviceName service name
* @param clusters clusters
*/
public void updateService(String serviceName, String clusters) throws NacosException {
// 从本地缓存 serviceInfoMap 获取服务信息
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 从 nacos server 获取服务信息
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
// 处理 nacos server 返回的数据 更新本地缓存的服务信息数据 result是一个json 数据
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}

服务下线

我们沿着案例中的服务下线方法调用找到 nacos-api 中的NamingService.deregisterInstance() 并找到它的实现类和方法NacosNamingService.deregisterInstance() ,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 下线服务 (取消注册)
* @param serviceName name of service 服务名称
* @param groupName group of service 分组名称
* @param ip instance ip 实例ip
* @param port instance port 实例端口
* @param clusterName instance cluster name 实例所在集群名称
* @throws NacosException
*/
@Override
public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
// 创建实例对象
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
// 取消注册
deregisterInstance(serviceName, groupName, instance);
}
@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// 取消服务心跳
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
instance.getPort());
}
// 取消服务注册
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}

服务下线方法比较简单,和服务注册做的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。 第二件事:请求服务端服务下线接口。

服务订阅

我们可以查看订阅服务的案例,会先创建一个线程池,接下来会把线程池封装到监听器中,而监听器中可以监听指定实例信息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 服务订阅
* (step4)
*/
naming.subscribe("nacos.test.3", new AbstractEventListener() {

//EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback.
//So you can override getExecutor() to async handle event.
@Override
public Executor getExecutor() {
return executor;
}

@Override
public void onEvent(Event event) {
System.out.println(((NamingEvent) event).getServiceName());
System.out.println(((NamingEvent) event).getInstances());
}
});

我们沿着案例中的服务订阅方法调用找到 nacos-api 中的NamingService.subscribe() 并找到它的实现类和方法NacosNamingService.deregisterInstance() ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* subscribe instancesChangeEvent.
* 订阅 服务实例的变化事件
* @param serviceName combineServiceName, such as 'xxx@@xxx'
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param eventListener custom listener
*/
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
// 注册监听器
notifier.registerListener(serviceName, clusters, eventListener);
// 获取服务信息
getServiceInfo(serviceName, clusters);
}

此时会注册监听,注册监听,注册监听就是将当前的监听对象信息注入到listenerMap集合中,在监听对象的指定方法onEvent中可以读取实例信息,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* register listener.
* 注册监听器
* @param serviceName combineServiceName, such as 'xxx@@xxx'
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void registerListener(String serviceName, String clusters, EventListener listener) {
// 获取服务key值
String key = ServiceInfo.getKey(serviceName, clusters);
// 获取该服务的监听器集合
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
// 将当前监听器添加到 该服务的监听器集合
eventListeners.add(listener);
}

nacos 服务端

注册中心服务端的主要功能包括,接收客户端的服务注册,服务发现,服务下线的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP使用的自己实现的Distro协议,而CP是采用raft协议实现的,这个过程中牵涉到心跳、选主等操作。

我们来学习一下注册中心服务端接收客户端服务注册的功能。

注册处理

我们先来学习一下Nacos的工具类 WebUtils ,该工具类在 nacos-core 工程下,该工具类是用于处理请求参数转化的,里面提供了2个常被用到的方法required() 和 optional() :

required方法通过参数名key,解析HttpServletRequest请求中的参数, 并转码为UTF-8编码。

optional方法在required方法的基础上增加了默认值,如果获取不到,则 返回默认值。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   /**
* required方法通过参数名key,解析HttpServletRequest请求中的参 数,并转码为UTF-8编码。
*/
public static String required(final HttpServletRequest req, final String key) {
String value = req.getParameter(key);
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException("Param '" + key + "' is required.");
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}

// optional方法在required方法的基础上增加了默认值,如果获取不 到,则返回默认值。
public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
return defaultValue;
}
String value = req.getParameter(key);
if (StringUtils.isBlank(value)) {
return defaultValue;
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}

nacos 的 server 与 client 使用了 http 协议来交互,那么在 server 端必定提供了 http 接口的入口,并且在 core 模块看到其依赖了 spring boot starter ,所以它的http接口由集成了Spring的web服务器支持,简单地说就是像我们平时写的业务服务一样,有controller层和service层。

以OpenAPI作为入口来学习,我们找到 /nacos/v1/ns/instance 服务注册接口,在 nacos-naming 工程中我们可以看到 InstanceController 正是我们要找的对象,如下图:

image-20220425220042872

处理服务注册,我们直接找对应的POST方法即可,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 获取 namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 从请求中获取 serviceName
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//从 request 对象中解析出 服务实例对象 Instance
final Instance instance = parseInstance(request);
// nacos server 服务管理器进行服务实例注册
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}

如上图,该方法主要用于接收客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到Nacos中,注册的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 如果没有对应的服务 则创建服务 有则不作操作
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 获取对应的服务
Service service = getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 向服务中添加具体的服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起。

Distro协议介绍

Distro是阿里巴巴的私有协议, 是一种分布式一致性算法,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议:该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失

Distro 协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。

Distro 协议具有以下特点:

  • 1:专门为了注册中心而创造出的协议;
  • 2:客户端与服务端有两个重要的交互,服务注册与心跳发送;
  • 3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一 次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点 对等,所以请求的节点是随机的;
  • 4:客户端请求失败则换一个节点重新发送请求;
  • 5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收 到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
  • 6:每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为 健康节点;
  • 7:服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请 求当做注册请求来处理;
  • 8:服务端如果长时间未收到客户端心跳,则下线该服务;
  • 9:负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返 回,后台异步地将数据同步给其他节点;
  • 10:节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。

Distro寻址

Distro协议主要用于nacos 服务端节点之间的相互发现,nacos使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:

  • 单机模式(StandaloneMemberLookup)
  • 文件模式(FileConfigMemberLookup)
  • 服务器模式(AddressServerMemberLookup)

三种寻址模式如下图:

image-20220425220214350

单机模式

在 com.alibaba.nacos.core.cluster.lookup.LookupFactory 中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式,如果是集群启动,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
/*
判断是否是集群模式启动
*/
if (!EnvUtil.getStandaloneMode()) {
// 获取配置中的:nacos.core.member.lookup.type
String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
// 选择 类型
LookupType type = chooseLookup(lookupType);
// 根据类型创建 MemberLookup 默认 FILE_CONFIG
LOOK_UP = find(type);
currentLookupType = type;
} else {
// 非集群,standalone
LOOK_UP = new StandaloneMemberLookup();
}
LOOK_UP.injectMemberManager(memberManager);
Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
return LOOK_UP;
}

private static MemberLookup find(LookupType type) {
// 创建 FileConfigMemberLookup 对象
if (LookupType.FILE_CONFIG.equals(type)) {
LOOK_UP = new FileConfigMemberLookup();
return LOOK_UP;
}
// 创建 AddressServerMemberLookup 对象
if (LookupType.ADDRESS_SERVER.equals(type)) {
LOOK_UP = new AddressServerMemberLookup();
return LOOK_UP;
}
// unpossible to run here
throw new IllegalArgumentException();
}

单节点寻址模式会直接创建 StandaloneMemberLookup 对象,而文件寻址模式会创建 FileConfigMemberLookup 对象,服务器寻址模式会创建AddressServerMemberLookup ;

文件寻址模式

image-20220425220245984

文件寻址模式主要在创建集群的时候,通过 cluster.conf 来配置集群,程序可以通过监听 cluster.conf 文件变化实现动态管理节点,FileConfigMemberLookup 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.cluster.lookup;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.file.FileChangeEvent;
import com.alibaba.nacos.sys.file.FileWatcher;
import com.alibaba.nacos.sys.file.WatchFileCenter;
import com.alibaba.nacos.core.utils.Loggers;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Cluster.conf file managed cluster member node addressing pattern.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class FileConfigMemberLookup extends AbstractMemberLookup {

private FileWatcher watcher = new FileWatcher() {
@Override
public void onChange(FileChangeEvent event) {
readClusterConfFromDisk();
}

@Override
public boolean interest(String context) {
return StringUtils.contains(context, "cluster.conf");
}
};

/**
* 启动
* @throws NacosException
*/
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
// 从磁盘读取 cluster.conf 配置文件获取集群节点信息
readClusterConfFromDisk();
// Use the inotify mechanism to monitor file changes and automatically
// trigger the reading of cluster.conf
try {
// 监听文件的变更
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
} catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}

@Override
public void destroy() throws NacosException {
WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
}

private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
// 读取配置文件 获取集群节点信息
List<String> tmp = EnvUtil.readClusterConf();
// 将每个节点配置转换成 Member 对象
tmpMembers = MemberUtil.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
}
// 寻址之后的一些操作
afterLookup(tmpMembers);
}
}

服务器寻址模式

使用地址服务器存储节点信息,会创建 AddressServerMemberLookup ,服务端定时拉取信息进行管理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.cluster.lookup;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.http.HttpClientBeanHolder;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.apache.commons.lang3.StringUtils;

import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

/**
* Cluster member addressing mode for the address server.
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class AddressServerMemberLookup extends AbstractMemberLookup {

private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
};

public String domainName;

public String addressPort;

public String addressUrl;

public String envIdUrl;

public String addressServerUrl;

private volatile boolean isAddressServerHealth = true;

private int addressServerFailCount = 0;

private int maxFailCount = 12;

private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);

private volatile boolean shutdown = false;

@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
initAddressSys();
run();
}
}

private void initAddressSys() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = EnvUtil.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
String envAddressUrl = System.getenv("address_server_url");
if (StringUtils.isBlank(envAddressUrl)) {
addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
} else {
addressUrl = envAddressUrl;
}
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";

Loggers.CORE.info("ServerListService address-server port:" + addressPort);
Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
}

@SuppressWarnings("PMD.UndefineMagicConstantRule")
private void run() throws NacosException {
// With the address server, you need to perform a synchronous member node pull at startup
// Repeat three times, successfully jump out
boolean success = false;
Throwable ex = null;
int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i++) {
try {
syncFromAddressUrl();
success = true;
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
}
}
if (!success) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}

GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
}

@Override
public void destroy() throws NacosException {
shutdown = true;
}

@Override
public Map<String, Object> info() {
Map<String, Object> info = new HashMap<>(4);
info.put("addressServerHealth", isAddressServerHealth);
info.put("addressServerUrl", addressServerUrl);
info.put("envIdUrl", envIdUrl);
info.put("addressServerFailCount", addressServerFailCount);
return info;
}

private void syncFromAddressUrl() throws Exception {
RestResult<String> result = restTemplate
.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
if (result.ok()) {
isAddressServerHealth = true;
Reader reader = new StringReader(result.getData());
try {
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
} catch (Throwable e) {
Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount = 0;
} else {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
}
}

class AddressServerSyncTask implements Runnable {

@Override
public void run() {
if (shutdown) {
return;
}
try {
syncFromAddressUrl();
} catch (Throwable ex) {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
} finally {
GlobalExecutor.scheduleByCommon(this, 5_000L);
}
}
}
}

数据同步

Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据增加的时候,只同步增加的数据。

全量同步

image-20220425220334568

全量同步流程比较复杂,流程如上图:

  • 1:启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方 法加载数据
  • 2:调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的 数据
  • 3:从namingProxy代理获取所有的数据data
  • 4:构造http请求,调用httpGet方法从指定的server获取数据
  • 5:从获取的结果result中获取数据bytes
  • 6:处理数据processData
  • 7:从data反序列化出datumMap
  • 8:把数据存储到dataStore,也就是本地缓存dataMap
  • 9:监听器不包括key,就创建一个空的service,并且绑定监听器
  • 10:监听器listener执行成功后,就更新data store
任务启动

在 com.alibaba.nacos.core.distributed.distro.DistroProtocol 的构造函数中调用 startDistroTask() 方法,该方法会执行startVerifyTask() 和 startLoadTask() ,我们重点关注 startLoadTask() ,该方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void startDistroTask() {
// 单机模式下返回
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
/*
启动 startVerifyTask,做分区数据同步校验
当前 nacos 节点把自己负责的 service 的 key 和实例的签名数据发送给其他 nacos 节点。
其他节点收到 key 和签名后,对比本地数据,筛选出需要删除和更新的服务
*/
startVerifyTask();
//启动 DistroLoadDataTask,用于启动时全量加载数据
startLoadTask();
}

private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}

@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
/*
启动加载远程快照数据到本地
*/
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
数据如何执行加载

上面方法会调用 DistroLoadDataTask 对象,而该对象其实是个线程,因此会执行它的run方法,run方法会调用load()方法实现数据全量加载,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void run() {
try {
//加载快照数据
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}

private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
//同步数据
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
//从远程机器上同步所有数据
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
数据同步

数据同步会通过Http请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:

  • 1:loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步 到本机
  • 2:transportAgent.getDatumSnapshot()远程加载数据,通过Http请求 执行远程加载
  • 3:dataProcessor.processSnapshot()处理数据同步到本地

数据处理完整逻辑代码如下: loadAllDataSnapshotFromRemote() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
//遍历集群成员节点,不包括自己
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
//从远程节点加载数据,调用http请求接口: /v1/ns/distro/datums;
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
//处理数据
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}

远程加载数据代码如下: transportAgent.getDatumSnapshot() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public DistroData getDatumSnapshot(String targetServer) {
try {
//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;
byte[] allDatum = NamingProxy.getAllData(targetServer);
//将数据封装成DistroData
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
} catch (Exception e) {
throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
}
}

public static byte[] getAllData(String server) throws Exception {
//参数封装
Map<String, String> params = new HashMap<>(8);
// uri: /v1/ns/distro/datums 下一个入口:com.alibaba.nacos.naming.controllers.DistroController.getAllDatums
RestResult<String> result = HttpClient.httpGet(
"http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
new ArrayList<>(), params);

if (result.ok()) {
return result.getData().getBytes();
}

throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
+ result.getMessage());
}

处理数据同步到本地代码如下: dataProcessor.processSnapshot()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
public boolean processSnapshot(DistroData distroData) {
try {
//处理从远程加载回来的快照数据
return processData(distroData.getContent());
} catch (Exception e) {
return false;
}
}

private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
// 更新倒本地存储
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());

if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();

// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}

for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}

try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}

// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}

到此实现数据全量同步,其实全量同步最终封装的协议还是Http。

增量同步

新增数据使用异步广播同步:

  • 1:DistroProtocol 使用 sync() 方法接收增量数据
  • 2:向其他节点发布广播任务

调用 distroTaskEngineHolder 发布延迟任务

  • 3:调用 DistroDelayTaskProcessor.process() 方法进行任务投递:

将延迟任务转换为异步变更任务

  • 4:执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送 消息

调用 DistroHttpAgent.syncData() 方法发送数据

调用 NamingProxy.syncData() 方法发送数据

  • 5:异常任务调用 handleFailedTask() 方法进行处理

调用 DistroFailedTaskHandler 处理失败任务

调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新 投递成延迟任务。

增量数据入口

我们回到服务注册,服务注册的 InstanceController.register() 就是数据入口,它会调用 ServiceManager.registerInstance() ,执行数据同步的时候,调用 addInstance() ,在该方法中会执行DistroConsistencyServiceImpl.put() ,该方法是增量同步的入口,会调用 distroProtocol.sync() 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void put(String key, Record value) throws NacosException {
// 向本地存储添加一条记录
onPut(key, value);
/*
* ***后续逻辑看集群模式下的增量数据同步时再往下跟******
* 使用 distro 协议进行数据同步(增量同步) 操作类型是: DataOperation.CHANGE 默认1s后执行
*/
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}

sync() 方法会执行任务发布,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void sync(DistroKey distroKey, DataOperation action, long delay) {
/*
memberManager.allMembersWithoutSelf() 获取集群除自己以外的成员信息
*/
for (Member each : memberManager.allMembersWithoutSelf()) {
// 创建 DistroKey
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 创建 DistroDelayTask 任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
/*
添加延迟任务 执行 查看 addTask 将任务添加到 tasks 队列中

getDelayTaskExecuteEngine返回的是:DistroDelayTaskExecuteEngine
addTask 在其父类: NacosDelayTaskExecuteEngine
下一个入口:NacosDelayTaskExecuteEngine 构造
*/
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
增量同步操作

延迟任务对象我们可以从 DistroTaskEngineHolder 构造函数中得知是DistroDelayTaskProcessor ,代码如下:

1
2
3
4
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}

它延迟执行的时候会执行 process 方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
// 处理 DistroDelayTask 任务
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
// 操作类型时 数据的变更 CHANGE
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
// 封装 最终的 DistroSyncChangeTask 任务
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
// 提交去执行 下一个入口:com.alibaba.nacos.core.distributed.distro.task.execute.DistroSyncChangeTask.run
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}

DistroSyncChangeTask 实质上是任务的开始,它自身是一个线程,所以会执行它的run方法,而run方法这是数据同步操作,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 同步数据查看这里
*/
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
// 获取资源类型
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
// 进行数据同步 *******syncData********
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
// 失败后重试
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
// 失败后重试
handleFailedTask();
}
}

数据同步会执行调用 syncData ,该方法其实就是通过Http协议将数据发送到其他节点实现数据同步,代码如下:

1
2
3
4
5
6
7
8
9
10
@Override
public boolean syncData(DistroData data, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true;
}
// 获取要同步的数据
byte[] dataContent = data.getContent();
// 使用 NamingProxy 进行同步
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}

最后:一定要跟着讲师所给的源码自行走一遍!!!