查看原文
其他

ActiveMQ消息队列从入门到实践(3)—通过ActiveMQ收发消息

素文宅博客 Java精选 2022-08-09


在MQ中,消息模型有两种,一种是队列(Queue),一种是主题(Topic)。队列是Point-To-Point的,队列中的消息,仅能被消费一次。主题是Pub/Sub模型,主题中的消息,可以由多个订阅者消费;订阅者只能消费它订阅以后的消息。这是遵循的JMS规范。



1.1 收发消息对象创建过程


如上图所示,JMS规范中,收发消息的对象创建过程如下,下面的示例代码中也将注释这些过程:

1)初始化ConnetionFactory

2)ConnetionFactory创建Connection

3)Connection创建Session

4)Session创建Destination(包括Queue 和 Topic两种)

5)发:Session创建消息生产者MessageProducer(收:Session创建消息消费者MessageConsumer

6)Session创建Message,(发:)MessageProducer发送到Destination,(收:)MessageConsumer从Destination接受消息。


1.2 接口间的关系

JMS规范定义了通用接口(JMS Common Interfaces)、队列接口(PTP-specific Interfaces)和 主题接口(Pub/Sub-specific Interfaces),队列接口和主题接口分别继承于通用接口,具体关系如下表所示。

ActiveMQ对这些规范接口都有相应的实现。在实际的编程过程中,声明通用接口基本就够用了。如何区分Queue和Topic也很简单,参看下面的代码。

//Queue,队列
Destination destination = session.createQueue(subject);
//Topic,主题
Destination destination = session.createTopic(subject);


2. 通过队列发送和接受消息

运行代码的时候,可以先run起来接受消息的程序,再run发送消息的程序,来观察消息发送的过程。

启动ActiveMQ服务器,安装部署和启动的办法,参考:https://blog.yoodb.com/yoodb/article/detail/1543


2.1 通过Queue发送消息

package com.yoodb.mq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 演示如何向MQ发送消息,和JDBC操作数据库的过程很像
 * 1.初始化连接工厂ConnectionFactory
 * 2.创建连接Connection
 * 3. 创建会话session
 * 4.打开队列createQueue
 * 5.获得消息生产者MessageProducer
 * 6.使用消息生产者发送消息
 * 7. 关闭会话session和连接Connection
 * 使用JMS发送一个这么简单的消息
 */
public class Sender {
  public static void main(String[] args) {
    Sender sender = new Sender();
    String msg = "Hello World!";
    sender.sendMessage(msg);
    System.out.println("发送消息结束:" + msg);
  }
  /**
   * 使用JMS向MQ发送消息
   * 
   * @param msg 消息内容
   */
  public void sendMessage(String msg) {
    // defualt user & password both are null
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    // DEFAULT_BROKER_URL =failover://tcp://localhost:61616
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "TOOL.DEFAULT";
    // 1. 初始化连接工厂
    ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);
    try {
      // 2. 创建连接
      Connection connection = contectionFactory.createConnection();
      connection.start();
      // 3.创建会话
      Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      // 4. 打开队列
      Destination destination = session.createQueue(subject);
      // 5. MessageProducer负责发送消息
      MessageProducer producer = session.createProducer(destination);
      TextMessage message = session.createTextMessage();
      for (int i = 0; i < 10; i++) {
        String tmp = i + ":" + msg;
        message.setStringProperty("hello", tmp);
        // 6. 发送消息
        producer.send(message);
        System.out.println("send: " + tmp);
        Thread.sleep(3000);
        //只有commit之后,消息才会进入队列
        session.commit();
        
      }
      // 7. 关闭会话和连接
      session.close();
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}


2.2 通过Queue接受消息

package com.yoodb.mq.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 如何从MQ接受消息
 * 1.初始化连接工厂ConnectionFactory
 * 2.创建连接Connection
 * 3. 创建会话session
 * 4.打开队列createQueue
 * 5.获得消息消费者MessageConsumer
 * 6.使用MessageConsumer接受消息
 * 7. 关闭会话session和连接Connection
 */
public class Receiver {
  public static void main(String[] args) {
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "TOOL.DEFAULT";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createQueue(subject);
      // MessageConsumer负责接受消息
      MessageConsumer consumer = session.createConsumer(destination);
      consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message msg) {
          TextMessage message = (TextMessage) msg;
          try {
            String hello = message.getStringProperty("hello");
            System.out.println("收到消息:\t" + hello);
            session.commit();
          } catch (JMSException e) {
            e.printStackTrace();
          }
        }
      });
      // 为了演示接受消息,这里把关闭会话和连接注释掉了。
      // session.close();
      // connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


