Nacos实战应用 能力目标:
能够实现Nacos安装
基于Nacos能实现应用负载均衡
能基于Nacos实现配置管理
配置管理
负载均衡
多环境切换
配置共享
配置刷新
灰度发布
掌握Nacos集群部署
Nacos概要 Nacos是Alibaba微服务生态组件中的重要组件之一,主要用它实现应用的动态服务发现、配置管理、服务管理。
https://github.com/alibaba/spring-cloud-alibaba/wiki/Nacos-discovery
Nacos是什么
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
Nacos 支持几乎所有主流类型的“服务”的发现、配置和管理
1:Kubernetes Service
2:gRPC & Dubbo RPC Service
3:Spring Cloud RESTful Service
Nacos特性 服务发现和服务健康监测
Nacos 支持基于 DNS 和基于 RPC 的服务发现。服务提供者使用 原生 SDK、OpenAPI、或一个独立的Agent TODO注册 Service 后,服务消费者 可以使用DNS TODO 或HTTP&API查找和发现服务。
Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送 请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、 用户自定义)的健康检查。 对于复杂的云环境和网络拓扑环境中(如 VPC、 边缘网络等)服务的健康检查,Nacos 提供了 agent 上报模式和服务端主 动检测2种健康检查模式。Nacos 还提供了统一的健康检查仪表盘,帮助您根 据健康状态管理服务的可用性及流量。
动态配置服务 动态配置服务可以让您以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。
动态配置消除了配置变更时重新部署应用和服务的需要,让配置管理变得更加高效和敏捷。
配置中心化管理让实现无状态服务变得更简单,让服务按需弹性扩展变得更容易。
Nacos 提供了一个简洁易用的UI (控制台样例 Demo) 帮助您管理所有的服 务和应用的配置。Nacos 还提供包括配置版本跟踪、金丝雀发布、一键回滚配置以及客户端配置更新状态跟踪在内的一系列开箱即用的配置管理特性,帮助您更安全地在生产环境中管理配置变更和降低配置变更带来的风险。
动态DNS服务 动态 DNS 服务支持权重路由,让您更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以 DNS 协议为基础的服务发现,以帮助您消除耦合到厂商私有服务发现 API 上的风险。
Nacos 提供了一些简单的 DNS APIs TODO 帮助您管理服务的关联域名和可用的 IP:PORT 列表.
服务及其元数据管理 Nacos 能让您从微服务平台建设的视角管理数据中心的所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的 SLA 以及最首要的 metrics 统计数据
Nacos版图
特性大图:要从功能特性,非功能特性,全面介绍我们要解的问题域的特性诉求
架构大图:通过清晰架构,让您快速进入 Nacos 世界
业务大图:利用当前特性可以支持的业务场景,及其最佳实践
生态大图:系统梳理 Nacos 和主流技术生态的关系
优势大图:展示 Nacos 核心竞争力
战略大图:要从战略到战术层面讲 Nacos 的宏观优势
Nacos生态图
Nacos 无缝支持一些主流的开源生态:
1:Spring Cloud
2:Apache Dubbo and Dubbo Mesh
3:Kubernetes and CNCF。
使用 Nacos 简化服务发现、配置管理、服务治理及管理的解决方案,让微服务的发现、管理、共享、组合更加容易。
Nacos架构 参考官方文档:https://nacos.io/zh-cn/docs/architecture.html
Nacos安装 关于Nacos安装,可以直接参考官网安装 https://nacos.io/zh-cn/docs/quick-start.html,我们接下来学习基于Docker实现Nacos单机安装和基于Docker实现Nacos集群安装。
采用Docker-Compose安装Nacos要更方便,所以大家可以先学习一下Docker-Compose
Nacos安装模式有多种:
单机模式 Derby:
这种模式是极简模式,数据没法持久化存储,适合开发环境。
单机模式 MySQL:(支持MySQL5.7和MySQL8.0,我们这里学习MySQL5.7安装模式,因为当前主流还是MySQL5.7)
这种模式支持数据持久化,数据会存储到MySQL中,适合生产环境。
集群模式:
这种模式适合生产环境并且服务节点个数较多,不存在单点故障问题。
克隆项目:
1 2 3 4 5 6 7 8 9 10 11 12 13 # 克隆项目 git clone https://github.com/nacos-group/nacos-docker.git # 进入nacos-docker目录 cd nacos-docker # 查看文件列表 ce example ll
example 中文件列表如下:
Nacos Derby安装
derby是java的内存数据库,还是很方便的,可以直接一件启动nacos,但是是单机版,配置也无法存储,只是用来浅尝一番
安装Nacos生产环境会结合prometheus和grafana实现对Nacos的监控,我们这里不做它们的监控操作,需要将docker-compose的配置注释掉,修改example/standalone-derby.yaml ,配置如下:
进入到 example 目录下执行如下命令:
1 2 3 4 5 6 # 启动 docker-compose -f standalone-derby.yaml up -d # 停止 docker-compose -f standalone-derby.yaml stop # 删除 docker-compose -f standalone-derby.yaml down
安装完成后,我们可以直接访问它的控制台 http://192.168.200.129:8848/nacos,账号密码都是nacos,效果如下:
关于控制台的使用,我们在后面详细讲解。
Nacos MySQL版安装 1、我们先停掉之前安装的服务,并删掉之前的容器
1 2 3 4 5 6 #停掉容器 docker stop nacos-standalone #删掉容器 docker rm nacos-standalone #或者一步到位 docker-compose -f standalone-derby.yaml down
2、使用 example/standalone-mysql-5.7.yaml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 version: "2" services: nacos: image: nacos/nacos-server:${NACOS_VERSION} container_name: nacos-standalone-mysql env_file: - ../env/nacos-standlone-mysql.env volumes: - ./standalone-logs/:/home/nacos/logs - ./init.d/custom.properties:/home/nacos/init.d/custom.properties ports: - "8848:8848" - "9848:9848" - "9555:9555" depends_on: - mysql restart: on-failure mysql: container_name: mysql image: nacos/nacos-mysql:5.7 env_file: - ../env/mysql.env volumes: - ./mysql:/var/lib/mysql ports: - "3306:3306"
3、在 example 目录下使用 docker-compose 命令启动
1 docker-compose -f standalone-mysql-5.7.yaml up -d
数据库脚本在nacos的 nacos\config\src\main\resources\META-INF 工程中有对应脚本,也可以通过下面的网址获取SQL:
https://github.com/alibaba/nacos/blob/develop/config/src/main/resources/META-INF/nacos-db.sql
创建数据库 nacos_config ,并执行初始化操作,初始化脚本后,数据库数据如下:
4、新建配置,查看存储
此时访问后台 http://192.168.200.129:8848/nacos ,并创建一个配置信息,如下图:
此时我们可以随意填写些配置,如下图:
配置填写后,数据会添加到数据库中的 config_info 表中,如下表:
Docker 安装Nacos 获取nacos,下载到~/nacos
1 2 3 4 5 mkdir ~/nacos cd ~/nacos wget https://github.com/nacos-group/nacos-docker/archive/refs/tags/v2.0.3.zip unzip v2.0.3.zip mv nacos-docker-2.0.3 nacos-docker
我们也可以直接采用Docker的方式安装Nacos,这样安装不要克隆项目也不需要修改配置文件,会更方便,安装命令如下:
1 2 3 4 5 6 7 8 9 10 11 docker run -d \ -e MODE=standalone \ -e SPRING_DATASOURCE_PLATFORM=mysql \ -e MYSQL_SERVICE_HOST=192.168.200.129 \ -e MYSQL_SERVICE_PORT=3306 \ -e MYSQL_SERVICE_USER=root \ -e MYSQL_SERVICE_PASSWORD=root \ -e MYSQL_SERVICE_DB_NAME=nacos_config \ -p 8848:8848 \ --restart=always \ --name nacos nacos/nacos-server
注意:需要提取准备好msyql数据库
访问 http://192.168.200.129:8848/nacos 效果如下:
Nacos功能应用 Nacos服务注册与发现 服务发现是微服务架构体系中最关键的组件之一。如果尝试着用手动的方式来给每一个客户端来配置所有服务提供者的服务列表是一件非常困难的事,而且也不利于 服务的动态扩缩容。Nacos Discovery Starter 可以帮助您将服务自动注册到 Nacos 服务端并且能够动态感知和刷新某个服务实例的服务列表。除此之外,Nacos Discovery Starter 也将服务实例自身的一些元数据信息-例如host,port,健康检查URL,主页等-注册到 Nacos 。
接下来我们学习一下如何使用Nacos作为服务的注册中心,并实现服务注册和服务发现。当前项目开发主流技术是SpringBoot,我们就讲解基于SpringBoot如何使用Nacos实现服务注册与发现。
如上图,我们以打车项目为例,当用户打车成功的时候,会调用 hailtaxi- order , hailtaxi-order 会下订单,同时修改司机状态,修改状态需要调用hailtaxi-driver ,我们把 hailtaxi-order 服务和 hailtaxi-driver 服务都注册到Nacos中,并实现服务调用,如果整个调用都没有问题,就说明服务注册发现没问题。
关于SpringCloud Alibaba和SpringBoot的版本,我们可以通过 https://start.spring.io/actuator/info 查看。
SpringBoot我们修改成2.2.10版本,SpringCloud版本改成Hoxton.SR11版本。要使用Nacos需要引入依赖包,并且将Consul依赖移除。
1、注释掉 hailtaxi-api 和 hailtaxi-gateway中的consul依赖包:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-consul-discovery</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-consul-config</artifactId > <version > 2.2.1.RELEASE</version > </dependency >
另外在 hailtaxi-order 和 hailtaxi-pay 中也注意要注释掉consul的相关依赖坐标
2、在 hailtaxi-api 和 hailtaxi-gateway 中引入如下依赖:
1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > <version > 2.2.5.RELEASE</version > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > <version > 2.2.5.RELEASE</version > </dependency >
3、配置 bootstrap.yml
bootstrap.yml ( bootstrap.properties )用来在程序引导时执行,应用于更加早期配置信息读取,如可以使用来配置 application.yml 中使用到参数等。
application.yml ( application.properties ) 应用程序特有配置信息,可以用来配置后续各个模块中需使用的公共参数等。
bootstrap.yml 先于 application.yml 加载。
项目中如果使用Nacos,需要使用 bootstrap.yml ,因此我们需要将项目中的 application.yml 换成 bootstrap.yml 。
在 hailtaxi-gateway 中创建 bootstrap.yml 配置文件,添加如下配置:
1 2 3 4 5 6 7 8 9 10 11 spring: application: name: hailtaxi-gateway cloud: nacos: discovery: server-addr: 192.168 .213 .130 :8848 config: server-addr: 192.168 .213 .130 :8848
4、在项目启动类上添加开启服务发现的注解
1 2 3 4 5 6 7 8 @EnableDiscoveryClient @SpringBootApplication public class GatewayApplication implements CommandLineRunner { public static void main (String[] args) { SpringApplication.run(GatewayApplication.class,args); } }
此时我们运行 hailtaxi-gateway ,可以发现在Nacos中已经注册了相关服务,如下图:
5、按相同方式配置 hailtaxi-driver , hailtaxi-order , hailtaxi-pay 三个项目
6、启动各个服务,查看服务列表
注意:启动前,将 hailtaxi-driver 中的异常错误处理掉!!
7、测试
使用postman测试:
http://localhost:8001/driver/info/1
http://localhost:8001/order
此时服务调用没有任何问题,说明服务注册和服务发现正常
负载均衡
如上图,如果此时用户打车成功,会调用订单服务,订单服务会修改司机状态,此时会调用 hailtaxi-driver ,如果是生产环境,每个节点一定是集群状态,比如有2个 hailtaxi-driver 节点,此时如何实现负载均衡?
我们可以发现服务注册到Nacos中,有一个权重属性,这个权重属性就是Nacos的负载均衡机制,此时需要用到Nacos的负载均衡策略 NacosRule ,我们可以在程序中先初始化负载均衡算法,再到bootstrap.yml中配置权重。
1、初始化负载均衡算法
在 hailtaxi-order 中初始化负载均衡算法:
1 2 3 4 5 6 7 8 9 10 11 12 @Configuration public class LoadBalanceConfiguration { @Bean @Scope(value = "prototype") public IRule nacosRule () { return new NacosRule (); } }
2、权重配置,因为是基于权重的
为了演示集群效果,我们启动多个 hailtaxi-driver ,并在bootstrap.yml 中配置权重
1 2 3 4 5 6 7 8 9 10 11 12 spring: application: name: hailtaxi-driver cloud: nacos: discovery: server-addr: 192.168 .213 .130 :8848 weight: 1 config: server-addr: 192.168 .213 .130 :8848
这是默认18082的,对于18085和18086的配置如下
1 2 3 # 默认是1,另起2个服务 -Dspring.cloud.nacos.discovery.weight=2 -Dserver.port=18085 -Dspring.cloud.nacos.discovery.weight=3 -Dserver.port=18086
最终可以在nacos控制台查看服务的权重信息
为了方便查看调用了哪个节点,我们把每个节点的端口号输出,我们请求打车测试
http://localhost:8001/order ,执行6次,应该会按1:2:3 的比例来调用
注意:如果 hailtaxi-order 模块通过 openfeign 调用超时,可以设置它的超时时间
由于 openfeign 底层默认使用的是 ribbon ,故可以采用如下配置:
1 2 3 4 5 ribbon: ReadTimeout: 5000 ConnectTimeout: 5000
如果我们把算法 NacosRule 注释,默认就是和 Ribbon 集成,和 Ribbon 默认开启,可以通过如下配置实现关闭或开启:
1 2 3 ribbon: nacos: enabled: true
配置中心 Nacos 提供用于存储配置和其他元数据的 key/value 存储,为分布式系统中的外部化配置提供服务器端和客户端支持。使用 Spring Cloud AlibabaNacos Config,您可以在 Nacos Server 集中管理你 Spring Cloud 应用的外部属性配置。
Spring Cloud Alibaba Nacos Config 是 Config Server 和 Client 的替代方案,客户端和服务器上的概念与 Spring Environment 和 PropertySource 有着一致的抽象,在特殊的 bootstrap 阶段,配置被加载到 Spring 环境中。当应用程序通过部署管道从开发到测试再到生产时,您可以管理这些环境之间的配置,并确保应用程序具有迁移时需要运行的所有内容。
配置管理 1、我们可以在Nacos控制台配置项目的配置数据,先打开Nacos控制台,
在 命名空间 中点击 新建命名空间 ,如下图:
命名空间可以用于数据隔离,默认在public 命名空间中!!!
2、修改 hailtaxi-driver 中的 bootstrap.yml 配置文件,指定命名空间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 spring: application: name: hailtaxi-driver cloud: nacos: discovery: server-addr: 192.168 .213 .130 :8848 weight: 1 namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3 config: server-addr: 192.168 .213 .130 :8848 namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3 file-extension: yaml
3、在nacos中添加配置
在 配置管理>配置列表 中添加,如下图:
将hailtaxi-driver原来在 application.yml 中的配置全部填写到下面表单中,然后将 application.yml 配置文件删除,或者删除所有配置,如下图:
Data ID:默认加载${spring.application.name}.${file-extension}
另外对于web服务的端口 server.port 一般留在 application.yml 中
注意 Data ID 的配置。
4、启动测试:
我们启动 hailtaxi-driver 服务,默认加载${spring.application.name}.${file-extension:properties}
配置,加载完成后,配置数据会生效,并访问
http://localhost:18081/driver/info/1 测试,效果如下:
5、配置 hailtaxi-gateway , hailtaxi-order , hailtaxi-pay 等几个项目
按照相同的方式,将各个项目中 application.yml 中的配置配置到nacos中,并注释/删除掉本地 application.yml 中的配置(端口的配置一般留在项目中)
最后启动所有项目,测试走网关的API。
多环境切换 spring-cloud-starter-alibaba-nacos-config 在加载配置的时候,不仅仅加载以 dataid 为 ${spring.application.name}.${file-extension:properties}
为前缀的基础配置,还加载dataid为 ${spring.application.name}-${profile}.${file-extension:properties}
的基础配置。在日常开发中如果遇到多套环境下的不同配置,可以通过Spring 提供的 ${spring.profiles.active} 这个配置项来配置。
1、在nacos中创建 hailtaxi-driver-dev.yaml 作为开发环节的配置,创建 hailtaxi-driver-test.yaml 作为测试环境的配置文件,创建如下:
hailtaxi-driver-test.yaml
hailtaxi-driver-dev.yaml
这样多出来两份配置信息如下:
2、在 hailtaxi-driver 项目的 bootstrap.yml 中激活配置,如下:
启动项目,看是否能正常启动!!!
访问:http://localhost:18081/driver/info/1
3、将 active 换成 test ,启动后访问:http://localhost:18081/driver/info/1
共享/扩展 配置 在实际的业务场景中应用和共享配置间的关系可能, Spring Cloud Alibaba Nacos Config 从 0.2.1 版本后,可支持自定义 Data Id 的配置,通过它可以解决配置共享问题
共享配置:
每个服务与每个服务直接共享的信息
1、创建一个Data ID 为: datasource.yaml 的配置,用于配置数据库连接,如下图:
2、在 hailtaxi-driver-dev.yaml 中将数据库的配置信息删除,如下图:
3、在 bootstrap.yml 中引入共享配置需要使用 shared-configs 属性,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 spring: cloud: nacos: discovery: server-addr: 192.168 .213 .130 :8848 namespace: b24962bb-2277-40b2-a08d-a9da0053d135 config: server-addr: 192.168 .213 .130 :8848 namespace: b24962bb-2277-40b2-a08d-a9da0053d135 file-extension: yaml shared-configs: - dataId: datasource.yaml refresh: true
配置信息的加载由NacosConfigProperties完成
启动测试访问 http://localhost:18081/driver/info/1,此时能访问数据库,
同时也能获取 hailtaxi-driver-dev.yaml 中的配置,效果如下:
扩展配置:
每个服务与每个服务特有的信息,好像没什么用,完全可以放到主配置文件里,应用名-环境.yaml
nacos除了支持读取以上所支持的的配置信息外,用户还可以自定义扩展配置
1、在nacos中 创建一个Data ID 为: custom.yaml 的配置,配置信息如下图:
2、在 hailtaxi-driver 模块的 bootstrap.yml 中加载该配置,使用extension-configs 属性,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 spring: cloud: nacos: discovery: server-addr: 192.168 .213 .130 :8848 namespace: b24962bb-2277-40b2-a08d-a9da0053d135 config: server-addr: 192.168 .213 .130 :8848 namespace: b24962bb-2277-40b2-a08d-a9da0053d135 file-extension: yaml shared-configs: - dataId: datasource.yaml refresh: true extension-configs: - dataId: custom.yaml
3、在应用程序中去读该配置,在 DriverController 中添加如下代码
1 2 3 4 5 6 @Value("${app.version}") private String version; @GetMapping("/appinfo") public String getAppInfo () { return version; }
4、测试访问:http://localhost:18081/driver/appinfo 查看结果
配置刷新
配置自动刷新对程序来说非常重要,Nacos支持配置自动刷新,并且提供了多种刷新机制。
Environment自动刷新spring-cloud-starter-alibaba-nacos-config 支持配置的动态更新,Environment能实时更新到最新的配置信息,启动 Spring Boot 应用测试的代码如下:
1、在 hailtaxi-driver-dev.yaml 中添加一个配置项,如下
2、在 hailtaxi-driver 的启动类中添加一段测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @SpringBootApplication @EnableDiscoveryClient @MapperScan(basePackages = "com.itheima.driver.mapper") public class DriverApplication { public static void main (String[] args) { ApplicationContext applicationContext = SpringApplication.run(DriverApplication.class,args); while (true ) { String name = applicationContext.getEnvironment().getProperty("app.name" ); String version = applicationContext.getEnvironment().getProperty("app.version" ); System.out.println("app.name=" +name+";app.version=" + version); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3、启动查看控制台输出,
4、在nacos中修改 app.name 和 app.version ,再次查看控制台输出
5、默认情况下, shared-configs 和 extension-configs 是不自动刷新的,【其他配置可以】,如果要支持刷新,需要添加 refresh 属性,如下
1 2 3 4 5 6 7 8 # 加载共享配置信息 shared-configs[0 ]: dataId: datasource.yaml refresh: true # 加载扩展配置 extension-configs: - dataId: custom.yaml refresh: true
@Value刷新
注意,如果知识配置了refresh: true,nacos会推送到程序后台,但是springboot不会刷新,还要配置RefreshScope才能达到自动刷新
程序中如果写了 @Value 注解,可以采用 @RefreshScope 实现刷新,只需要在指定类上添加该注解即可,如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @RestController @RequestMapping(value = "/driver") @RefreshScope public class DriverController { @Value("${app.version}") private String version; @Value("${app.name}") private String appname; @GetMapping("/appinfo") public String getAppInfo () { return appname + ";" +version; } } # 或者通过springboot的,也是可以拿到最新的配置信息的 @Autowired Environment environment; environment.getProperty("app.version" ) # 用配置类的方式,也能自动更新 @Data @Configuration @ConfigurationProperties(prefix="app) public class AppProperteis { private String version; } 3种方式
启动测试: http://localhost:18081/driver/appinfo
修改完配置信息后再次测试,查看是否能动态刷新!
灰度发布 灰度配置指的是指定部分客户端IP进行新配置的下发,其余客户端配置保持不变,用以验证新配置对客户端的影响,保证配置的平稳发布。灰度配置是生产环境中一个比较重要的功能,对于保证生产环境的稳定性非常重要。在1.1.0中,Nacos支持了以I为粒度的灰度配置 ,具体使用步骤如下:
1、将 hailtaxi-driver 项目打包,上传到服务器,并启动
1 java -jar hailtaxi-driver.jar
2、本地也启动
查看 nacos 中的服务列表
可以看到有一个实例是来自远程服务器上的
3、在nacos中,找到 hailtaxi-driver-dev.yaml 中,编辑配置,勾选“Beta发布”,在文本框里填入要下发配置配置的IP,多个IP用逗号分隔,操作如下:
配置Beta发布,针对192.168.200.129生效
修改配置内容,点击“发布Beta”按钮,即可完成灰度配置的发布,点击“发 布Beta”后,“发布Beta”按钮变灰,此时可以选择“停止Beta”或者“发布”。“停止Beta”表示取消停止灰度发布,当前灰度发布配置的IP列表和配置内容都会删除,页面回到正常发布的样式。“发布”表示将灰度配置在所有客户端生效,之前的配置也会被覆盖,同时页面回到正常发布的样式:
4、测试:访问:
http://localhost:18081/driver/appinfo
http://192.168.200.129:18081/driver/appinfo
访问192.168.200.129就会访问到Beta中配置的结果
Nacos集群 在生产环境Nacos一般都不是单节点存在,如果是单节点,很容易存在单点故障,因此生产环境一般都以集群形式存在
集群架构 Nacos集群模式有多种,但其实无论哪种都是将3个Nacos服务进行集群发布。
集群需要采用数据共享模式进行配置信息共享,也就是要将数据存入到同一个数据库中 ,我们对每种集群模式进行说明:
1)直连模式
http://ip1:port/openAPI 直连ip模式,机器挂则需要修改ip才可以使用。
比如我现在有3个Nacos,每次操作数据的时候,都需要使用IP:端口的模式,这种模式效率极低,并且一旦节点故障无法识别,因此官方不推荐这种模式。
2)VIP模式
http://VIP:port/openAPI 挂载VIP模式,直连vip即可,下面挂server真实ip,可读性不好。
3)域名模式
http://nacos.com:port/openAPI 域名 + VIP模式,可读性好,而且换ip方便,因此官方推荐该模式,该模式的结构图如下:
Nacos集群部署 我们搭建Nacos集群环境,集群环境配置如下:
1)服务下载
在 https://github.com/alibaba/nacos/releases/ 下载需要的服务,
当前使用的是1.4.1, 我们可以选择下载1.4.1版本,版本如下:
解压压缩包后,包结构如下:
2)配置数据库
修改 conf/application.properties 配置数据库,配置如下:
3)集群配置
修改 conf/cluster.conf 配置集群:
1 2 3 192.168.211.145:8848 192.168.211.146:8848 192.168.211.147:8848
真集群ip不同,端口相同;伪集群ip相同,端口不同
4)节点同步
将修改好的服务分别上传到 192.168.211.146 、 192.168.211.147 服务:
1 2 scp -r nacos 192.168.211.146:/usr/local/server/alibaba/ scp -r nacos 192.168.211.147:/usr/local/server/alibaba/
5)启动每个节点
进入到每个节点 nacos/bin 目录下,执行启动:sh startup.sh
访问任何一个单节点,信息如下:
注:
学习阶段,为了节省资源,可以直接使用 nacos-docker 中已配置好的集群启动 yml 进行启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 version: "2" services: nacos1: image: nacos/nacos-server:${NACOS_VERSION} container_name: nacos1 networks: nacos_net: ipv4_address: 172.16 .238 .10 volumes: - ./cluster-logs/nacos1:/home/nacos/logs ports: - "8848:8848" - "9848:9848" - "9555:9555" env_file: - ../env/nacos-ip.env restart: on-failure depends_on: - mysql nacos2: image: nacos/nacos-server:${NACOS_VERSION} container_name: nacos2 networks: nacos_net: ipv4_address: 172.16 .238 .11 volumes: - ./cluster-logs/nacos2:/home/nacos/logs ports: - "8849:8848" - "9849:9848" env_file: - ../env/nacos-ip.env restart: always depends_on: - mysql nacos3: image: nacos/nacos-server:${NACOS_VERSION} container_name: nacos3 networks: nacos_net: ipv4_address: 172.16 .238 .12 volumes: - ./cluster-logs/nacos2:/home/nacos/logs ports: - "8850:8848" - "9850:9848" env_file: - ../env/nacos-ip.env restart: always depends_on: - mysql mysql: container_name: mysql image: nacos/nacos-mysql:5.7
172.16.238.12是nacos 分配的固定ip,不用改
1 docker-compose -f cluster-ip.yaml up -d
访问ip:8848/49/50,就可以访问了
客户端接入Nacos集群 客户端接入,不建议写多个节点的IP:Port,建议以域名的方式连接
Nacos,因此需要配置Nacos域名,在 192.168.211.145 节点中配置域名nacos.hailtaxi.com , nginx 配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 upstream hailtaxi-nacos{ server 192.168.211.145:8848 ; server 192.168.211.146:8848 ; server 192.168.211.147:8848 ; } server { listen 80 ; server_name nacos.hailtaxi.com; location / { proxy_pass http://hailtaxi-nacos; } }
配置 nacos.hailtaxi.com 域名映射:
1 2 3 4 #修改hosts文件 vi /etc/hots #添加如下映射关系 192.168.211.145 hailtaxinacos.com
保存Nginx配置,并启动Nginx,修改本地
C:\Windows\System32\drivers\hosts ,添加如下配置:
1 192.168.211.145 hailtaxinacos.com
访问 http://hailtaxinacos.com/nacos ,效果如下:
项目中使用:
方式1:
项目中Nacos地址可以把多个地址写到一起,用逗号隔开,如下代码:
Nacos效果如下:
ip万一更换,也是增加维护成本
方式2:
通过Nginx访问即可,配置如下:
192.168.211.145:80
访问域名,域名到hosts中的配置(本地),走负载均衡(本地nginx)
nacos源码
掌握Nacos服务通信源码原理
掌握Nacos数据一致性原理
掌握Nacos数据全量同步/增量同步源码
源码环境搭建 Nacos源码有很多值得我们学习的地方,为了深入理解Nacos,我们剖析源码,分析如下2个知识点:
1:Nacos对注册中心的访问原理
2:Nacos注册服务处理流程
1、从官方项目 上克隆下来,并且检出 1.4.1 版本,导入idea。nacos源码环境搭建起来比较轻松,几乎不会报什么错误,导入后编译安装到本地环境即可
编译之后可能找不到类,要将consistency下的java和grpc-java设置为生成目录
2、找到 config 模块中找到 \resources\META-INF\nacos-db.sql ,在本地mysql中创建数据库 nacos-config ,将该脚本导入执行创建表。
3、找到 console 模块下的配置文件 application.properties ,修改数据库相关配置
1 2 3 4 5 6 7 8 9 spring.datasource.platform =mysql db.num =1 db.url.0 =jdbc:mysql://127.0.0.1:3306/nacos-config? characterEncoding=utf8&connectTimeout=1000&socketTimeout=3 000&autoReconnect=true&useUnicode=true&useSSL=false&server Timezone=UTC db.user.0 =nacos db.password.0 =nacos
4、找到 console 模块下的启动类,启动nacos的服务端,启动时添加启动参数,指定启动模式为非集群启动
5、访问本地的nacos:http://localhost:8848/nacos
至此,源码环境搭建成功!
nacos 客户端 首先要搞清楚:nacos的客户端其实在我们自己的服务里,我们引入了nacos的相关坐标依赖,nacos客户端以jar包的形式在我们的服务中工作
对于nacos的客户端,它都要帮我们实现什么功能呢?
它的主要工作有:服务注册、服务发现、服务下线操作、服务订阅操作等相关操作。
客户端与注册中心服务端的交互,主要集中在服务注册、服务下线、服务发现、订阅某个服务,其实使用最多的就是服务注册和服务发现,下面我会从源码的角度分析一下这四个功能。
在Nacos源码中 nacos-example 中 com.alibaba.nacos.example.NamingExample 类分别演示了这4个功能的操作,我们可以把它当做入口,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 package com.alibaba.nacos.example;import com.alibaba.nacos.api.exception.NacosException;import com.alibaba.nacos.api.naming.NamingFactory;import com.alibaba.nacos.api.naming.NamingService;import com.alibaba.nacos.api.naming.listener.AbstractEventListener;import com.alibaba.nacos.api.naming.listener.Event;import com.alibaba.nacos.api.naming.listener.NamingEvent;import java.util.Properties;import java.util.concurrent.Executor;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class NamingExample { public static void main (String[] args) throws NacosException { Properties properties = new Properties (); properties.setProperty("serverAddr" , "127.0.0.1:8848" ); properties.setProperty("namespace" , "demo" ); NamingService naming = NamingFactory.createNamingService(properties); naming.registerInstance("nacos.test.1" , "192.168.200.10" , 8888 ); naming.registerInstance("nacos.test.1" , "192.168.200.10" , 7777 ); naming.registerInstance("nacos.test.2" , "DEFAULT_GROUP" ,"192.168.200.10" , 9999 ); System.out.println(naming.getAllInstances("nacos.test.1" )); naming.deregisterInstance("nacos.test.1" , "192.168.200.10" , 7777 ); System.out.println(naming.getAllInstances("nacos.test.1" )); Executor executor = new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), new ThreadFactory () { @Override public Thread newThread (Runnable r) { Thread thread = new Thread (r); thread.setName("test-thread" ); return thread; } }); naming.subscribe("nacos.test.3" , new AbstractEventListener () { @Override public Executor getExecutor () { return executor; } @Override public void onEvent (Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); } }); } }
这里要在本地nacos创建一个命名空间demo
服务注册 我们沿着案例中的服务注册方法调用找到 nacos-api 中的NamingService.registerInstance() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService ,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public void registerInstance (String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException { Instance instance = new Instance (); instance.setIp(ip); instance.setPort(port); instance.setWeight(1.0 ); instance.setClusterName(clusterName); registerInstance(serviceName, groupName, instance); } @Override public void registerInstance (String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
注册主要做了两件事,第一件事:为注册的服务设置一个定时心跳任务。
第二件事:将服务注册到服务端。
1:启动一个定时心跳任务,时间间隔为5s,如果服务正常,不做处理,如果不正常,重新注册
2:发送http请求给注册中心服务端,调用服务注册接口,注册服务
上面代码我们可以看到定时任务添加,但并未完全看到远程请求,serverProxy.registerService() 方法如下,会先封装请求参数,接下来调用 reqApi() 而 reqApi() 最后会调用 callServer() ,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void registerService (String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}" , namespaceId, serviceName, instance); final Map<String, String> params = new HashMap <String, String>(16 ); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip" , instance.getIp()); params.put("port" , String.valueOf(instance.getPort())); params.put("weight" , String.valueOf(instance.getWeight())); params.put("enable" , String.valueOf(instance.isEnabled())); params.put("healthy" , String.valueOf(instance.isHealthy())); params.put("ephemeral" , String.valueOf(instance.isEphemeral())); params.put("metadata" , JacksonUtils.toJson(instance.getMetadata())); reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
往下跟reqApi最后掉到callServer方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public String callServer (String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0 ; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())) .observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException (restResult.getCode(), restResult.getMessage()); } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to request" , e); throw new NacosException (NacosException.SERVER_ERROR, e); } }
执行远程Http请求的对象是 NacosRestTemplate ,该对象就是封装了普通的Http请求,大家可以自己查阅一下。
服务发现 两个入口:
1、 NamingService.getAllInstances(serviceName)
我们沿着案例中的服务发现方法调用找到 nacos-api 中的NamingService.getAllInstances() 并找到它的实现类和方法com.alibaba.nacos.client.naming.NacosNamingService.getAllInstances() ,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public List<Instance> getAllInstances (String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, "," )); } else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, "," )); } List<Instance> list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList <Instance>(); } return list; }
上面的代码调用了 hostReactor.getServiceInfo() 方法,该方法会先调用 getServiceInfo0() 方法从本地缓存获取数据,缓存没有数据,就构建实例更新到Nacos,并从Nacos中获取最新数据, getServiceInfo0() 方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public ServiceInfo getServiceInfo (final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo (serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object ()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0 ) { synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }
updateServiceNow(serviceName, clusters); 主从从远程服务器获取更新数据,最终会调用 updateService() 方法,在该方法中完成远程请求和数据处理,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void updateService (String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false ); if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { if (oldService != null ) { synchronized (oldService) { oldService.notifyAll(); } } } }
服务下线 我们沿着案例中的服务下线方法调用找到 nacos-api 中的NamingService.deregisterInstance() 并找到它的实现类和方法NacosNamingService.deregisterInstance() ,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public void deregisterInstance (String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException { Instance instance = new Instance (); instance.setIp(ip); instance.setPort(port); instance.setClusterName(clusterName); deregisterInstance(serviceName, groupName, instance); } @Override public void deregisterInstance (String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort()); } serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance); }
服务下线方法比较简单,和服务注册做的事情正好相反,也做了两件事,第一件事:不在进行心跳检测。 第二件事:请求服务端服务下线接口。
服务订阅 我们可以查看订阅服务的案例,会先创建一个线程池,接下来会把线程池封装到监听器中,而监听器中可以监听指定实例信息,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /** * 服务订阅 * (step4) */ naming.subscribe("nacos.test.3", new AbstractEventListener() { //EventListener onEvent is sync to handle, If process too low in onEvent, maybe block other onEvent callback. //So you can override getExecutor() to async handle event. @Override public Executor getExecutor() { return executor; } @Override public void onEvent(Event event) { System.out.println(((NamingEvent) event).getServiceName()); System.out.println(((NamingEvent) event).getInstances()); } });
我们沿着案例中的服务订阅方法调用找到 nacos-api 中的NamingService.subscribe() 并找到它的实现类和方法NacosNamingService.deregisterInstance() ,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void subscribe (String serviceName, String clusters, EventListener eventListener) { notifier.registerListener(serviceName, clusters, eventListener); getServiceInfo(serviceName, clusters); }
此时会注册监听,注册监听,注册监听就是将当前的监听对象信息注入到listenerMap集合中,在监听对象的指定方法onEvent中可以读取实例信息,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void registerListener (String serviceName, String clusters, EventListener listener) { String key = ServiceInfo.getKey(serviceName, clusters); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); if (eventListeners == null ) { synchronized (lock) { eventListeners = listenerMap.get(key); if (eventListeners == null ) { eventListeners = new ConcurrentHashSet <EventListener>(); listenerMap.put(key, eventListeners); } } } eventListeners.add(listener); }
nacos 服务端 注册中心服务端的主要功能包括,接收客户端的服务注册,服务发现,服务下线的功能,但是除了这些和客户端的交互之外,服务端还要做一些更重要的事情,就是我们常常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP使用的自己实现的Distro协议,而CP是采用raft协议实现的,这个过程中牵涉到心跳、选主等操作。
我们来学习一下注册中心服务端接收客户端服务注册的功能。
注册处理 我们先来学习一下Nacos的工具类 WebUtils ,该工具类在 nacos-core 工程下,该工具类是用于处理请求参数转化的,里面提供了2个常被用到的方法required() 和 optional() :
required方法通过参数名key,解析HttpServletRequest请求中的参数, 并转码为UTF-8编码。
optional方法在required方法的基础上增加了默认值,如果获取不到,则 返回默认值。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static String required (final HttpServletRequest req, final String key) { String value = req.getParameter(key); if (StringUtils.isEmpty(value)) { throw new IllegalArgumentException ("Param '" + key + "' is required." ); } String encoding = req.getParameter("encoding" ); return resolveValue(value, encoding); } public static String optional (final HttpServletRequest req, final String key, final String defaultValue) { if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0 ] == null ) { return defaultValue; } String value = req.getParameter(key); if (StringUtils.isBlank(value)) { return defaultValue; } String encoding = req.getParameter("encoding" ); return resolveValue(value, encoding); }
nacos 的 server 与 client 使用了 http 协议来交互,那么在 server 端必定提供了 http 接口的入口,并且在 core 模块看到其依赖了 spring boot starter ,所以它的http接口由集成了Spring的web服务器支持,简单地说就是像我们平时写的业务服务一样,有controller层和service层。
以OpenAPI作为入口来学习,我们找到 /nacos/v1/ns/instance 服务注册接口,在 nacos-naming 工程中我们可以看到 InstanceController 正是我们要找的对象,如下图:
处理服务注册,我们直接找对应的POST方法即可,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register (HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok" ; }
如上图,该方法主要用于接收客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创建服务的实例,服务实例创建后将服务实例注册到Nacos中,注册的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void registerInstance (String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null ) { throw new NacosException (NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
注册的方法中会先创建该实例对象,创建前先检查本地缓存是否存在该实例对象,如果不存在就创建,最后注册该服务,并且该服务会和实例信息捆绑到一起。
Distro协议介绍 Distro是阿里巴巴的私有协议, 是一种分布式一致性算法,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失 。
Distro 协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。
Distro 协议具有以下特点:
1:专门为了注册中心而创造出的协议;
2:客户端与服务端有两个重要的交互,服务注册与心跳发送;
3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一 次心跳,心跳包需要带上注册服务的全部信息,在客户端看来,服务端节点 对等,所以请求的节点是随机的;
4:客户端请求失败则换一个节点重新发送请求;
5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接收 到客户端的“写”(注册、心跳、下线等)请求后,服务端节点判断请求的服务是否为自己负责,如果是,则处理,否则交由负责的节点处理;
6:每个服务端节点主动发送健康检查到其他节点,响应的节点被该节点视为 健康节点;
7:服务端在接收到客户端的服务心跳后,如果该服务不存在,则将该心跳请 求当做注册请求来处理;
8:服务端如果长时间未收到客户端心跳,则下线该服务;
9:负责的节点在接收到服务注册、服务心跳等写请求后将数据写入后即返 回,后台异步地将数据同步给其他节点;
10:节点在收到读请求后直接从本机获取后返回,无论数据是否为最新。
Distro寻址 Distro协议主要用于nacos 服务端节点之间的相互发现,nacos使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:
单机模式(StandaloneMemberLookup)
文件模式(FileConfigMemberLookup)
服务器模式(AddressServerMemberLookup)
三种寻址模式如下图:
单机模式 在 com.alibaba.nacos.core.cluster.lookup.LookupFactory 中有创建寻址方式,可以创建集群启动方式、单机启动方式,不同启动方式就决定了不同寻址模式,如果是集群启动,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static MemberLookup createLookUp (ServerMemberManager memberManager) throws NacosException { if (!EnvUtil.getStandaloneMode()) { String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE); LookupType type = chooseLookup(lookupType); LOOK_UP = find(type); currentLookupType = type; } else { LOOK_UP = new StandaloneMemberLookup (); } LOOK_UP.injectMemberManager(memberManager); Loggers.CLUSTER.info("Current addressing mode selection : {}" , LOOK_UP.getClass().getSimpleName()); return LOOK_UP; } private static MemberLookup find (LookupType type) { if (LookupType.FILE_CONFIG.equals(type)) { LOOK_UP = new FileConfigMemberLookup (); return LOOK_UP; } if (LookupType.ADDRESS_SERVER.equals(type)) { LOOK_UP = new AddressServerMemberLookup (); return LOOK_UP; } throw new IllegalArgumentException (); }
单节点寻址模式会直接创建 StandaloneMemberLookup 对象,而文件寻址模式会创建 FileConfigMemberLookup 对象,服务器寻址模式会创建AddressServerMemberLookup ;
文件寻址模式
文件寻址模式主要在创建集群的时候,通过 cluster.conf 来配置集群,程序可以通过监听 cluster.conf 文件变化实现动态管理节点,FileConfigMemberLookup 源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package com.alibaba.nacos.core.cluster.lookup;import com.alibaba.nacos.api.exception.NacosException;import com.alibaba.nacos.core.cluster.AbstractMemberLookup;import com.alibaba.nacos.core.cluster.Member;import com.alibaba.nacos.core.cluster.MemberUtil;import com.alibaba.nacos.sys.env.EnvUtil;import com.alibaba.nacos.sys.file.FileChangeEvent;import com.alibaba.nacos.sys.file.FileWatcher;import com.alibaba.nacos.sys.file.WatchFileCenter;import com.alibaba.nacos.core.utils.Loggers;import org.apache.commons.lang3.StringUtils;import java.util.ArrayList;import java.util.Collection;import java.util.List;public class FileConfigMemberLookup extends AbstractMemberLookup { private FileWatcher watcher = new FileWatcher () { @Override public void onChange (FileChangeEvent event) { readClusterConfFromDisk(); } @Override public boolean interest (String context) { return StringUtils.contains(context, "cluster.conf" ); } }; @Override public void start () throws NacosException { if (start.compareAndSet(false , true )) { readClusterConfFromDisk(); try { WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher); } catch (Throwable e) { Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}" , e.getMessage()); } } } @Override public void destroy () throws NacosException { WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher); } private void readClusterConfFromDisk () { Collection<Member> tmpMembers = new ArrayList <>(); try { List<String> tmp = EnvUtil.readClusterConf(); tmpMembers = MemberUtil.readServerConf(tmp); } catch (Throwable e) { Loggers.CLUSTER .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}" , e.getMessage()); } afterLookup(tmpMembers); } }
服务器寻址模式 使用地址服务器存储节点信息,会创建 AddressServerMemberLookup ,服务端定时拉取信息进行管理;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 package com.alibaba.nacos.core.cluster.lookup;import com.alibaba.nacos.api.exception.NacosException;import com.alibaba.nacos.common.http.HttpClientBeanHolder;import com.alibaba.nacos.common.http.client.NacosRestTemplate;import com.alibaba.nacos.common.http.param.Header;import com.alibaba.nacos.common.http.param.Query;import com.alibaba.nacos.common.model.RestResult;import com.alibaba.nacos.common.utils.ExceptionUtil;import com.alibaba.nacos.core.cluster.AbstractMemberLookup;import com.alibaba.nacos.core.cluster.MemberUtil;import com.alibaba.nacos.core.utils.GenericType;import com.alibaba.nacos.core.utils.GlobalExecutor;import com.alibaba.nacos.core.utils.Loggers;import com.alibaba.nacos.sys.env.EnvUtil;import org.apache.commons.lang3.StringUtils;import java.io.Reader;import java.io.StringReader;import java.util.HashMap;import java.util.Map;public class AddressServerMemberLookup extends AbstractMemberLookup { private final GenericType<RestResult<String>> genericType = new GenericType <RestResult<String>>() { }; public String domainName; public String addressPort; public String addressUrl; public String envIdUrl; public String addressServerUrl; private volatile boolean isAddressServerHealth = true ; private int addressServerFailCount = 0 ; private int maxFailCount = 12 ; private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE); private volatile boolean shutdown = false ; @Override public void start () throws NacosException { if (start.compareAndSet(false , true )) { this .maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount" , "12" )); initAddressSys(); run(); } } private void initAddressSys () { String envDomainName = System.getenv("address_server_domain" ); if (StringUtils.isBlank(envDomainName)) { domainName = EnvUtil.getProperty("address.server.domain" , "jmenv.tbsite.net" ); } else { domainName = envDomainName; } String envAddressPort = System.getenv("address_server_port" ); if (StringUtils.isBlank(envAddressPort)) { addressPort = EnvUtil.getProperty("address.server.port" , "8080" ); } else { addressPort = envAddressPort; } String envAddressUrl = System.getenv("address_server_url" ); if (StringUtils.isBlank(envAddressUrl)) { addressUrl = EnvUtil.getProperty("address.server.url" , EnvUtil.getContextPath() + "/" + "serverlist" ); } else { addressUrl = envAddressUrl; } addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl; envIdUrl = "http://" + domainName + ":" + addressPort + "/env" ; Loggers.CORE.info("ServerListService address-server port:" + addressPort); Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl); } @SuppressWarnings("PMD.UndefineMagicConstantRule") private void run () throws NacosException { boolean success = false ; Throwable ex = null ; int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry" , Integer.class, 5 ); for (int i = 0 ; i < maxRetry; i++) { try { syncFromAddressUrl(); success = true ; break ; } catch (Throwable e) { ex = e; Loggers.CLUSTER.error("[serverlist] exception, error : {}" , ExceptionUtil.getAllExceptionMsg(ex)); } } if (!success) { throw new NacosException (NacosException.SERVER_ERROR, ex); } GlobalExecutor.scheduleByCommon(new AddressServerSyncTask (), 5_000L ); } @Override public void destroy () throws NacosException { shutdown = true ; } @Override public Map<String, Object> info () { Map<String, Object> info = new HashMap <>(4 ); info.put("addressServerHealth" , isAddressServerHealth); info.put("addressServerUrl" , addressServerUrl); info.put("envIdUrl" , envIdUrl); info.put("addressServerFailCount" , addressServerFailCount); return info; } private void syncFromAddressUrl () throws Exception { RestResult<String> result = restTemplate .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType()); if (result.ok()) { isAddressServerHealth = true ; Reader reader = new StringReader (result.getData()); try { afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader))); } catch (Throwable e) { Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}" , ExceptionUtil.getAllExceptionMsg(e)); } addressServerFailCount = 0 ; } else { addressServerFailCount++; if (addressServerFailCount >= maxFailCount) { isAddressServerHealth = false ; } Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}" , result.getCode()); } } class AddressServerSyncTask implements Runnable { @Override public void run () { if (shutdown) { return ; } try { syncFromAddressUrl(); } catch (Throwable ex) { addressServerFailCount++; if (addressServerFailCount >= maxFailCount) { isAddressServerHealth = false ; } Loggers.CLUSTER.error("[serverlist] exception, error : {}" , ExceptionUtil.getAllExceptionMsg(ex)); } finally { GlobalExecutor.scheduleByCommon(this , 5_000L ); } } } }
数据同步 Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据增加的时候,只同步增加的数据。
全量同步
全量同步流程比较复杂,流程如上图:
1:启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方 法加载数据
2:调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的 数据
3:从namingProxy代理获取所有的数据data
4:构造http请求,调用httpGet方法从指定的server获取数据
5:从获取的结果result中获取数据bytes
6:处理数据processData
7:从data反序列化出datumMap
8:把数据存储到dataStore,也就是本地缓存dataMap
9:监听器不包括key,就创建一个空的service,并且绑定监听器
10:监听器listener执行成功后,就更新data store
任务启动 在 com.alibaba.nacos.core.distributed.distro.DistroProtocol 的构造函数中调用 startDistroTask() 方法,该方法会执行startVerifyTask() 和 startLoadTask() ,我们重点关注 startLoadTask() ,该方法代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void startDistroTask () { if (EnvUtil.getStandaloneMode()) { isInitialized = true ; return ; } startVerifyTask(); startLoadTask(); } private void startLoadTask () { DistroCallback loadCallback = new DistroCallback () { @Override public void onSuccess () { isInitialized = true ; } @Override public void onFailed (Throwable throwable) { isInitialized = false ; } }; GlobalExecutor.submitLoadDataTask( new DistroLoadDataTask (memberManager, distroComponentHolder, distroConfig, loadCallback)); }
数据如何执行加载 上面方法会调用 DistroLoadDataTask 对象,而该对象其实是个线程,因此会执行它的run方法,run方法会调用load()方法实现数据全量加载,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public void run () { try { load(); if (!checkCompleted()) { GlobalExecutor.submitLoadDataTask(this , distroConfig.getLoadDataRetryDelayMillis()); } else { loadCallback.onSuccess(); Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success" ); } } catch (Exception e) { loadCallback.onFailed(e); Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. " , e); } } private void load () throws Exception { while (memberManager.allMembersWithoutSelf().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init..." ); TimeUnit.SECONDS.sleep(1 ); } while (distroComponentHolder.getDataStorageTypes().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register..." ); TimeUnit.SECONDS.sleep(1 ); } for (String each : distroComponentHolder.getDataStorageTypes()) { if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) { loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); } } }
数据同步 数据同步会通过Http请求从远程服务器获取数据,并同步到当前服务的缓存中,执行流程如下:
1:loadAllDataSnapshotFromRemote()从远程加载所有数据,并处理同步 到本机
2:transportAgent.getDatumSnapshot()远程加载数据,通过Http请求 执行远程加载
3:dataProcessor.processSnapshot()处理数据同步到本地
数据处理完整逻辑代码如下: loadAllDataSnapshotFromRemote() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private boolean loadAllDataSnapshotFromRemote (String resourceType) { DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType); DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType); if (null == transportAgent || null == dataProcessor) { Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}" , resourceType, transportAgent, dataProcessor); return false ; } for (Member each : memberManager.allMembersWithoutSelf()) { try { Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}" , resourceType, each.getAddress()); DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress()); boolean result = dataProcessor.processSnapshot(distroData); Loggers.DISTRO .info("[DISTRO-INIT] load snapshot {} from {} result: {}" , resourceType, each.getAddress(), result); if (result) { return true ; } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed." , resourceType, each.getAddress(), e); } } return false ; }
远程加载数据代码如下: transportAgent.getDatumSnapshot() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public DistroData getDatumSnapshot (String targetServer) { try { byte [] allDatum = NamingProxy.getAllData(targetServer); return new DistroData (new DistroKey ("snapshot" , KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum); } catch (Exception e) { throw new DistroException (String.format("Get snapshot from %s failed." , targetServer), e); } } public static byte [] getAllData(String server) throws Exception { Map<String, String> params = new HashMap <>(8 ); RestResult<String> result = HttpClient.httpGet( "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL, new ArrayList <>(), params); if (result.ok()) { return result.getData().getBytes(); } throw new IOException ("failed to req API: " + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: " + result.getMessage()); }
处理数据同步到本地代码如下: dataProcessor.processSnapshot()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Override public boolean processSnapshot (DistroData distroData) { try { return processData(distroData.getContent()); } catch (Exception e) { return false ; } } private boolean processData (byte [] data) throws Exception { if (data.length > 0 ) { Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { dataStore.put(entry.getKey(), entry.getValue()); if (!listeners.containsKey(entry.getKey())) { if (switchDomain.isDefaultInstanceEphemeral()) { Loggers.DISTRO.info("creating service {}" , entry.getKey()); Service service = new Service (); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek(); if (Objects.isNull(listener)) { return false ; } listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { Loggers.DISTRO.warn("listener of {} not found." , entry.getKey()); continue ; } try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}" , entry.getKey(), e); continue ; } dataStore.put(entry.getKey(), entry.getValue()); } } return true ; }
到此实现数据全量同步,其实全量同步最终封装的协议还是Http。
增量同步 新增数据使用异步广播同步:
1:DistroProtocol 使用 sync() 方法接收增量数据
2:向其他节点发布广播任务
调用 distroTaskEngineHolder 发布延迟任务
3:调用 DistroDelayTaskProcessor.process() 方法进行任务投递:
将延迟任务转换为异步变更任务
4:执行变更任务 DistroSyncChangeTask.run() 方法:向指定节点发送 消息
调用 DistroHttpAgent.syncData() 方法发送数据
调用 NamingProxy.syncData() 方法发送数据
5:异常任务调用 handleFailedTask() 方法进行处理
调用 DistroFailedTaskHandler 处理失败任务
调用 DistroHttpCombinedKeyTaskFailedHandler 将失败任务重新 投递成延迟任务。
增量数据入口 我们回到服务注册,服务注册的 InstanceController.register() 就是数据入口,它会调用 ServiceManager.registerInstance() ,执行数据同步的时候,调用 addInstance() ,在该方法中会执行DistroConsistencyServiceImpl.put() ,该方法是增量同步的入口,会调用 distroProtocol.sync() 方法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 @Override public void put(String key, Record value) throws NacosException { // 向本地存储添加一条记录 onPut(key, value); /* * ***后续逻辑看集群模式下的增量数据同步时再往下跟****** * 使用 distro 协议进行数据同步(增量同步) 操作类型是: DataOperation.CHANGE 默认1s后执行 */ distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
sync() 方法会执行任务发布,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void sync(DistroKey distroKey, DataOperation action, long delay) { /* memberManager.allMembersWithoutSelf() 获取集群除自己以外的成员信息 */ for (Member each : memberManager.allMembersWithoutSelf()) { // 创建 DistroKey DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); // 创建 DistroDelayTask 任务 DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); /* 添加延迟任务 执行 查看 addTask 将任务添加到 tasks 队列中 getDelayTaskExecuteEngine返回的是:DistroDelayTaskExecuteEngine addTask 在其父类: NacosDelayTaskExecuteEngine 下一个入口:NacosDelayTaskExecuteEngine 构造 */ distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } } }
增量同步操作 延迟任务对象我们可以从 DistroTaskEngineHolder 构造函数中得知是DistroDelayTaskProcessor ,代码如下:
1 2 3 4 public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor); }
它延迟执行的时候会执行 process 方法,该方法正是执行数据同步的地方,它会执行DistroSyncChangeTask任务,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } // 处理 DistroDelayTask 任务 DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); // 操作类型时 数据的变更 CHANGE if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { // 封装 最终的 DistroSyncChangeTask 任务 DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); // 提交去执行 下一个入口:com.alibaba.nacos.core.distributed.distro.task.execute.DistroSyncChangeTask.run distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; } return false; }
DistroSyncChangeTask 实质上是任务的开始,它自身是一个线程,所以会执行它的run方法,而run方法这是数据同步操作,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 /** * 同步数据查看这里 */ @Override public void run() { Loggers.DISTRO.info("[DISTRO-START] {}", toString()); try { // 获取资源类型 String type = getDistroKey().getResourceType(); DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey()); distroData.setType(DataOperation.CHANGE); // 进行数据同步 *******syncData******** boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer()); if (!result) { // 失败后重试 handleFailedTask(); } Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result); } catch (Exception e) { Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e); // 失败后重试 handleFailedTask(); } }
数据同步会执行调用 syncData ,该方法其实就是通过Http协议将数据发送到其他节点实现数据同步,代码如下:
1 2 3 4 5 6 7 8 9 10 @Override public boolean syncData(DistroData data, String targetServer) { if (!memberManager.hasMember(targetServer)) { return true; } // 获取要同步的数据 byte[] dataContent = data.getContent(); // 使用 NamingProxy 进行同步 return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer()); }
最后:一定要跟着讲师所给的源码自行走一遍!!!