目录
1 概述
2 JMS模式
3 JMS创建通用步骤
4 JMS消息可靠性
5 ActiveMQ作用
6 协议
7 可持久化
8 ActiveMQ部署模式
9 高级特性参考资料
· 有道云笔记
· SCDN
概述
背景
在客户端与服务器进行通讯时.客户端调用后,必须等待服务对象完成处理返回结果才能继续执行。客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都必须正常运行;如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户会受到异常。
实质
ActiveMQ 是一个 MOM(消息中间件),具体来说是一个实现了 JMS 规范的系统间远程通信的消息代理。其他实现JMS规范的还有kafka、RabbitMQ、RockitMQ等。
ActiveMQ解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,消峰。
MOM
MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。
JMS
概念
JMS 叫做 Java 消息服务(Java Message Service),是 Java 平台上有关面向 MOM 的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,类似于 JDBC 和关系型数据库通信方式的抽象。
部件
JMS provider
实现JMS 的消息中间件,也就是MQ服务器。
JMS producer
消息生产者,创建和发送消息的客户端
JMS consumer
消息消费者,接收和处理消息的客户端
JMS message
JMS 消息,分为消息头、消息属性、消息体
(1)消息头
· JMSDestination 寻址对象
· JMSDeliveryMode 是否持久
· JMSExpiration 过期时间
· JMSPriority 优先级,默认是4,有0~9 共10个等级,5-9 是紧急的,0-4 是普通的
· JMSMessageId 唯一的消息ID
(2)消息体
发送和接收的消息类型必须一致
· TextMessage 普通字符串消息,包含一个String
· Mapmessage Map 类型的消息,k-> String ,v -> Java 基本类型
· BytesMessage 二进制数组消息,包含一个byte[]
· StreamMessage Java 数据流消息,用标准流操作来顺序的填充读取
· ObjectMessage 对象消息,包含一个可序列化的Java 对象
(3)消息属性:识别、去重、重点标注
其他概念
(1)Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种
(2)Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接
(3)Destination:消息被寻址、发送以及接收的对象,包括queue(点对点模式)和topic(发布订阅模式)
(4)broker 就是实现了用代码形式启动 ActiveMQ 将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。即将 MQ 服务器作为了 Java 对象
JMS模式
点对点模式(P2P)
(1)使用queue作为Destination
(2)消息接收方式
· 依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
· 生产者
public class Producter {
public static void main(String[] args) throws JMSException {
// ConnectionFactory:连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.*DEFAULT_USER*,
ActiveMQConnection.*DEFAULT_PASSWORD*, "tcp://127.0.0.1:61616");
// JMS客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.falst, Session.*AUTO_ACKNOWLEDGE*);
// Destination:消息的目的地;消息发送给谁.
//获取session注意参数值my-queue是Query的名字
Destination destination = session.createQueue("my-queue");
// MessageProducer:消息生产者
MessageProducer producer = session.createProducer(destination);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.*NON_PERSISTENT*);
//发送一条消息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
session.commit();
connection.close();
}
/**
*在指定的会话上,通过指定的消息生产者发出一条消息
*
\* @param session
*消息会话
\* @param producer
*消息生产者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
//创建一条文本消息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
//通过消息生产者发出消息
producer.send(message);
}
}
· 同步:Consumer 可以使用 MessageConsumer.receive() 同步地接收消息
public class JmsReceiver {
public static void main(String[] args) throws JMSException {
// ConnectionFactory:连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.*DEFAULT_USER*,
ActiveMQConnection.*DEFAULT_PASSWORD*, "tcp://127.0.0.1:61616");
// JMS客户端到JMS Provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.*TRUE*, Session.*AUTO_ACKNOWLEDGE*);
// Destination:消息的目的地;消息发送给谁.
//获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
Destination destination = session.createQueue("my-queue");
//消费者,消息接收者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到消息:" + message.getText());
session.commit();
} else
break;
}
session.close();
connection.close();
}
}
· 异步:Consumer 也可以使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收
// 通过监听的方式来消费消息
// 通过异步非阻塞的方式消费消息
// 通过messageConsumer 的setMessageListener 注册一个监听器,
// 当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
messageConsumer.setMessageListener(new MessageListener() { // 可以用监听器替换之前的同步receive 方法
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("消费者的消息:"+textMessage.getText());
}catch (JMSException e) {
e.printStackTrace();
}
}
}
});
(3)工作模式
· “负载均衡”,当先启动两个消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)。
· 多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。
(4)有无状态
Queue数据默认会在mq服务器上以文件形式保存,也可以配置成DB存储。比如ActiveMQ 一般保存在$AMQ_HOME\data\kr-store\data下面。
(5)传递完整性
如果没有消费者,保留消息直到被消费或者到达消息超时时间。
发布/订阅模式(Pub/Sub)
(1)使用topic作为Destination
(2)消息接收方式
同步和异步两种方式,同P2P模式。
(3)工作模式
· 有订阅者,则所有订阅者都会受到消息。消息会按照订阅者的数量进行复制。故随着订阅者的增加,处理性能也会明显降低。
· 无订阅者,消息将会被丢弃。先要有订阅者,生产者才有意义
· 生产者
public class TOPSendf {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic";
public static void main(String[] args) throws JMSException {
start();
}
static public void start() throws JMSException {
System.out.println("生产者已经启动....");
//创建ActiveMQConnectionFactory 会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
//启动JMS 连接
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null); producer.setDeliveryMode(DeliveryMode.PERSISTENT);
send(producer, session);
System.out.println("发送成功!");
connection.close();
}
static public void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
System.out.println("我是消息" + i);
TextMessage textMessage = session.createTextMessage("我是消息" + i);
Destination destination = session.createTopic(TOPIC);
producer.send(destination, textMessage);
}
}
}
· 消费者
public class TopReceiver {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic";
public static void main(String[] args) throws JMSException {
start();
}
static public void start() throws JMSException {
System.out.println("消费点启动...");
// 创建ActiveMQConnectionFactory 会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
// 启动JMS 连接
connection.start();
// 不开消息启事物,消息只要发送给消费者,则表示消息已经签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个队列
Topic topic = session.createTopic(TOPIC);
MessageConsumer consumer = session.createConsumer(topic);
// consumer.setMessageListener(new MsgListener());
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
// textMessage.acknowledge();// 手动签收
// session.commit();
} else {
break;
}
}
connection.close();
}
}
(4)有无状态
无状态
(5)传递完整性
· 默认情况下,如果没有订阅者,消息会被丢弃
· 也可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。
JMS创建通用步骤
(1)获取连接工厂
(2)使用连接工厂创建连接
(3)启动连接
(4)从连接创建会话
(5)获取 Destination
(6)创建 Producer,或创建 message
(7)创建 Consumer,或发送或接收message发送或接收 message
(8)创建 Consumer
(9)注册消息监听器(可选)
(10)发送或接收 message
(11)关闭资源(connection, session, producer, consumer 等)
JMS消息可靠性
持久化
(1)配置
// 在队列为目的地的时候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 队列为目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
(2)概念
持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息任就会被消费。
但是非持久化的消息,服务器宕机后消息永远丢失。
(3)默认情况
· P2P模式:默认是持久化的
· 发布订阅模式:默认是非持久化的
// 持久化生产者
MessageProducer messageProducer = session.createProducer(topic);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
// 设置持久化topic 在启动
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i < 4 ; i++) {
// 7 创建字消息
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 8 通过messageProducer发布消息
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
// mapMessage.setString("k1","v1");
// messageProducer.send(mapMessage);
}
// 9 关闭资源
// 接收持久化消息的消费者
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 发布订阅
connection.start();
Message message = topicSubscriber.receive();// 一直等
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive(3000L); // 等1秒后meesage 为空,跳出循环,控制台关闭
}
事务
(1)开启事务
createSession的第一个参数为true 为开启事务,开启事务之后必须将消息提交,才可以在队列中看到消息
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
提交:session.commit();
(2)意义
· 如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。
· 对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。
· 如果消息消费的 createSession 设置为 ture ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。
Acknowledge 签收(ack)
(1)非事务的session
· AUTO_ACKNOWLEDGE 自动签收(默认情况)
· CLIENT_ACKNOWLEDGE 手动签收
手动签收需要acknowledge,即textMessage.acknowledge();
· DUPS_OK_ACKNOWLEDGE 非必须签收
(2) 而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。但是开启事务没有commit 仍就会重复消费
保证消息幂等性的方法
幂等性即不能重复消费消息。
(1)使用手动签收。自动签收在高并发下可能存在重复消费
(2)数据库主键唯一性。消费一条数据往数据库里插入一条,若重复消费,数据库插入报错
(3)如果收到的数据是写redis,天然的幂等性,每次都是set
(4)使用全局唯一的MessageId
(5)日志记录
ActiveMQ作用
(1)解决耦合调用
(2)削峰,抵御洪峰流量
(3)异步模型
详细说明在《分布式高可用框架总结》有说明
协议
支持的协议
支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM,可以在配置文件中配置
默认配置
默认使用配置文件中name为”openwire”的协议,即TCP协议。
默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互 。
NIO 协议为ActiveMQ 提供更好的性能
(1)适合NIO 使用的场景:
· 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
· 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP
(2)连接形式:
nio://hostname:port?key=value
(3)NIO 增强
· auto 就像是一个网络协议的适配器,可以自动检测协议的类型,并作出匹配
<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>
持久化
概念
将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。
ActiveMQ 支持的消息持久化机制
带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC。
KahaDB
(1)概念
ActiveMQ 5.3 版本起的默认存储方式。KahaDB存储是一个基于文件的快速存储消息,设计目标是易于使用且尽可能快。它使用基于文件的消息数据库意味着没有第三方数据库的先决条件。
(2)配置
<broker brokerName="broker" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
</broker>
(3)它使用一个事务日志和索引(B-Tree索引)文件来存储所有的地址
AMQ
(1)概念
· 文件存储形式,不依赖于第三方数据库,能够快速启动和运行
· 写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用
· AMQ 消息存储库是可靠持久性和高性能索引(B-Tree索引)的事务日志组合,当消息吞吐量是应用程序的主要需求时,该存储是最佳选择
· 但因为它为每个索引使用两个分开的文件,并且每个 Destination 都有一个索引,所以当你打算在代理中使用数千个队列的时候,不应该使用它
(2)配置
<persistenceAdapter>
<amqPersistenceAdapter
directory="${activemq.data}/kahadb"
syncOnWrite="true"
indexPageSize="16kb"
indexMaxBinSize="100"
maxFileLength="10mb" />
</persistenceAdapter>
LeavelDB
(1)基于文件存储,不适用B-Tree索引,而使用自带的LeavelDB 索引
(2)可以和zookeeper结合,作为Master-Slave数据复制首选方案
JDBC
(1)有一部分数据会真实的存储到数据库中,使用JDBC 的持久化。
(2)ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:
· activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
· activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
· activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中
ActiveMQ部署模式
单例模式
acitvemq单机存储方式,以本地的kahadb文件的方式存储,所以性能完全依赖于本地的磁盘IO,所以不能提供高可用。
无共享主从模式
主从节点分别存储 Message。
(1)主节点
· 当主节点收到持久消息,会等待从节点完成消息的处理(通常是持久化到存储),然后再自己完成消息的处理(如持久化到存储)后,再返回对 Producer 的回执。
· 主节点只能有一个从节点,并且从节点不允许再有其他从节点
· 当从节点没有连接上主节点之前,任何主节点处理的 Message 或者消息确认都会在主节点失效后丢失
(2)从节点
· 从节点需要配置为连接到主节点,并且需要特殊配置其状态。
· 所有消息命令(消息,确认,订阅,事务等)都从主节点复制到从节点,这种复制发生在主节点对其接收的任何命令生效之前。并且,
· 从节点不启动任何传输,也不能接受任何客户端或网络连接,除非主节点失效。当主节点失效后,从节点自动成为主节点,并且开启传输并接受连接。这时,使用 failover 传输的客户端就会连接到该新主节点
(3)配置
Broker 连接配置如下:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
从节点配置
<services>
<masterConnector remoteURI="tcp://remotehost:62001" userName="Rob" password="Davies"/>
</services>
共享主从模式
允许多个代理共享存储,但任意时刻只有一个是活动的。这种情况下,当主节点失效时,无需人工干预来维护应用的完整性。
另外一个好处就是没有从节点数的限制。
有两种细分模式:基于数据库和基于文件系统
(1)基于数据库
它会获取一个表上的排它锁,以确保没有其他 ActiveMQ 代理可以同时访问数据库。其他未获得锁的代理则处于轮询状态,就会被当做是从节点,不会开启传输也不会接受连接。
(2)基于文件系统
需要获取分布式共享文件锁,linux 系统下推荐用 GFS2。
Zookeeper+ActiveMQ
具体搭建方法见:ActiveMQ集群的搭建(高可用) - 简书
(1)Zookeeper集群下,每个节点搭载一个ActiveMQ
(2)启动后只有一个ActiveMQ可以被访问,即为Master节点,其余节点成为从节点,不提供服务
(3)当 Master 宕机后,zookeper 监测到没有心跳信号, 则认为 master 宕机了,然后选举机制会从剩下的 Slave 中选出一个作为 新的 Master
(4)必须保证需要半数以上的MQ存活才能保证集群正常运行,即ActiveMQ的高可用依赖于Zookeeper的高可用。
高级特性
Failover机制
在项目上线后,需求方要求Agent可以在故障时自动切换到另一个MQ集群以实现容错,且在原集群正常后可以自动切换回来
需要在建立连接时把randomize设置为false,然后加个priorityBackup=true,failover中配置上两个MQ集群的F5就OK。
异步投递
对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息 。
(1)activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。
· 同步发送和异步发送的区别就在于 :
同步发送send 不阻塞就代表消息发送成功
异步发送需要接收回执并又客户端在判断一次是否发送
(2)开启异步投递
// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);
(3)不足
在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。
(4)正确的异步发送方法需要接收回调
activeMQConnectionFactory.setUseAsyncSend(true);
//……
for (int i = 1; i < 4 ; i++) {
textMessage = session.createTextMessage("msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"-- orderr");
String msgid = textMessage.getJMSMessageID();
messageProducer.send(textMessage, new **AsyncCallback**() {
@Override
public void onSuccess() {
// 发送成功怎么样
System.out.println(msgid+"has been successful send ");
}
@Override
public void onException(JMSException e) {
// 发送失败怎么样
System.out.println(msgid+" has been failure send ");
}
});
}
定时投递与延时投递
(1)在配置文件中设置定时器开关 为 true
(2)Java 代码中封装的辅助消息类型ScheduleMessage,可以设置的常用参数。
(3)demo
long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("delay msg--" + i);
// 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
ActiveMQ消息重试
(1)重发情况
· 使用事务会话并调用rollback()
· client使用事务并在commit()前关闭会话或者没有commit()
· 手动签收模式下,在session中调用recover()
· 客户端连接超时
(2)一个消息被重发超过默认的最大重发次数(默认6次)时,消费端会给MQ发送一个“poison ack”,告诉broker不要再发了,此时broker会把消息放到DLQ(死信队列)
(3)修改DLQ设置
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);