目录
1 应用架构演变
2 RPC
3 Dubbo概述
4 Dubbo配置
5 Dubbo协议
6 高可用
7 Dubbo原理
8 Dubbo源码分析
9 Dubbox
10 基于Spring的Dubbo整合
11 常见面试题总结参考目录
· 尚硅谷Dubbo
· CSDN
应用架构演变
单一应用架构
当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。 此时,用于简化增删改查工作量的数据访问框架**(ORM) 是关键**。
垂直应用架构
当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。 此时,用于加速前端页面开发的Web框架**(MVC) 是关键**。
分布式服务架构
当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。 此时,用于提高业务复用及整合的分布式服务框架**(RPC) 是关键**。
流动计算架构
当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。 此时,用于提高机器利用率的资源调度和治理中心**(SOA) 是关键**。
RPC
定义
RPC【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,他是一种技术的思想,而不是规范。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。
结构
RPC 服务方通过 RpcServer 去导出(export)远程接口方法,而客户方通过 RpcClient 去引入(import)远程接口方法。客户方像调用本地方法一样去调用远程接口方法,RPC 框架提供接口的代理实现,实际的调用将委托给代理RpcProxy 。代理封装调用信息并将调用转交给RpcInvoker 去实际执行。在客户端的RpcInvoker 通过连接器RpcConnector 去维持与服务端的通道RpcChannel,并使用RpcProtocol 执行协议编码(encode)并将编码后的请求消息通过通道发送给服务方。
具体功能如下:
(1)RpcServer 负责导出(export)远程接口
(2)RpcClient 负责导入(import)远程接口的代理实现
(3)RpcProxy 远程接口的代理实现
(4)RpcInvoker
客户方实现:负责编码调用信息和发送调用请求到服务方并等待调用结果返回
服务方实现:负责调用服务端接口的具体实现并返回调用结果
(5)RpcProtocol 负责协议编/解码
(6)RpcConnector 负责维持客户方和服务方的连接通道和发送数据到服务方
(7)RpcAcceptor 负责接收客户方请求并返回请求结果
(8)RpcProcessor 负责在服务方控制调用过程,包括管理调用线程池、超时时间等
(9)RpcChannel 数据传输通道
工作原理
(1)Client像调用本地服务似的调用远程服务;
(2)Client stub接收到调用后,将方法、参数序列化
(3)客户端通过sockets将消息发送到服务端
(4)Server stub 收到消息后进行解码(将消息对象反序列化)
(5)Server stub 根据解码结果调用本地的服务
(6)本地服务执行(对于服务端来说是本地执行)并将结果返回给Server stub
(7)Server stub将返回结果打包成消息(将结果消息对象序列化)
(8)服务端通过sockets将消息发送到客户端
(9)Client stub接收到结果消息,并进行解码(将结果消息发序列化)
(10)客户端得到最终结果。
**· 关于对stub的理解
** stub:RPC(Remote Procedure Call protocol)的一个重要思想就是使远程调用看起来象当地的调用一样,也就是说调用进程无需知道被调进程具体在哪台机器上执行。Stub就是用来保证此特性的很重要的部分。具体的讲,比如在客户端,一个进程在执行过程中调用到了某个函数fn(),此函数的具体实现是在远程的某台机器上,那么此进程实际上是调用了位于当地机器上的另外一个版本的fn()(起名为c_fn()),此c_fn()就是客户端的一个stub. 对应的,当客户端的消息发送到服务器端时,服务器端也不是把消息直接就交给真正的fn(),而是同样先交给一个不同版本的fn()(起名为s_fn()),此s_fn()就是服务器端的一个stub.
工作方式
(1)同步调用:客户方等待调用执行完成并返回结果。
(2)异步调用:客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。
Dubbo概述
概念
Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,SOA服务治理方案。
简单的说,dubbo就是个服务框架,如果没有分布式的需求,其实是不需要用的,只有在分布式的时候,才有dubbo这样的分布式服务框架的需求,并且本质上是个服务调用,说白了就是个远程服务调用的分布式框架。
三大核心能力
(1)远程通讯,提供对多种基于长连接的NiO框架抽象封装,包括多种线程模型,序列化,以及“请求一响应”模式的信息交换方式。
(2)集群容错: 提供基于接口方法的透明远程过程调用,包括多协议支持。以及负载均衡,失败容错,地址路由,动态配置等集群支持。
(3)自动发现:基于注册中心目录服务,使用服务消费能动态查找服务提供方,使地址透明,使用服务提供方可以平滑增加或减少服务器
架构
(1)服务提供者(Provider):暴露服务的服务提供方,服务提供者在启动时,向注册中心注册自己提供的服务。
(2)服务消费者(Consumer)**:调用远程服务的服务消费方,服务消费者在启动时,向注册中心订阅自己所需的服务,服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
(3)注册中心(Registry):注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
(4)监控中心(Monitor):服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
调用关系
(1)服务容器负责启动,加载,运行服务提供者。
(2)服务提供者在启动时,向注册中心注册自己提供的服务。
(3)服务消费者在启动时,向注册中心订阅自己所需的服务。
(4)注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
(5)服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
(6)服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
注册中心
(1)可选方案:zookeeper、Redis
(2)建议使用dubbo-2.3.3以上版本的使用zookeeper注册中心客户端
·Zookeeper是Apache Hadoop的子项目,强度相对较好,建议生产环境使用该注册中心。
· Dubbo未对Zookeeper服务器端做任何侵入修改,只需安装原生的Zookeeper服务器即可, 所有注册中心逻辑适配都在调用Zookeeper客户端时完成
· Dubbo使用ZooKeeper发布服务时,使用的是ZooKeeper的持久节点
具体如何通过Zookeeper和Redis构建,参考:Dubbo(一)——Dubbo及注册中心原理_baoyu_G的博客-CSDN博客_dubbo注册
Dubbo的特性
(1)健状性:
· 监控中心宕掉不影响使用,只是丢失部分采样数据
· 数据库宕掉后,注册中心仍能通过缓存提供服务列表查询,但不能注册新服务
· 注册中心对等集群,任意一台宕掉后,将自动切换到另一台
· 注册中心全部宕掉后,服务提供者和服务消费者仍能通过本地缓存通讯
· 服务提供者无状态,任意一台宕掉后,不影响使用
· 服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复
(2)伸缩性:
注册中心为对等集群,可动态增加机器部署实例,所有客户端将自动发现新的注册中心
服务提供者无状态,可动态增加机器部署实例,注册中心将推送新的服务提供者信息给消费者
(3)升级性:
当服务集群规模进一步扩大,带动IT治理结构进一步升级,需要实现动态部署,进行流动计算,现有分布式服务架构不会带来阻力。
Dubbo-admin管理平台
(1)图形化的服务管理页面;安装时需要指定注册中心地址,即可从注册中心中获取到所有的提供者/消费者进行配置管理,从而进行服务治理。
(2)主要包含
·路由规则
·动态配置
·服务降级
·访问控制
·权重调整 负载均衡等管理功能
Dubbo配置
配置原则
· JVM启动 -D 参数优先,这样可以使用户在部署和启动时进行参数重写,比如在启动时需改变协议的端口。
· XML次之,如果在 XML 中有配置,则 dubbo.properties 中的相应配置项无效。
· Properties最后,相当于缺省值,只有 XML 没有配置时,dubbo.properties 的相应配置项才会生效,通常用于共享公共配置,比如应用名。
重试次数
失败自动切换,当出现失败,重试其它服务器,但重试会带来更长延迟。可通过retries=”2”来设置重试次数(不含第一次)。
· 重试次数配置如下:
<dubbo:service retries="2" />
或
<dubbo:reference retries="2" />
或
<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
超时时间
由于网络或服务端不可靠,会导致调用出现一种不确定的中间状态(超时)。为了避免超时导致客户端资源(线程)挂起耗尽,必须设置超时时间。
(1)Dubbo消费端
全局超时配置
<dubbo:consumer timeout="5000" />
指定接口以及特定方法超时配置
<dubbo:reference interface="com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
(2)Dubbo服务端
全局超时配置
<dubbo:provider timeout="5000" />
指定接口以及特定方法超时配置
<dubbo:provider interface="com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
(3)配置原则
dubbo推荐在Provider上尽量多配置Consumer端属性:
1、作服务的提供者,比服务使用方更清楚服务性能参数,如调用的超时时间,合理的重试次数,等等
2、在Provider配置后,Consumer不配置则会使用Provider的配置值,即Provider配置可以作为Consumer的缺省值。否则,Consumer会使用Consumer端的全局设置,这对于Provider不可控的,并且往往是不合理的
配置的覆盖规则:
· 方法级配置别优于接口级别,即小Scope优先
· Consumer端配置 优于 Provider配置 优于 全局配置,
· 最后是Dubbo Hard Code的配置值(见配置文档)
版本号
(1)当一个接口实现,出现不兼容升级时,可以用版本号过渡,版本号不同的服务相互间不引用。
(2)可以按照以下的步骤进行版本迁移:
· 在低压力时间段,先升级一半提供者为新版本
· 再将所有消费者升级为新版本
· 然后将剩下的一半提供者升级为新版本
·老版本服务提供者配置:
<dubbo:service interface="com.foo.BarService" version="1.0.0" />
·新版本服务提供者配置:
<dubbo:service interface="com.foo.BarService" version="2.0.0" />
·老版本服务消费者配置:
<dubbo:reference id="barService" interface="com.foo.BarService" version="1.0.0" />
·新版本服务消费者配置:
<dubbo:reference id="barService" interface="com.foo.BarService" version="2.0.0" />
·如果不需要区分版本,可以按照以下的方式配置:
<dubbo:reference id="barService" interface="com.foo.BarService" version="*" />
Dubbo协议
Dubbo协议
Dubbo缺省协议采用单一长连接和NIO异步通讯。
适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。Dubbo缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。
Hessian协议
(1)Hessian协议用于集成Hessian的服务,Hessian底层采用Http通讯,采用Servlet暴露服务,Dubbo缺省内嵌Jetty作为服务器实现。Hessian是Caucho开源的一个RPC框架:
(2)基于Hessian的远程调用协议特点
· 连接个数:多连接
· 连接方式:短连接
· 传输协议:HTTP
· 传输方式:同步传输
· 序列化:Hessian二进制序列化
· 适用范围:传入传出参数数据包较大,提供者比消费者个数多,提供者压力较大,可传文件。
· 适用场景:页面传输,文件传输,或与原生hessian服务互操作
HTTP协议
(1)此协议采用spring的HttpInvoker的功能实现
(2)特点
· 连接个数:多个
· 连接方式:长连接
· 连接协议:http
· 传输方式:同步传输
· 序列化:表单序列化
· 适用范围:传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。
· 适用场景:需同时给应用程序和浏览器JS使用的服务。
RMI协议
(1)采用JDK标准的java.rmi.*实现,采用阻塞式短连接和JDK标准序列化方式
(2)Java标准的远程调用协议:
· 连接个数:多连接
· 连接方式:短连接
· 传输协议:TCP
· 传输方式:同步传输
· 序列化:Java标准二进制序列化
· 适用范围:传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。
· 适用场景:常规远程服务方法调用,与原生RMI服务互操作
webservice
基于WebService的远程调用协议,集成CXF实现,提供和原生WebService的互操作。多个短连接,基于HTTP传输,同步传输,适用系统集成和跨语言调用;
高可用
ZooKeeper宕机与Dubbo直连
(1)现象:zookeeper注册中心宕机,还可以消费dubbo暴露的服务。
(2)原因:健壮性
集群下dubbo负载均衡配置
(1)在集群负载均衡时,Dubbo提供了多种均衡策略,缺省为 random 随机调用
(2)负载均衡策略
· Random LoadBalance 随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
· RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
· LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机
活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
· ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
<!-- 缺省只对第一个参数Hash,如果要修改,请配置 -->
<dubbo:parameter key="hash.arguments" value="0,1" />
<!-- 缺省用160 份虚拟节点,如果要修改,请配置 -->
<dubbo:parameter key="hash.nodes" value="320" />
整合Hystrix
服务降级
(1)概念
当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务和页面有策略的不处理或换种简单的方式处理,从而释放服务器资源以保证核心交易正常运作或高效运作。
可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略。
(2)向注册中心写入动态配置覆盖规则:
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&**mock=force:return+null**"));
其中:
· mock=force:return+null表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
· 还可以改为mock=fail:return+null表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
集群容错
在集群调用失败时,Dubbo提供了多种容错方案,缺省为 failover 重试。
· 集群容错模式
(1)Failover Cluster
失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过retries=”2” 来设置重试次数(不含第一次)。
重试次数配置如下:
<dubbo:service retries="2" />
或
<dubbo:reference retries="2" />
或
<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
(2)Failfast Cluster
快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
(3)Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
(4)Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
(5)Forking Cluster
并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过forks=”2” 来设置最大并行数。
(6)Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错[2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。
· 集群模式配置
按照以下示例在服务提供方和消费方配置集群模式
<dubbo:service cluster="failsafe" />
或
<dubbo:reference cluster="failsafe" />
整合hystrix
Hystrix旨在通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix具备拥有回退机制和断路器功能的线程和信号隔离,请求缓存和请求打包,以及监控和配置等功能
(1)配置spring-cloud-starter-netflix-hystrix
spring boot官方提供了对hystrix的集成,直接在pom.xml里加入依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>
然后在Application类上增加@EnableHystrix来启用hystrix starter:
@SpringBootApplication
@EnableHystrix
public class ProviderApplication {
}
(2)配置Provider端
在Dubbo的Provider上增加@HystrixCommand配置,这样子调用就会经过Hystrix代理。
@Service(version = "1.0.0")
public class HelloServiceImpl implements HelloService {
@HystrixCommand(commandProperties = {
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "2000") })
@Override
public String sayHello(String name) {
// System.out.println("async provider received: " + name);
// return "annotation: hello, " + name;
throw new RuntimeException("Exception to show hystrix enabled.");
}
}
(3)配置Consumer端
对于Consumer端,则可以增加一层method调用,并在method上配置@HystrixCommand。当调用出错时,会走到fallbackMethod = “reliable”的调用里。
@Reference(version = "1.0.0")
private HelloService demoService;
@HystrixCommand(fallbackMethod = "reliable")
public String doSayHello(String name) {
return demoService.sayHello(name);
}
public String reliable(String name) {
return "hystrix fallback value";
}
Dubbo原理
是一个RPC远程调用框架
具体可见第2小节
Netty
Dubbo底层采用Netty作为网络通信。
概念
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它极大地简化并简化了TCP和UDP套接字服务器等网络编程。
同时是一个基于JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。
应用场景
(1)分布式开源框架中dubbo、Zookeeper,RocketMQ底层rpc通讯使用就是netty。
(2)游戏开发中,底层使用netty通讯。
TCP粘包和拆包
(1)概念
一个完整的业务可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这个就是TCP的拆包和封包问题。
(2)由于网络的复杂性,可能数据会被分离成N多个复杂的拆包/粘包的情况,所以在做TCP服务器的时候就需要首先解决该问题。
· 消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));
· 包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
ByteBuf buf = Unpooled.*copiedBuffer*("_mayi".getBytes());
sc.pipeline().addLast(**new** DelimiterBasedFrameDecoder(1024, buf));
· 将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段
序列化协议
(1)一种序列化协议就是Java默认提供的序列化机制,需要序列化的Java对象只需要实现 Serializable / Externalizable 接口并生成序列化ID,这个类就能够通过 ObjectInput 和 ObjectOutput 序列化和反序列化
(2)XML
· 定义:
XML(Extensible Markup Language)是一种常用的序列化和反序列化协议, 它历史悠久,从1998年的1.0版本被广泛使用至今。
· 优点
· 人机可读性好
· 可指定元素或特性的名称
· 缺点
· 序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息。
· 类必须有一个将由XmlSerializer序列化的默认构造函数。
· 只能序列化公共属性和字段
· 不能序列化方法
· 文件庞大,文件格式复杂,传输占带宽
· 使用场景
· 当做配置文件存储数据
· 实时数据转换
(3)JSON
· 定义:
JSON(JavaScript Object Notation, JS对象标记) 是一种轻量级的数据交换格式。它基于 ECMAScript (w3c制定的js规范)的一个子集, JSON采用与编程语言无关的文本格式,但是也使用了类C语言(包括C, C++, C#, Java, JavaScript, Perl, Python等)的习惯,简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言。
· 优点
· 前后兼容性高
· 数据格式比较简单,易于读写
· 序列化后数据较小,可扩展性好,兼容性好
· 与XML相比,其协议比较简单,解析速度比较快
· 缺点
· 数据的描述性比XML差
· 不适合性能要求为ms级别的情况
· 额外空间开销比较大
· 适用场景(可替代XML)
· 跨防火墙访问
· 可调式性要求高的情况
· 基于Web browser的Ajax请求
· 传输数据量相对小,实时性要求相对低(例如秒级别)的服务
(4)Fastjson
· 定义
Fastjson是一个Java语言编写的高性能功能完善的JSON库。它采用一种“假定有序快速匹配”的算法,把JSON Parse的性能提升到极致。
· 优点
· 接口简单易用
· 目前java语言中最快的json库
· 缺点
· 过于注重快,而偏离了“标准”及功能性
· 代码质量不高,文档不全
· 适用场景
· 协议交互
· Web输出
· Android客户端
(5)Thrift
· 定义:
Thrift并不仅仅是序列化协议,而是一个RPC框架。它可以让你选择客户端与服务端之间传输通信协议的类别,即文本(text)和二进制(binary)传输协议, 为节约带宽,提供传输效率,一般情况下使用二进制类型的传输协议。
· 优点
· 序列化后的体积小,速度快
· 支持多种语言和丰富的数据类型
· 对于数据字段的增删具有较强的兼容性
· 支持二进制压缩编码
· 缺点
· 使用者较少
· 跨防火墙访问时,不安全
· 不具有可读性,调试代码时相对困难
· 不能与其他传输层协议共同使用(例如HTTP)
· 无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议
· 适用场景
· 分布式系统的RPC解决方案
(6)Avro
· 定义:
Avro属于Apache Hadoop的一个子项目。 Avro提供两种序列化格式:JSON格式或者Binary格式。Binary格式在空间开销和解析性能方面可以和Protobuf媲美,Avro的产生解决了JSON的冗长和没有IDL的问题
· 优点
· 支持丰富的数据类型
· 简单的动态语言结合功能
· 具有自我描述属性
· 提高了数据解析速度
· 快速可压缩的二进制数据形式
· 可以实现远程过程调用RPC
· 支持跨编程语言实现
· 缺点
· 对于习惯于静态类型语言的用户不直观
· 适用场景
在Hadoop中做Hive、Pig和MapReduce的持久化数据格式
(7)Protobuf
· 定义
protocol buffers由谷歌开源而来,在谷歌内部久经考验。它将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。
· 优点
· 序列化后码流小,性能高
· 结构化数据存储格式(XML JSON等)
· 通过标识字段的顺序,可以实现协议的前向兼容
· 结构化的文档更容易管理和维护
· 缺点
· 需要依赖于工具生成代码
· 支持的语言相对较少,官方只支持Java、C++ 、Python
· 适用场景
· 对性能要求高的RPC调用
· 具有良好的跨防火墙的访问属性
· 适合应用层对象的持久化
(8)其它
· protostuff基于protobuf协议,但不需要配置proto文件,直接导包即
· Jboss marshaling可以直接序列化java类, 无须实java.io.Serializable接口
· Message pack一个高效的二进制序列化格式
· Hessian采用二进制协议的轻量级remoting onhttp工具
· kryo基于protobuf协议,只支持java语言,需要注册(Registration),然后序列化(Output),反序列化(Input)
Netty代码
3.3.0版本
· 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.3.0.Final</version>
</dependency>
· 服务端
class ServerHandler extends SimpleChannelHandler {
/**
\* 通道关闭的时候触发
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelClosed");
}
/**
\* 必须是连接已经建立,关闭通道的时候才会触发.
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
/**
\* 捕获异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
/**
\* 接受消息
*/
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
// System.out.println("messageReceived");
System.out.println("服务器端收到客户端消息:"+e.getMessage());
//回复内容
ctx.getChannel().write("好的");
}
}
// **netty 服务器端**
public class NettyServer {
public static void main(String[] args) {
// 创建服务类对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 创建两个线程池 分别为监听监听端口 ,nio监听
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
// 设置工程 并把两个线程池加入中
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker));
// 设置管道工厂
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//将数据转换为string类型.
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("serverHandler", new ServerHandler());
return pipeline;
}
});
// 绑定端口号
serverBootstrap.bind(new InetSocketAddress(9090));
System.out.println("netty server启动....");
}
}
· 客户端
class ClientHandler extends SimpleChannelHandler {
/**
\* 通道关闭的时候触发
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelClosed");
}
/**
\* 必须是连接已经建立,关闭通道的时候才会触发.
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
/**
\* 捕获异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
/**
\* 接受消息
*/
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
// System.out.println("messageReceived");
System.out.println("服务器端向客户端回复内容:"+e.getMessage());
//回复内容
// ctx.getChannel().write("好的");
}
}
// **Netty客户端**
public class NettyClient {
public static void main(String[] args) {
System.out.println("netty client启动...");
// 创建客户端类
ClientBootstrap clientBootstrap = new ClientBootstrap();
// 线程池
ExecutorService boos = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker));
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// 将数据转换为string类型.
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("clientHandler", new ClientHandler());
return pipeline;
}
});
//连接服务端
ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));
Channel channel = connect.getChannel();
System.out.println("client start");
Scanner scanner= new Scanner(System.in);
while (true) {
System.out.println("请输输入内容...");
channel.write(scanner.next());
}
}
}
5.0版本
· 依赖
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
· 服务端
class ServerHandler extends ChannelHandlerAdapter {
/**
\* 当通道被调用,执行该方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收数据
String value = (String) msg;
System.out.println("Server msg:" + value);
// 回复给客户端 “您好!”
String res = "好的...";
ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes()));
}
}
// Netty服务端**
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
System.out.println("服务器端已经启动....");
// 1.创建2个线程,一个负责接收客户端连接, 一个负责进行 传输数据
NioEventLoopGroup pGroup = new NioEventLoopGroup();
NioEventLoopGroup cGroup = new NioEventLoopGroup();
// 2. 创建服务器辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
// 3.设置缓冲区与发送区大小
.option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8080).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
//客户端
class ClientHandler extends ChannelHandlerAdapter {
/**
\* 当通道被调用,执行该方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收数据
String value = (String) msg;
System.out.println("client msg:" + value);
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
System.out.println("客户端已经启动....");
// 创建负责接收客户端连接
NioEventLoopGroup pGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("itmayiedu".getBytes()));
// 等待客户端端口号关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
}
}
SPI
Java SPI
概述
(1)概念
SPI全称(service provider interface),是JDK内置的一种服务提供发现机制。目前市面上有很多框架都是用它来做服务的扩展发现,大家耳熟能详的如JDBC、日志框架都有用到;
简单来说,它是一种动态替换发现的机制。
(2)举个简单的例子,
· 如果我们定义了一个规范,需要第三方厂商去实现,
· 那么对于我们应用方来说,只需要集成对应厂商的插件,既可以完成对应规范的实现机制。
· 形成一种插拔式的扩展手段。
SPI规范
(1)需要在classpath下创建一个目录,该目录命名必须是:META-INF/services
(2)在该目录下创建一个properties文件,该文件需要满足以下几个条件
· 文件名必须是扩展的接口的全路径名称
· 文件内部描述的是该扩展接口的所有实现类
· 文件的编码格式是UTF-8
(3)通过java.util.ServiceLoader的加载机制来发现
SPI实例
JDK官方提供了java.sql.Driver这个驱动扩展点,但是你们并没有看到JDK中有对应的Driver实现。
以连接Mysql为例,我们需要添加mysql-connector-java依赖。你们可以在这个jar包中找到SPI的配置信息。所以java.sql.Driver由各个数据库厂商自行实现。
SPI的缺点
(1)JDK标准的SPI会一次性加载实例化扩展点的所有实现
就是如果你在META-INF/service下的文件里面加了N个实现类,那么JDK启动的时候都会一次性全部加载。
那么如果有的扩展点实现初始化很耗时或者如果有些实现类并没有用到,那么会很浪费资源
(2)如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因
Dubbo SPI
(1)Dubbo扩展的新特性
· 内嵌在dubbo中
· 支持通过SPI文件声明扩展实现(interfce必须有@SPI注解),
格式为extensionName=extensionClassName,extensionName类似于spring的beanName
· 支持通过配置指定extensionName来从SPI文件中选出对应实现
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("defineProtocol")
通过配置文件中的权限定名,加载实现类。在运行时,可以动态为接口替换实现类。
(2)源码分析
ServiceConfig类中的一行代码:
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();
· Protocol
一个是在类级别上的@SPI(“dubbo”),@SPI 表示当前这个接口是一个扩展点,可以实现自己的扩展实现,默认的扩展点是DubboProtocol。
另一个是@Adaptive,表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类。
@SPI("dubbo")
publicinterfaceProtocol{
/**
\* 获取缺省端口,当用户没有配置端口时使用。
*
*@return缺省端口
*/
intgetDefaultPort();
/**
\* 暴露远程服务:<br>
\* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
\* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
\* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
*@param 服务的类型
*@paraminvoker 服务的执行体
*@returnexporter 暴露服务的引用,用于取消暴露
*@throwsRpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
Exporterexport(Invoker<T> invoker)throwsRpcException;
/**
\* 引用远程服务:<br>
\* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
\* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
\* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
*@param 服务的类型
*@paramtype 服务的类型
*@paramurl 远程服务的URL地址
*@returninvoker 服务的本地代理
*@throwsRpcException 当连接服务提供方失败时抛出
*/
@Adaptive
Invokerrefer(Class<T> type, URL url)throwsRpcException;
/**
\* 释放协议:<br>
\* 1. 取消该协议所有已经暴露和引用的服务。<br>
\* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
\* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
voiddestroy();
}
· 方法调用
Dubbo源码分析
框架设计
总体分为Business、RPC和Remoting三层设计
· Service服务接口层:该层是与实际业务逻辑相关的,根据服务提供方和服务消费方的业务设计对应的接口和实现。
· config配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
· proxy服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
· registry注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
· cluster路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
· monitor监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
· protocol远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
· 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
· 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
· serialize数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
dubbo原理-启动解析、加载配置信息
(1)解析XML
在dubboNamespaceHandler中将<dubbo:provider>
等解析为对应的xxxConfig对象.
(2)标签内容解析,封装为beanDefinition对象
dubbo原理 -服务暴露
Invoker是用户接口的代理对象实例,会经过层层包装。
在使用Protocol时,会调用两个protocol。一个是DubboProtocol,其对应的DubboExporter打开Nttey服务器,监听对应的服务提供者端口;一个是RegistryProtocol,其对应的RegistryExporter对将提供者地址(服务器地址,如http://127.0.0.1:20880)作为key,实例化的Invoker(具体服务接口的实现serviceImpl)作为value添加到注册表中,并向Zookeeper注册中心注册服务(添加节点信息)。
dubbo原理 -服务引用
首先ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例,这是服务消费的关键。接下来把Invoker转换为客户端需要的接口(如:HelloWorld)。
dubbo原理 -服务调用
集群容错模式选择:
Dubbox
基于Spring的Dubbo整合
依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
Provider
配置文件provider.xml
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd ">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="provider" />
<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!-- 用dubbo协议在29014端口暴露服务 -->
<dubbo:protocol name="dubbo" port="29014" />
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.itmayiedu.service.UserService"
ref="orderService" />
<!-- 具体的实现bean -->
<bean id="orderService" class="com.itmayiedu.service.impl.UserServiceImpl" />
</beans>
Service
public class UserServiceImpl implements UserService {
public String getList(Integer id) {
System.out.println("客户端有人来消费了....");
if (id==1) {
return "我";
}
if (id==2) {
return "扎克伯格";
}
if (id==3) {
return "马化腾";
}
return "没有找到";
}
}
启动
public class TestMember {
public static void main(String[] args) throws IOException {
// 发布服务
ClassPathXmlApplicationContext app = new ClassPathXmlApplicationContext("provider.xml");
app.start();// 加载
System.out.println("服务发布成功...");
System.in.read(); // 让程序阻塞
}
}
Consumer
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
<dubbo:application name="consumer" />
<!-- 使用multicast广播注册中心暴露发现服务地址 -->
<dubbo:registry protocol="zookeeper" address="zookeeper://127.0.0.1:2181" />
<!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
<dubbo:reference id="userService" interface="com.itmayiedu.service.UserService" />
</beans>
启动
ClassPathXmlApplicationContext app = new ClassPathXmlApplicationContext("consumer.xml");
UserService userService = (UserService) app.getBean("userService");
String name = userService.getList(1);
System.out.println("name:" + name);