文章目录
  1. 1. JMS
    1. 1.1. JMS术语
    2. 1.2. Message接口的消息体
    3. 1.3. ActiveMQ介绍
    4. 1.4. ActiveMQ的HelloWorld示例
    5. 1.5. ActiveMQ Subscribers模式hello world

JMS

Java消息服务(JMS)定义了Java中访问消息中间件的接口。JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为JMS Provide,已有的MOM系统包括Apache的ActiveMQ、以及阿里巴巴的RocketMQ等

JMS术语

  • Provider (Message Provider):生产者
    由会话创建的对象,用于发送和接受到目标的消息。消费者可以同步的或异步的接受队列和主题类型的消息
  • Consumer (Message Consumer):消费者
    由会话创建的对象,用于发送消息到目标。
  • PTP:Point to Point:即点对点的消息模型
  • Pub/Sub:Publish/Subscribe:即发布/订阅消息模型
  • Queue:队列目标
  • Topic:主题目标
  • ConnectionFactory:连接工厂,JMS用它创建连接
    连接工厂用来创建到JMS提供者的连接的被监管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现代码改变时,代码不需要进行修改。
  • Connection:JMS客户端到JMS Provider的连接
    连接代表了应用程序和消息服务器直接的通信链路。
  • Destination:消息的目的地
    目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。
  • Session:会话,一个发送或者接受消息的线程
  • Message接口:消息
    实在消费者和生产者之间传递的对象。一个消息由三个部分组成:消息头(必须):包含识别和为消息寻找路由的操作设置;一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容;一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息和对象消息)

Message接口的消息体

  • StreamMessage Java原始值的数据流
  • MapMessage 名称和键值对
  • TextMessage 一个字符串对象
  • ObjectMessage 一个序列化的Java对象
  • ByteMessage 字节的数据流

ActiveMQ介绍

Active是Apache旗下的一个开源的MOM框架,是一个完全基于JMS1.1和JMS1.4规范的JMS Provider实现。ActiveMQ在业界应用最广泛,可以满足80%的需求。

  • ActiveMQ下载
    ActiveMQ下载地址,我们这里使用的是5.11.1 版本。
  • ActiveMQ目录结构
    activeMQ目录结构
    conf 目录下的 activemq.xml 文件是ActiveMQ的核心配置文件,其他的目录和其他的项目类似。
  • 启动ActiveMQ
    bin 目录下有win32win64 两个目录,不同的操作系统执行不同启动脚本。启动时如果出现端口被占用的情况可以关掉Windows Firewall/Internet Connection Sharing 服务。
  • 进入ActiveMQ的web管理页面
    在本地可以访问http://localhost:8161/ 地址,点击 Manage ActiveMQ broker 连接。输入账号和密码,默认都是admin ,也可以在ActiveMQ的安装目录下的conf 目录下的users.properties 文件进行修改,登录成功后,进入首页,表示成功
    activeMQweb页面

ActiveMQ的HelloWorld示例

这里使用Java代码创建消息生产者和消息消费者
消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息生产者
* @author
*
*/
public class Sender {
public static void main(String[] args) throws Exception {
// 第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616");
// 第二步:通过ConnectionFactory创建一个Connection连接,启动connection
Connection connection = connectionFactory.createConnection();
connection.start();
// 第三步,通过connection创建session,用于接收消息;并设置是否,签收模式
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 第四步:通过session创建Destination对象,该对象指的是一个客户端用来指定生产消费目标和消费消息来源的对象,
// 在PTP模式中,Destination被称为queue,在发布订阅模式中被称为topic;并制定destination的名称
Destination destination = session.createQueue("hellloQueue");
// 第五步:通过session对象创建消息的发送和接受对象(生成者和接受者)
MessageProducer product = session.createProducer(destination);
// 第六步:设置消息持久化的方式
product.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 设置非持久化
// 第七步:通过session创建消息,并使用product发送出去
for (int i = 0; i <=5; i++) {
TextMessage message = session.createTextMessage();
message.setText("Hello World "+i);
product.send(message);
}
if(connection!=null){
connection.close();
}
}
}

我们上面的代码使用的是PTP模式,所以发送的目的是queue。执行上面代码,点击queue菜单,向queue添加6条消息,另外6条是原来测试的,这里会展现历史记录
activeMQ-向队列中添加元素
点击队列名称查看队列消息的详细信息
activeMQ-向队列详细信息

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消费者消费者
* @author
*
*/
public class Receiver {
public static void main(String[] args) throws Exception{
// 第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616");
// 第二步:通过ConnectionFactory创建一个Connection连接,启动connection
Connection connection = connectionFactory.createConnection();
connection.start();
// 第三步,通过connection创建session,用于接收消息;并设置是否,签收模式
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 第四步:通过session创建Destination对象,该对象指的是一个客户端用来指定生产消费目标和消费消息来源的对象,
// 在PTP模式中,Destination被称为queue,在发布订阅模式中被称为topic;并制定destination的名称
Destination destination = session.createQueue("hellloQueue");
// 第五步:通过connection创建consumer
MessageConsumer consumer = session.createConsumer(destination);
while(Boolean.TRUE){
TextMessage message = (TextMessage)consumer.receive();
if(message==null) break;
System.out.println("收到的内如:"+message.getText());
}
if(connection!=null){
connection.close();
}
}
}

执行消费者后,消费者可以接受到信息并打印
activeMQ消费者接受到消息并打印
ActiveMQweb控制台,显示6条消息已经被消费
activeMQ-向队列中消费元素

ActiveMQ Subscribers模式hello world

发布者订阅者模式其实和PTP模式代码差不多,只是需要修改部分代码,但是消息可以重复消费

消息发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by YQ on 2017/6/15.
* 消息发布者
*/
public class Publishers {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination helloTopic = session.createTopic("helloTopic");
MessageProducer producer = session.createProducer(helloTopic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 6; i++) {
TextMessage message = session.createTextMessage();
message.setText("Hello World Publish " + i);
producer.send(message);
}
if (connection != null) {
connection.close();
}
}
}

消息订阅者,可以有多个消息订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Created by YQ on 2017/6/15.
* 消息订阅者
*/
public class Subscriber {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 在PTP模式中,Destination被称为queue,在发布订阅模式中被称为topic;并制定destination的名称
Destination helloTopic = session.createTopic("helloTopic");
MessageConsumer consumer = session.createConsumer(helloTopic);
while(Boolean.TRUE){
TextMessage message = (TextMessage)consumer.receive();
if(message==null) break;
System.out.println("收到的内如:"+message.getText());
}
if(connection!=null){
connection.close();
}
}
}

执行顺序,需要先执行订阅者,也就是要先订阅才能接受到数据;执行订阅者类,可以在ActiveMQ控制台看到该主题有一个订阅者,如下
ActiveMQ发布订阅模式-订阅者

文章目录
  1. 1. JMS
    1. 1.1. JMS术语
    2. 1.2. Message接口的消息体
    3. 1.3. ActiveMQ介绍
    4. 1.4. ActiveMQ的HelloWorld示例
    5. 1.5. ActiveMQ Subscribers模式hello world