前言:之前写过一个在线购物的小商城,现在还记得当初遇到了一个让我很难受的事情。什么事情呢?就是有大量订单的情况下,有部分订单未支付,我们需要将订单及时地删除或者标记未未支付状态。怎么做才能做到效率呢?

面对这个问题,我刚开始的方法是:开一个定时器,每间隔10分钟,轮训一次数据库,如果下单时间与当前时间大于10分钟,那么至该订单为过期状态。 对于这种解决方案,仔细想想有什么觉得不妥当的地方呢? 当然是有的: ①:效率不高,轮询数据库,每次都要扫描到很多记录,并且未付款的订单其实只是占少部分,牺牲了系统资源,问题虽然得到了解决,但效率不妥当。 ②:不够优雅 为什么这么说呢?因为 如果我们设定定时器间隔太小,例如10S执行一次,那么对数据库的性能消耗显然过大,但如果我们设定10分钟的话,假设定时器刚执行一遍任务,仅隔十几秒钟,又有一些订单是过期状态,但是我们却不能对这些订单及时做出修改,而要等到下一个定时器的运行周期,才可以更改这些订单的状态,所以说,很不优雅。误差也太大。

作为一个有一丝洁癖的我来说,这种写法,我接受不了,但是由于我是个菜鸡,我当时也想不出有什么新的解决方案。 既然解决不了,那么这个问题在我心里其实已经扎根了很久,我想着,终有一天,我要想到一个完美的解决方案。 后来,我接触了很多技术,RabbitMQ带给了我一丝惊喜,因为我发现,它的特性:延迟消息,真的不要太棒。 废话不多说。我们来看看实战吧~

本次demo设计技术栈:RabbitMQ、Spring Data Jpa 、Spring Boot

一、了解RabbitMQ

为了方便学习,本文图片来自:RabbitMQ六种模式介绍(1),每种模式对应的代码实现可参考:RabbitMQ六种模式介绍(2)

1.1 RabbitMQ的六种订阅模式

1.1.1 简单模式

功能:一个生产者P发送消息到队列Q,一个消费者C接收

1.1.2 工作队列模式Work Queue

功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

1.1.3 发布/订阅模式Publish/Subscribe

功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者 生产者:可以将消息发送到队列或者是交换机。 消费者:只能从队列中获取消息。

1.1.4 路由模式Routing

说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

1.1.5 通配符模式Topics

说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor

1.1.6 Rpc模式

RPC模式:生产者,多个消费者,路由规则,多个队列 总结 一个队列,一条消息只会被一个消费者消费(有多个消费者的情况也是一样的)。

二、实战演练:

为了方便演示,本demo将过期时间设置为10S,你也可以根据自己的需求更改过期时间。 业务流程:

Github开源地址:Github源代码链接

2.1 建立表模型

该类是一个仅含有简单属性的订单模型

package com.raven.rabbitmq.model;

import com.fasterxml.jackson.annotation.JsonFormat;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;

import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

import org.hibernate.annotations.GenericGenerator;

import javax.persistence.*;

import java.io.Serializable;

import java.time.LocalDateTime;

@Entity

@Table(name = "order_goods")

@GenericGenerator(name = "jpa-uuid", strategy = "uuid")

public class Order implements Serializable {

@Id

@GeneratedValue(generator = "jpa-uuid" )

@Column(name = "id",columnDefinition = "varchar(32) comment '订单id'")

public String id;

@Column(name = "user_id",columnDefinition = "varchar(20) comment '用户id'")

public String userId;

@JsonSerialize(using = LocalDateTimeSerializer.class)

@JsonDeserialize(using = LocalDateTimeDeserializer.class)

@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")

@Column(name = "create_time",columnDefinition = "dateTime DEFAULT now() comment '创建时间'")

public LocalDateTime createTime;

@JsonDeserialize(using = LocalDateTimeDeserializer.class)

@JsonSerialize(using = LocalDateTimeSerializer.class)

@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")

@Column(name = "pay_time",columnDefinition = "dateTime DEFAULT null comment '支付时间'")

public LocalDateTime payTime;

@Column(name = "pay_status",columnDefinition = "INT comment '支付状态'")

public int payStatus;

public String getId() {

return id;

}

public void setId(String id) {

this.id = id;

}

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

public LocalDateTime getCreateTime() {

return createTime;

}

public void setCreateTime(LocalDateTime createTime) {

this.createTime = createTime;

}

public int getPayStatus() {

return payStatus;

}

public void setPayStatus(int payStatus) {

this.payStatus = payStatus;

}

public LocalDateTime getPayTime() {

return payTime;

}

public void setPayTime(LocalDateTime payTime) {

this.payTime = payTime;

}

@Override

public String toString() {

return "Order{" +

"id='" + id + '\'' +

", userId='" + userId + '\'' +

", createTime=" + createTime +

", payTime=" + payTime +

", payStatus=" + payStatus +

'}';

}

}