3. 通过Topic发布和订阅消息

为了使订阅者能够订阅消息,在运行程序时,需要先运行订阅者(Subscriber),后运行发布者(Publisher)。


3.1 通过Topic发布消息

package com.yoodb.mq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 如何向MQ发送消息,和JDBC操作数据库的过程很像
 * 1.初始化连接工厂ConnectionFactory
 * 2.创建连接Connection
 * 3. 创建会话session
 * 4.创建topic
 * 5.获得消息生产者MessageProducer
 * 6.使用消息生产者发送消息
 * 7. 关闭会话session和连接Connection
 * 只有那些在线的订阅者可以收到消息,所以我们需要先启动Subscriber
 *
 */
public class Publisher {
  public static void main(String[] args) {
    Publisher pb = new Publisher();
    String msg = "Hello World!~~~~~";
    pb.sendMessage(msg);
    System.out.println("发送消息结束:" + msg);
  }
  /**
   * 使用JMS向MQ发送消息
   * @param msg 消息内容
   */
  public void sendMessage(String msg) {
    // defualt user & password both are null
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    // DEFAULT_BROKER_URL =failover://tcp://localhost:61616
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "MQ.TOPIC";
    // 1. 初始化连接工厂
    ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);
    try {
      // 2. 创建连接
      Connection connection = contectionFactory.createConnection();
      connection.start();
      // 3.创建会话
      Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      // 4. 创建要发布的主题,和Queue的区别就在此
      Destination destination = session.createTopic(subject);
      // 5. MessageProducer负责发送消息
      MessageProducer producer = session.createProducer(destination);
      TextMessage message = session.createTextMessage();
      message.setStringProperty("hello", msg);
      // 6. 发送消息
      producer.send(message);
      // 7. 关闭会话和连接
      session.commit();
      session.close();
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


3.2 通过Topic订阅消息

这里我们只有1个订阅者,想要验证多个订阅者,拷贝多份代码,改个类名即可。再次提醒,先运行订阅者。

package com.yoodb.mq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 演示如何从MQ接受消息,和发送差不多
 * 1.初始化连接工厂ConnectionFactory
 * 2.创建连接Connection
 * 3. 创建会话session
 * 4.打开队列createQueue
 * 5.获得消息消费者MessageConsumer
 * 6.使用MessageConsumer接受消息
 * 7. 关闭会话session和连接Connection
 */
public class Subscriber {
  public static void main(String[] args) {
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "MQ.TOPIC";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      Topic topic = session.createTopic(subject);
      // MessageConsumer负责接受消息
      MessageConsumer consumer = session.createConsumer(topic);
      consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message msg) {
          TextMessage message = (TextMessage) msg;
          try {
            String hello = message.getStringProperty("hello");
            System.out.println("订阅者---SecondSubscriber---收到消息:\t" + hello);
            session.commit();
          } catch (JMSException e) {
            e.printStackTrace();
          }
        }
      });
      // 为了测试效果,注释掉了两行代码,使Session和connection一直处于打开状态
      //session.close();
      //connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


推荐阅读

Dubbo应用服务迁移到Kubernetes集成方案

JDK1.7中HashMap死环问题及JDK1.8中对HashMap的优化源码详解

Shiro权限基础篇(一):Shiro权限的基本使用方法

Shiro应用篇(二):Shiro结合Redis实现分布式环境下的Session共享

ActiveMQ消息队列从入门到实践(1)—JMS的概念和JMS消息模型

ActiveMQ消息队列从入门到实践(2)—Windows安装activemq服务

Spring基础篇—常见的Spring异常分析及处理

Spring高级篇—Spring Security入门原理及实战

微框架Spring Boot使用Redis如何实现Session共享

Java面试高级篇—Dubbo与Zookeeper面试题16期

Java面试高级篇—Java NIO:浅析I/O模型面试题15期

Java面试高级篇—JavaIO流原理以及Buffered高效原理详解

Java面试高级篇—详谈Java四种线程池及new Thread的弊端面试题14期

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

更多推荐↓↓↓
 

关注微信公众号“Java精选”(w_z90110),回复关键字领取资料:如HadoopDubboCAS源码等等,免费领取资料视频和项目。 


涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis缓存、Spring源码、各大主流框架、Web开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL非关系型数据库、运维等。

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存