项目理解
一个使用了ActiveMQ消息队列的订单-物流系统,两个系统之间通过消息队列来通信,主要有order.created.queue
和logistics.updated.queue
两个消息队列。
order.created.queue是一个订单创建消息队列。当新订单创建时,系统会将订单信息发送到此队列,通知其他系统或者服务去处理与该订单相关的后续操作。
logistics.updated.queue是一个物流更新消息队列。当订单的物流信息发生更新(比如订单发货、配送进度等)时,系统会将物流信息发送到此队列,通知其他系统或者服务更新或处理订单的物流状态。
消息队列在这个场景中充当了异步通信机制,不是传统意义上的“推送通知”(比如给用户发送消息),而是指在系统内部,消息队列作为一种通信机制,允许服务之间通过异步消息交换来实现解耦。队列本身不会主动“传递”消息给目标服务,而是由目标服务去监听这个队列,然后被动拉取新消息进行处理。
ActiveMQ-Exercise/
├── README.md # 项目说明文档,描述项目的目的、使用方法等
├── common # 公共模块,包含多个共享的类、配置、DTO等
│ ├── pom.xml # 公共模块的Maven配置文件
│ ├── src
│ │ └── main
│ │ └── java
│ │ └── com
│ │ └── example
│ │ └── common
│ │ ├── config
│ │ │ ├── ActiveMQConfig.java # ActiveMQ配置,连接消息队列
│ │ │ └── JmsConverterConfig.java # JMS消息转换器配置
│ │ ├── constants
│ │ │ └── JmsConstants.java # 定义常量,如消息队列的名称等
│ │ ├── dto
│ │ │ ├── LogisticsOrderDTO.java # 物流订单DTO
│ │ │ ├── LogisticsUpdateDTO.java # 物流更新DTO
│ │ │ └── OrderDTO.java # 订单DTO
│ │ ├── model
│ │ │ └── OrderStatus.java # 订单状态枚举类
│ │ └── util
│ │ └── EntityDtoConverter.java # 实体类与DTO类的转换工具类
├── logistics-system # 物流系统模块
│ ├── logistics-system.iml # IntelliJ IDEA项目文件
│ ├── pom.xml # 物流系统模块的Maven配置文件
│ └── src
│ └── main
│ └── java
│ └── com
│ └── example
│ └── logistics
│ ├── LogisticsSystemApplication.java # 物流系统启动类
│ ├── config
│ │ └── JacksonConfig.java # 配置Jackson,处理JSON序列化和反序列化
│ ├── controller
│ │ └── LogisticsController.java # 处理物流订单的控制器
│ ├── entity
│ │ ├── LogisticsOrder.java # 物流订单实体类
│ │ └── LogisticsOrderItem.java # 物流订单项实体类
│ ├── listener
│ │ └── OrderCreatedListener.java # 监听新订单创建的事件
│ ├── repository
│ │ └── LogisticsOrderRepository.java # 物流订单的数据库操作接口
│ └── service
│ ├── LogisticsService.java # 物流服务接口
│ └── impl
│ └── LogisticsServiceImpl.java # 物流服务实现类
│ └── resources
│ ├── application-local.properties # 本地配置文件,包含数据库、消息队列等配置
│ ├── application.properties # 默认配置文件
│ ├── static
│ │ └── css
│ │ └── apple-style.css # CSS样式文件
│ └── templates
│ └── logistics
│ ├── list.html # 物流订单列表页面
│ └── shipped.html # 物流订单已发货页面
├── order-system # 订单系统模块
│ ├── pom.xml # 订单系统模块的Maven配置文件
│ └── src
│ └── main
│ └── java
│ └── com
│ └── example
│ └── order
│ ├── OrderSystemApplication.java # 订单系统启动类
│ ├── controller
│ │ └── OrderController.java # 处理订单的控制器
│ ├── entity
│ │ ├── Order.java # 订单实体类
│ │ └── OrderItem.java # 订单项实体类
│ ├── listener
│ │ └── LogisticsUpdateListener.java # 监听物流更新事件
│ ├── repository
│ │ └── OrderRepository.java # 订单数据库操作接口
│ └── service
│ ├── OrderService.java # 订单服务接口
│ └── impl
│ └── OrderServiceImpl.java # 订单服务实现类
│ └── resources
│ ├── application-local.properties # 本地配置文件
│ ├── application.properties # 默认配置文件
│ ├── static
│ │ └── css
│ │ └── apple-style.css # CSS样式文件
│ └── templates
│ └── order
│ ├── create.html # 创建订单页面
│ └── list.html # 订单列表页面
└── pom.xml # 根目录的Maven配置文件,管理依赖和构建配置
在本项目中,消息队列的作用可以概括为:订单系统在用户创建订单后,通过 Controller 层的业务逻辑将订单信息封装为消息发送到 ActiveMQ 队列中(如 order.created.queue
),而物流系统则通过监听器持续监听该队列,一旦接收到新消息,立即触发对应的处理逻辑(如入库、更新状态等)。这一过程实现了服务之间的解耦,使得各模块可以异步通信、独立扩展,提高了系统的可靠性与可维护性。
[订单系统前端] --> [OrderController] --> [JmsTemplate 发送消息到 MQ]
↓
[ActiveMQ 消息队列: order.created.queue]
↓
[物流系统 OrderCreatedListener 自动监听并处理消息]
接触到的新技术
技术 / 框架 | 作用说明 | 项目中的使用示例 |
---|---|---|
简化配置和启动流程,快速构建微服务项目 | 构建订单系统和物流系统两个独立的 Spring Boot 服务 | |
Spring JPA (Hibernate) | 简化数据库操作,基于实体类自动生成数据库表及 SQL | 使用 JpaRepository 快速实现订单、物流数据的增删改查 |
ActiveMQ(JMS) | 实现系统间异步通信,消息解耦 | 用于订单系统与物流系统之间通过消息队列进行数据传递 |
JmsTemplate / @JmsListener | 发送和监听消息的核心工具 | OrderServiceImpl 中发送消息,OrderCreatedListener 中接收并处理 |
H2 内存数据库 | 用于开发测试的轻量级数据库,启动即用 | 默认配置存储订单与物流信息,无需手动创建数据库 |
Jackson 配置自定义 ObjectMapper | 处理 JSON 与对象间的序列化、反序列化,尤其是日期格式等 | 在 JacksonConfig.java 中配置 JavaTimeModule 支持时间类型 |
DTO(数据传输对象)设计 | 实体与传输对象分离,增强系统灵活性与安全性 | 使用 OrderDTO 、LogisticsOrderDTO 进行跨系统数据传输 |
用于前端展示,结合 Spring MVC 渲染页面 | 项目中如 order/list.html 、logistics/list.html 等模板 | |
Lombok( | 简化 Java 代码的开发,自动生成 getter/setter 等 | 提高开发效率,减少模板代码,如实体类、服务类中大量使用 |
多模块 Maven 工程结构 | 实现模块复用和结构清晰,便于维护 | common 模块抽象公共部分,order-system 和 logistics-system 各自独立 |
@ConditionalOnProperty(Spring Boot) | 条件注解,只有在配置文件中某个属性满足特定值时,才会启用对应的配置类或Bean。 | 当 application.properties 中 jms.implementation=spring ,或者该属性缺失(matchIfMissing=true )时,启用当前Bean或配置。 |
后续新增
后续新增了底层JMS API的使用方法。
✅ 点对点模型(Queue)
📤 生产者代码(QueueProducer.java)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueProducer {
public static void main(String[] args) throws JMSException {
// 创建连接工厂,连接到ActiveMQ服务器
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话(不使用事务,自动确认消息)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个队列(Queue)目的地
Queue queue = session.createQueue("myQueue");
// 创建生产者,发送到指定队列
MessageProducer producer = session.createProducer(queue);
// 创建一条文本消息
TextMessage message = session.createTextMessage("Hello from QueueProducer!");
// 发送消息
producer.send(message);
// 输出发送内容
System.out.println("Sent message: " + message.getText());
// 关闭资源
session.close();
connection.close();
}
}
📥 消费者代码(QueueConsumer.java)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QueueConsumer {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
connection.start(); // 启动连接,消费者必须先启动才能接收消息
// 创建会话(不使用事务,自动确认消息)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列(与生产者的队列名称必须一致)
Queue queue = session.createQueue("myQueue");
// 创建消费者,从队列接收消息
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息(这是一个阻塞方法,直到接收到消息为止)
Message message = consumer.receive();
// 判断消息类型并打印内容
if (message instanceof TextMessage) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
// 关闭资源
session.close();
connection.close();
}
}
✅ 发布/订阅模型(Topic)
📢 发布者代码(TopicPublisher.java)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicPublisher {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题(Topic)目的地
Topic topic = session.createTopic("myTopic");
// 创建消息发布者
MessageProducer producer = session.createProducer(topic);
// 创建文本消息
TextMessage message = session.createTextMessage("Hello from TopicPublisher!");
// 发布消息到主题
producer.send(message);
// 打印消息内容
System.out.println("Published message: " + message.getText());
// 关闭资源
session.close();
connection.close();
}
}
📡 订阅者代码(TopicSubscriber.java)
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicSubscriber {
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = factory.createConnection();
// 为连接设置客户端 ID(用于持久订阅)
connection.setClientID("client1");
// 启动连接,必须启动后才能接收消息
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题(Topic)
Topic topic = session.createTopic("myTopic");
// 创建订阅者(普通订阅;如果你想要持久订阅,请使用 `createDurableSubscriber`)
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
Message message = consumer.receive();
// 打印消息内容
if (message instanceof TextMessage) {
System.out.println("Received: " + ((TextMessage) message).getText());
}
// 关闭资源
session.close();
connection.close();
}
}