JMS
Java消息服务(JMS)定义了Java中访问消息中间件的接口。JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为JMS Provide,已有的MOM系统包括Apache的ActiveMQ、以及阿里巴巴的RocketMQ等
JMS术语
- Provider (Message Provider):生产者
由会话创建的对象,用于发送和接受到目标的消息。消费者可以同步的或异步的接受队列和主题类型的消息
- Consumer (Message Consumer):消费者
由会话创建的对象,用于发送消息到目标。
- PTP:Point to Point:即点对点的消息模型
- Pub/Sub:Publish/Subscribe:即发布/订阅消息模型
- Queue:队列目标
- Topic:主题目标
- ConnectionFactory:连接工厂,JMS用它创建连接
连接工厂用来创建到JMS提供者的连接的被监管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现代码改变时,代码不需要进行修改。
- Connection:JMS客户端到JMS Provider的连接
连接代表了应用程序和消息服务器直接的通信链路。
- Destination:消息的目的地
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。
- Session:会话,一个发送或者接受消息的线程
- Message接口:消息
实在消费者和生产者之间传递的对象。一个消息由三个部分组成:消息头(必须):包含识别和为消息寻找路由的操作设置;一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容;一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息和对象消息)
Message接口的消息体
- StreamMessage Java原始值的数据流
- MapMessage 名称和键值对
- TextMessage 一个字符串对象
- ObjectMessage 一个序列化的Java对象
- ByteMessage 字节的数据流
ActiveMQ介绍
Active是Apache旗下的一个开源的MOM框架,是一个完全基于JMS1.1和JMS1.4规范的JMS Provider实现。ActiveMQ在业界应用最广泛,可以满足80%的需求。
- ActiveMQ下载
ActiveMQ下载地址,我们这里使用的是5.11.1
版本。
- ActiveMQ目录结构
在conf
目录下的 activemq.xml
文件是ActiveMQ的核心配置文件,其他的目录和其他的项目类似。
- 启动ActiveMQ
在bin
目录下有win32
和 win64
两个目录,不同的操作系统执行不同启动脚本。启动时如果出现端口被占用的情况可以关掉Windows Firewall/Internet Connection Sharing
服务。
- 进入ActiveMQ的web管理页面
在本地可以访问http://localhost:8161/
地址,点击 Manage ActiveMQ broker
连接。输入账号和密码,默认都是admin
,也可以在ActiveMQ的安装目录下的conf
目录下的users.properties
文件进行修改,登录成功后,进入首页,表示成功
ActiveMQ的HelloWorld示例
这里使用Java代码创建消息生产者和消息消费者
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; * 消息生产者 * @author * */ public class Sender { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("hellloQueue"); MessageProducer product = session.createProducer(destination); product.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i <=5; i++) { TextMessage message = session.createTextMessage(); message.setText("Hello World "+i); product.send(message); } if(connection!=null){ connection.close(); } } }
|
我们上面的代码使用的是PTP模式,所以发送的目的是queue。执行上面代码,点击queue菜单,向queue添加6条消息,另外6条是原来测试的,这里会展现历史记录
点击队列名称查看队列消息的详细信息
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; * 消费者消费者 * @author * */ public class Receiver { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("hellloQueue"); MessageConsumer consumer = session.createConsumer(destination); while(Boolean.TRUE){ TextMessage message = (TextMessage)consumer.receive(); if(message==null) break; System.out.println("收到的内如:"+message.getText()); } if(connection!=null){ connection.close(); } } }
|
执行消费者后,消费者可以接受到信息并打印
ActiveMQweb控制台,显示6条消息已经被消费
ActiveMQ Subscribers模式hello world
发布者订阅者模式其实和PTP模式代码差不多,只是需要修改部分代码,但是消息可以重复消费
消息发布者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; * Created by YQ on 2017/6/15. * 消息发布者 */ public class Publishers { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination helloTopic = session.createTopic("helloTopic"); MessageProducer producer = session.createProducer(helloTopic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 6; i++) { TextMessage message = session.createTextMessage(); message.setText("Hello World Publish " + i); producer.send(message); } if (connection != null) { connection.close(); } } }
|
消息订阅者,可以有多个消息订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| * Created by YQ on 2017/6/15. * 消息订阅者 */ public class Subscriber { public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://0.0.0.0:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination helloTopic = session.createTopic("helloTopic"); MessageConsumer consumer = session.createConsumer(helloTopic); while(Boolean.TRUE){ TextMessage message = (TextMessage)consumer.receive(); if(message==null) break; System.out.println("收到的内如:"+message.getText()); } if(connection!=null){ connection.close(); } } }
|
执行顺序,需要先执行订阅者,也就是要先订阅才能接受到数据;执行订阅者类,可以在ActiveMQ控制台看到该主题有一个订阅者,如下