2.2 订单配置

订单一些配置属性,我们把它抽取出来。

package com.raven.rabbitmq.config;

public class OrderConfig {

public final static int order_no_pay = 1;

public final static int order_pay = 2;

public final static int order_expired = 3;

}

2.3 RabbitMQ 配置类

如下配置了rabbitMQ的一些信息传递规则

package com.raven.rabbitmq.config;

import org.springframework.amqp.core.*;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

//rabbitMQ的配置

@Configuration

public class MQConfig {

//交换机

public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";

// 订单队列

public static final String QUEUE_ORDER = "QUEUE_ORDER";

//死信队列 用来接收延迟队列的消息

public static final String QUEUE_DELAY = "QUEUE_DELAY";

// 检测订单队列 (延迟队列)时间过期后,该数据会被推送至死信队列

public static final String QUEUE_CHECK_ORDER = "QUEUE_CHECK_ORDER";

// 订单支付成功路由键

public static final String QUEUE_PAY_SUCCESS = "QUEUE_PAY_SUCCESS";

//订单路由键

public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";

// 成功支付路由健

public static final String ROUTINGKEY_QUEUE_PAY_SUCCESS = "ROUTINGKEY_QUEUE_PAY_SUCCESS";

// 订单检测路由键

public static final String ROUTINGKEY_QUEUE_CHECK_ORDER = "ROUTINGKEY_QUEUE_CHECK_ORDER";

// 死信路由键

public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";

//定义交换机

@Bean

public Exchange exchangeDelay(){

return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();

}

//检测订单

@Bean(QUEUE_CHECK_ORDER)

public Queue queueCheckOrder(){

Map map = new HashMap<>();

//过期的消息给哪个交换机的名字

map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);

//设置死信交换机把过期的消息给哪个路由键接收

map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);

//队列消息过期时间10s

map.put("x-message-ttl", 10000);

return new Queue(QUEUE_CHECK_ORDER,true,false,false,map);

}

//死信队列

@Bean(QUEUE_DELAY)

public Queue queueDelay(){

return new Queue(QUEUE_DELAY,true);

}

// 支付成功队列

@Bean(QUEUE_PAY_SUCCESS)

public Queue queuePaySuccess(){

return new Queue(QUEUE_PAY_SUCCESS,true);

}

// 订单队列

@Bean(QUEUE_ORDER)

public Queue queueOrder(){

return new Queue(QUEUE_ORDER,true);

}

// 绑定队列与交换器

@Bean

public Binding queueOrderBinding(){

return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();

}

@Bean

public Binding queueCheckOrderBinding(){

return BindingBuilder.bind(queueCheckOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_CHECK_ORDER).noargs();

}

@Bean

public Binding queueDelayBinding(){

return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();

}

@Bean

public Binding queuePayBinding(){

return BindingBuilder.bind(queuePaySuccess()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_PAY_SUCCESS).noargs();

}

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

}

2.4 订单DAO接口

package com.raven.rabbitmq.dao;

import com.raven.rabbitmq.model.Order;

import org.springframework.data.jpa.repository.JpaRepository;

import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

import org.springframework.data.repository.CrudRepository;

public interface OrderDAO extends JpaRepository, CrudRepository, JpaSpecificationExecutor {

}

2.5 消费者

处理消息

package com.raven.rabbitmq.demo;

import com.raven.rabbitmq.config.MQConfig;

import com.raven.rabbitmq.config.OrderConfig;

import com.raven.rabbitmq.dao.OrderDAO;

import com.raven.rabbitmq.model.Order;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import java.util.Optional;

@Component

public class Consumer {

@Autowired

OrderDAO orderDAO;

@Autowired

private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = MQConfig.QUEUE_ORDER)

public void handlerOrder(@Payload Order order, Message message){

order.setPayStatus(OrderConfig.order_no_pay);

order.setCreateTime(LocalDateTime.now());

// 保存订单

orderDAO.save(order);

System.out.println("新建了一个订单, orderId:"+order.getId());

System.out.println("审核链接:http://localhost:8081/paySuccess?orderId="+order.getId());

// 发送该订单至核验队列

rabbitTemplate.convertAndSend(

MQConfig.EXCHNAGE_DELAY,

MQConfig.ROUTINGKEY_QUEUE_CHECK_ORDER,

order);

}

// 核验队列(延迟)后 会将消息发送至死信队列。死信队列判断该订单是否过期

@RabbitListener(queues = MQConfig.QUEUE_DELAY)

