消息队列学习基础
作者: Fysddsw_lc(来自:掘金)
原文链接:
https://juejin.im/post/5c0522b4e51d4579402b8da2
什么是MOM
MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通信的一类软件。MOM 的总体思想是它作为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。
MOM思想就是A和B两个应用程序不直接发送消息。之前A和B直接发送消息有很多效率问题,如A发送之后B没有及时接受,那么A就一直再在那里堵塞并发性不好,A必须等B接受完之后有返结果了A才可以结束。而MOM就是为了解决这样的问题,不让A与B之间交互,在A和B之间加一个消息中间件,A把消息放到消息中间上,就可以走了,去做别的事情,B什么时候来消息中间件取消息A不用知道也不用管。这样就提高了效率提供并发性,等B去走后可以通过状态,通知,回调等方式通知A就可以。市面上实现这种思想的技术有很多,IBM(MQSEVICES)、Microsoft(MSMQ)以及BEA的MessageMQ等。处于百家争鸣阶段都是各自实现各自的,没有统一实现标准。此时SUN为了实现统一标准就出现了JMS统一实现规范。JMS主要有2种消息模型,点到点和发布订阅两种。
什么是消息队列
消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个消息队列可以被一个也可以被多个消费者消费。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
消息队列优点
将数据从一个应用程序传到另一个应用程序,或者从软件的一个模块传送到另外一个模块
负责建立网络通信的通道,进行数据的可靠传送
保证数据不重发,不丢失
能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
消息队列的应用场景
下面详细介绍一下消息队列在实际应用中常用的使用场景。场景分为异步处理、应用解耦、流量削锋和消息通讯四个场景。
异步处理
场景说明
用户注册后,需要发送注册邮件和发送注册信息,传统的做法有两种:串行方式并行方式
串行方式
将注册信息写入数据库成功后,发送注册邮件,然后发送注册短信,而所有任务执行完成后,返回信息给客户端
并行方式
将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操作。而所有任务执行完成后,返回信息给客户端。同串行方式相比,并行方式可以提高执行效率,减少执行时间。
上面的比较可以发现,假设三个操作均需要50ms的执行时间,排除网络因素,则最终执行完成,串行方式需要150ms,而并行方式需要100ms。
因为cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100此,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次。
由上可以看出,传统串行和并行的方式会受到系统性能的局限,那么如何解决这个问题?
我们需要引入消息队列,将不是必须的业务逻辑,异步进行处理,由此改造出来的流程为
根据上述的流程,用户的响应的时间基本相当于将数据写入数据库的时间,发送注册邮件,发送注册短信的消息在写入消息队列后,即可返回执行结果,写入消息队列的时间很快,几乎可以忽略,也有此可以将系统吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。
应用解耦
场景说明
用户下单后,订单系统需要通知库存系统。
传统的做法为:订单系统调用库存系统的接口。如下图所示:
传统方式具有如下缺点:
假设库存系统访问失败,则订单减少库存失败,导致订单创建失败
订单系统同库存系统过度耦合
如何解决上述的缺点呢?需要引入消息队列,引入消息队列后的架构如下图所示:
引入消息队列,实现应用解耦
订单系统:用户下单后,订单系统进行数据持久化处理,然后将消息写入消息队列,返回订单创建成功
库存系统:使用拉/推的方式,获取下单信息,库存系统根据订单信息,进行库存操作。
假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其后续操作了。由此实现了订单系统与库存系统的应用解耦。
流量削锋
流量削峰
也是消息对列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景
秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制参与活动的人数;
可以缓解短时间内高流量对应用的巨大压力;
流量削锋处理方式系统图如下:
流量削锋方式系统图
服务器在接收到用户请求后,首先写入消息队列。这时如果消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面;
秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理。
日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
消息队列应用于日志处理的架构
日志采集客户端:负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列:负责日志数据的接收,存储和转发;
日志处理应用:订阅并消费kafka队列中的日志数据;
服务的技术架构设计
Kafka:接收用户日志的消息队列。
Logstash:做日志解析,统一成JSON输出给Elasticsearch。
Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列、聊天室等。
点对点通讯
点对点通讯架构设计
在点对点通讯架构设计中,客户端A和客户端B共用一个消息队列,即可实现消息通讯功能。
聊天室通讯
聊天室通讯架构设计
客户端A、客户端B、直至客户端N订阅同一消息队列,进行消息的发布与接收,即可实现聊天通讯方案架构设计。
JMS消息服务
讲消息队列就不得不提JMS。JMS(Java Message Service,Java消息服务) JMS 叫做 Java 消息服务(Java Message Service),是 Java 平台上有关面向 MOM 的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,类似于 JDBC 和关系型数据库通信方式的抽象。
API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建,发送,接收和读取消息。他是分布式通信耦合度更低,消息服务更加可靠以及异步性。
在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。
常用概念
Provider:纯 Java 语言编写的 JMS 接口实现(比如 ActiveMQ 就是)
Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种
Connection factory:客户端使用连接工厂来创建与 JMS provider 的连接
Destination:消息被寻址、发送以及接收的对象
消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)
P2P 模式
P2P(点对点)模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,知道他们被消费或者超时。
P2P 消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。
多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。
P2P的特点
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中,其他的消费者就不能得到这条消息了。)
发送者和接收者质检在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,他不会影响到消息发送到队列。
消费者必须确认对消息的接收
收到消息后消费者必须确认消息已被接收,否则JMS服务提供者会认为该消息没有被接收,那么这条消息仍然可以被其他人接收。程序可以自动进行确认,不需要人工干预。
非持久的消息最多只发送一次
非持久的消息最多只发送一次,表示消息有可能未被发送,造成未被发送的原因可能有:
1、 JMS服务提供者出现宕机等情况,造成非持久信息的丢失
2、 队列中的消息过期,未被接收
持久的消息严格发送一次
我们可以将比较重要的消息设置为持久化的消息,持久化后的消息不会因为JMS服务提供者的故障或者其他原因造成消息丢失。
如果希望发送的每个消息都会被成功处理的话,那么需要p2p 模式
Pub/Sub模式
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。
Pub/Sub的特点
每个消息都可以有多个(0,1,……)订阅者,每条消息可以有多个消费者,如果报纸和杂志一样,谁订阅了谁都可以获得。
发布者和订阅者之间有时间上的依赖性。订阅者只能消费他们订阅之后出版的消息,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。这就要求订阅者必须先订阅,生产者再发布。即订阅者必须先运行,再等待生产者的运行,这和点对点类型有所差异。
为了消费消息,订阅者必须保持运行的状态。即订阅者必须保持活动状态等待发布者发布的消息,如果订阅者在发布者发布消息之后才运行,则不能获得先前发布者发布的消息。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来小消费消息。
同步
订阅者或接收者通过receive方法来接受消息,receive在接收到消息之前(或超时之前)将一直阻塞。异步
订阅者或接收者亦可以注册未一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage的方法。
JDNI:Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或者命名服务中的一个记录,同时返回资源连接建立所必需的信息。
JNDI在JMS中起到查找二号访问发送目标或消息来源的作用。
JMS编程
JMS通用步骤
获取连接工厂
使用连接工厂创建连接
启动连接
从连接创建会话
获取 Destination
创建 Producer,或
创建 Producer
创建 message
创建 Consumer,或发送或接收message发送或接收 message
创建 Consumer
注册消息监听器(可选)
发送或接收 message
关闭资源(connection, session, producer, consumer 等)
JMS编程模型
1.ConnectionFactory
创建Connection对象的工厂,针对两周不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
2.Destination
Destination的意思是消息生产者的消息发送目标或着说消息消费者的消息来源。对于消息生产者来说。他的Destination是某个队列(queue)或者某个主题(Topic);对于消息消费者来说,他的Destination也是某个队列或主题(即消息来源)。
所以,Destination实际上就是两种类型的对象:Queue,Topic可以通过JNDI来查找Destination
3.Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP Socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
4.Session
Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
5.消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
6.消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
7. MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
深入学习JMS对掌握JAVA架构、EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。
JMS编程实战
这里拿ActiveMQ 举例
public class JMSDemo {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
MessageConsumer consumer;
Message message;
boolean useTransaction = false;
try {
Context ctx = new InitialContext();
connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
//使用ActiveMQ时:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TEST.QUEUE");
//生产者发送消息
producer = session.createProducer(destination);
message = session.createTextMessage("this is a test");
//消费者同步接收
consumer = session.createConsumer(destination);
message = (TextMessage) consumer.receive(1000);
System.out.println("Received message: " + message);
//消费者异步接收
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message != null) {
doMessageEvent(message);
}
}
});
} catch (JMSException e) {
...
} finally {
producer.close();
session.close();
connection.close();
}
}
推荐阅读: