博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
实现mq延时队列(订单延时取消)
阅读量:4209 次
发布时间:2019-05-26

本文共 6138 字,大约阅读时间需要 20 分钟。

简单实现mq延时队列

1.RabbitMqConfiguration (mq配置类)

@Configuration@Slf4jpublic class RabbitMqConfiguration{    @Bean    public ConnectionFactory connectionFactory(@Value("${rabbitmq.host}") String host,        @Value("${rabbitmq.port}") int port, @Value("${rabbitmq.username}") String username,        @Value("${rabbitmq.password}") String password,        @Value("${rabbitmq.publisher-confirms}") Boolean publisherConfirms) {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();        connectionFactory.setHost(host);        connectionFactory.setPort(port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setPublisherConfirms(publisherConfirms);        log.info("RabbitMq connectionFactory is finash start!");        return connectionFactory;    }    @SuppressWarnings("AlibabaRemoveCommentedCode")    @Bean(name = MqProperties.CONTAINER_FACTORY)    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        factory.setAcknowledgeMode(AcknowledgeMode.NONE);        // 每个队列设置3个消费者避免单个消费者阻塞而失败        factory.setConcurrentConsumers(3);        factory.setMaxConcurrentConsumers(5);        return factory;    }    @Bean    public AmqpTemplate amqpTemplate(@Autowired ConnectionFactory connectionFactory) {        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        rabbitTemplate.setEncoding("UTF-8");        // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true        rabbitTemplate.setMandatory(true);        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {            String correlationId = message.getMessageProperties().getCorrelationId();            if (log.isDebugEnabled()) {                log.debug("amqp send fail message<{}> replyCode<{}> reason<{}> exchange<{}>  routeKey<{}>",                    correlationId, replyCode, replyText, exchange, routingKey);            }        });        // 消息确认,yml需要配置 publisher-confirms: true        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {            if (ack) {                if (log.isDebugEnabled()) {                    log.debug("amqp send success id<{}>", correlationData.getId());                }            } else {                if (log.isDebugEnabled()) {                    log.debug("amqp send fail reason<{}>", cause);                }            }        });        return rabbitTemplate;    }    /**     * 构建管理类     *     * @param connectionFactory     * @param mqProperties     * @return     */    @Bean    public AmqpAdmin amqpAdmin(@Autowired ConnectionFactory connectionFactory, @Autowired MqProperties mqProperties) {        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);                // 预提现死信队列        // 死信交换机        DirectExchange exchange =            (DirectExchange)ExchangeBuilder.directExchange(mqProperties.getExchangeOrderPre()).build();        amqpAdmin.declareExchange(exchange);        // TTL消息队列        Map
arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", mqProperties.getExchangeOrderPre()); arguments.put("x-dead-letter-routing-key", mqProperties.getQueueOrderDRT()); Queue orderPreDLX = QueueBuilder.durable(mqProperties.getSendOrderPreDLX()).withArguments(arguments).build(); amqpAdmin.declareQueue(orderPreDLX); // 消息队列的交换机绑定 amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDLX).to(exchange).with(mqProperties.getSendOrderPreDLX())); // 死信转发队列 Queue orderPreDRT = QueueBuilder.durable(mqProperties.getQueueOrderDRT()).withArguments(arguments).build(); amqpAdmin.declareQueue(orderPreDRT); // 转发队列的交换机绑定 amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDRT).to(exchange).with(mqProperties.getQueueOrderDRT())); return amqpAdmin; }}

2.MqSendService(mq服务类,发送mq消息)

/** * 有有效期的的队列 *  * @param exchange 交换机名称 * @param queueName 队列名称 * @param message消息内容 * @param times 延迟时间 单位毫秒 */@Asyncpublic void send(String exchange, String queueName, String message, long times) {    // 消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上    // 发送前,把消息进行封装,转发时应转发到指定 queueName 队列上    MessagePostProcessor processor = new MessagePostProcessor() {        @Override        public Message postProcessMessage(Message message) {            message.getMessageProperties().setExpiration(times + "");            return message;        }    };    amqpTemplate.convertAndSend(exchange, queueName, message, processor);}

3.OrderService(创建订单时发送死信消息)

// 启动延时队列 (为了准时取消,须保证mq服务时间与api服务时间一致)        mqSendService.send(mqProperties.getExchangeOrderPre(), mqProperties.getSendOrderPreDLX(), orderId.toString(),            loanPreStatusDuration);

4.LoanMqListener 接受死信转发队列消息,取消超时订单

@Component@Slf4jpublic class LoanMqListener extends BaseMqListener {    @Autowired    private LoanHandleService loanHandleService;    /**     * 订单到期处理     *      * @param message     */    @RabbitListener(queues = "${mq.response.order.pre.repeat.trade}", containerFactory = MqProperties.CONTAINER_FACTORY)    public void preLoanOverTime(String content) {        log.info("订单:{}延时取消", content);        loanHandleService.cancelLoanByPreTimeOut(Long.parseLong(content));    }}

5.补充.MqProperties

/** * Mq 配置 */@Component@Data@ToStringpublic class MqProperties {    /**     * Mq 容器 factory     */    public static final String CONTAINER_FACTORY = "containerFactory";    /**     * 订单死信队列     */    @Value("${mq.request.order.pre.dead.letter}")    private String sendOrderPreDLX;    /**     * 订单死信转发队列     */    @Value("${mq.response.order.pre.repeat.trade}")    private String queueOrderDRT;    /**     * 订单死信转发队列交换机     */    @Value("${mq.exchange.order.pre}")    private String exchangeOrderPre;}

6.总结:

优点:

  1. 可以实现实时取消订单,及时恢复订单占用资源(如订单中的商品),不占用应用服务器资源

缺点:

  1. 可能会导致消息大量堆积
  2. 死信消息可能会导致运维预警,需要沟通

欢迎补充

转载地址:http://dwrli.baihongyu.com/

你可能感兴趣的文章
windows 下AdNDP 安装使用
查看>>
Project 2013项目管理教程(1):项目管理概述及预备
查看>>
ssh客户端后台运行
查看>>
哥去求职,才说了一句话考官就让我出去
查看>>
【React Native】把现代web科技带给移动开发者(一)
查看>>
【GoLang】Web工作方式
查看>>
Launch Sublime Text 3 from the command line
查看>>
【数据库之mysql】mysql的安装(一)
查看>>
【数据库之mysql】 mysql 入门教程(二)
查看>>
【HTML5/CSS/JS】A list of Font Awesome icons and their CSS content values(一)
查看>>
【HTML5/CSS/JS】<br>与<p>标签区别(二)
查看>>
【HTML5/CSS/JS】开发跨平台应用工具的选择(三)
查看>>
【心灵鸡汤】Give it five minutes不要让一个好主意随风而去
查看>>
【React Native】Invariant Violation: Application AwesomeProject has not been registered
查看>>
【ReactNative】真机上无法调试 could not connect to development server
查看>>
【XCode 4.6】常用快捷键 特别是格式化代码ctrl+i
查看>>
【iOS游戏开发】icon那点事 之 实际应用(二)
查看>>
【iOS游戏开发】icon那点事 之 图标设计(三)
查看>>
【IOS游戏开发】之测试发布(Distribution)
查看>>
【IOS游戏开发】之IPA破解原理
查看>>