public void handlerDelayOrder(@Payload Order order, Message message){

System.out.println(order.toString());

// 查找数据库该订单是否已支付

Optional od = orderDAO.findById(order.getId());

od.ifPresent(e->{

if(e.getPayStatus() == OrderConfig.order_pay){

System.out.println(String.format("订单id:%s支付成功~",e.getId()));

}else{

e.setPayStatus(OrderConfig.order_expired);

orderDAO.save(e);

System.out.println(String.format("订单id:%s长时间未支付,已过期",e.getId()));

}

});

}

// 支付成功

@RabbitListener(queues = MQConfig.QUEUE_PAY_SUCCESS)

public void handlerPayOrder(@Payload String orderId, Message message){

if(orderId == null || orderId.equals("")){

return ;

}

Optional orderOptional = orderDAO.findById(orderId);

orderOptional.ifPresent(order->{

order.setPayStatus(OrderConfig.order_pay);

order.setPayTime(LocalDateTime.now());

orderDAO.save(order);

});

}

}

2.6 service层

package com.raven.rabbitmq.service;

import com.raven.rabbitmq.config.MQConfig;

import com.raven.rabbitmq.config.OrderConfig;

import com.raven.rabbitmq.dao.OrderDAO;

import com.raven.rabbitmq.model.Order;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.HashMap;

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

OrderDAO orderDAO;

public void addOrder(Order order) {

order.setPayStatus(OrderConfig.order_no_pay);

rabbitTemplate.convertAndSend(

MQConfig.EXCHNAGE_DELAY,

MQConfig.ROUTINGKEY_QUEUE_ORDER,

order);

}

public void orderPay(String orderId) {

rabbitTemplate.convertAndSend(

MQConfig.EXCHNAGE_DELAY,

MQConfig.ROUTINGKEY_QUEUE_PAY_SUCCESS,

orderId);

}

}

2.7 controller层

定义下单接口,审核接口

package com.raven.rabbitmq.controller;

import com.raven.rabbitmq.config.MQConfig;

import com.raven.rabbitmq.model.Order;

import com.raven.rabbitmq.service.OrderService;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController

public class PayController {

@Autowired

OrderService orderService;

@PostMapping("/createOrder")

public String createOrder(@RequestBody Order order){

orderService.addOrder(order);

return "已生成订单,请在10s内完成支付";

}

@GetMapping("/paySuccess")

public String paySuccess(String orderId){

orderService.orderPay(orderId);

return "您已支付!祝您生活愉快~";

}

}

2.8 项目配置

rabbitMQ以及web端口配置

spring:

rabbitmq:

host: 127.0.0.1

port: 5672

username: guest

password: guest

virtualHost: /

server:

port: 8081

数据库配置,记得改下数据库密码以及创建下数据库哦 sping data Jpa 并不会自动帮我们建立数据库。

spring.datasource.url=jdbc:mysql://localhost:3306/pay_demo?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC

spring.datasource.username=root

spring.datasource.password=xxx

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.jpa.database=mysql

spring.jpa.hibernate.ddl-auto=create

spring.jpa.show-sql=false

spring.jpa.properties.hibernate.format_sql=false

2.9 POM文件

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

2.5.6

spring-boot-starter-parent

com.raven

rabbitmq

0.0.1-SNAPSHOT

rabbitmq

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter

junit

junit

test

4.12

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-test

junit

junit

org.springframework

spring-test

com.fasterxml.jackson.datatype

jackson-datatype-guava

2.10.1

com.fasterxml.jackson.datatype

jackson-datatype-jsr310

org.springframework.boot

spring-boot-starter-web

mysql

mysql-connector-java

8.0.13

runtime

org.springframework.boot

spring-boot-starter-data-jpa

org.springframework.boot

spring-boot-maven-plugin

central

aliyun maven

http://maven.aliyun.com/nexus/content/groups/public/

default

true

false

三、演示结果

首先使用idea自带的HTTP测试工具:发送如下请求 如下:创建了一笔订单:userId为:10086。 用户Id:10086 建立了一笔订单。

POST http://localhost:8081/createOrder

Accept: application/json

Content-Type: application/json

{"userId": 10086}

我们可以看到控制台会打印:

再来看看数据库的记录: 若你没有点击那个审核链接,在等待十秒之后,会打印如下内容: 同时也会修改数据库对应数据的订单状态,即死信队列会将该数据标记为已过期。 若点击了审核订单: 再过十秒钟,可以看到死信队列检测到该订单通过了,并不会做什么处理。 查询数据库 可以看到我们的订单: 第一笔为过期订单,第二笔为我们审核的订单。

这样,我们就成功以优雅的方式搞定了过期订单。

结语:如果你有更好的解决方案,或者你觉得本文提供的解决方案还有问题或者可以更哈德改进,欢迎留言与我探讨哦。