本文共 6138 字,大约阅读时间需要 20 分钟。
简单实现mq延时队列
1.RabbitMqConfiguration (mq配置类)@Configuration@Slf4jpublic class RabbitMqConfiguration{ @Bean public ConnectionFactory connectionFactory(@Value("${rabbitmq.host}") String host, @Value("${rabbitmq.port}") int port, @Value("${rabbitmq.username}") String username, @Value("${rabbitmq.password}") String password, @Value("${rabbitmq.publisher-confirms}") Boolean publisherConfirms) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); log.info("RabbitMq connectionFactory is finash start!"); return connectionFactory; } @SuppressWarnings("AlibabaRemoveCommentedCode") @Bean(name = MqProperties.CONTAINER_FACTORY) public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.NONE); // 每个队列设置3个消费者避免单个消费者阻塞而失败 factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(5); return factory; } @Bean public AmqpTemplate amqpTemplate(@Autowired ConnectionFactory connectionFactory) { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setEncoding("UTF-8"); // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); if (log.isDebugEnabled()) { log.debug("amqp send fail message<{}> replyCode<{}> reason<{}> exchange<{}> routeKey<{}>", correlationId, replyCode, replyText, exchange, routingKey); } }); // 消息确认,yml需要配置 publisher-confirms: true 
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { if (log.isDebugEnabled()) { log.debug("amqp send success id<{}>", correlationData.getId()); } } else { if (log.isDebugEnabled()) { log.debug("amqp send fail reason<{}>", cause); } } }); return rabbitTemplate; } /** * 构建管理类 * * @param connectionFactory * @param mqProperties * @return */ @Bean public AmqpAdmin amqpAdmin(@Autowired ConnectionFactory connectionFactory, @Autowired MqProperties mqProperties) { AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory); // 预提现死信队列 // 死信交换机 DirectExchange exchange = (DirectExchange)ExchangeBuilder.directExchange(mqProperties.getExchangeOrderPre()).build(); amqpAdmin.declareExchange(exchange); // TTL消息队列 Maparguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", mqProperties.getExchangeOrderPre()); arguments.put("x-dead-letter-routing-key", mqProperties.getQueueOrderDRT()); Queue orderPreDLX = QueueBuilder.durable(mqProperties.getSendOrderPreDLX()).withArguments(arguments).build(); amqpAdmin.declareQueue(orderPreDLX); // 消息队列的交换机绑定 amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDLX).to(exchange).with(mqProperties.getSendOrderPreDLX())); // 死信转发队列 Queue orderPreDRT = QueueBuilder.durable(mqProperties.getQueueOrderDRT()).withArguments(arguments).build(); amqpAdmin.declareQueue(orderPreDRT); // 转发队列的交换机绑定 amqpAdmin.declareBinding(BindingBuilder.bind(orderPreDRT).to(exchange).with(mqProperties.getQueueOrderDRT())); return amqpAdmin; }}
2.MqSendService(mq服务类,发送mq消息)
/** * 有有效期的的队列 * * @param exchange 交换机名称 * @param queueName 队列名称 * @param message消息内容 * @param times 延迟时间 单位毫秒 */@Asyncpublic void send(String exchange, String queueName, String message, long times) { // 消息发送到死信队列上,当消息超时时,会发生到转发队列上,转发队列根据下面封装的queueName,把消息转发的指定队列上 // 发送前,把消息进行封装,转发时应转发到指定 queueName 队列上 MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) { message.getMessageProperties().setExpiration(times + ""); return message; } }; amqpTemplate.convertAndSend(exchange, queueName, message, processor);}
3.OrderService(创建订单时发送死信消息)
// 启动延时队列 (为了准时取消,须保证mq服务时间与api服务时间一致) mqSendService.send(mqProperties.getExchangeOrderPre(), mqProperties.getSendOrderPreDLX(), orderId.toString(), loanPreStatusDuration);
4.LoanMqListener 接收死信转发队列消息,取消超时订单
/**
 * Listener for the dead-letter forwarding queue: each received message is treated as an
 * order id whose delay has elapsed and is passed to {@link LoanHandleService}.
 */
@Component
@Slf4j
public class LoanMqListener extends BaseMqListener {

    @Autowired
    private LoanHandleService loanHandleService;

    /**
     * Handles an order whose delay has expired.
     *
     * @param content message body; parsed as the numeric order id
     * @throws NumberFormatException if {@code content} is not a parsable long
     */
    @RabbitListener(queues = "${mq.response.order.pre.repeat.trade}", containerFactory = MqProperties.CONTAINER_FACTORY)
    public void preLoanOverTime(String content) {
        log.info("订单:{}延时取消", content);
        long orderId = Long.parseLong(content);
        loanHandleService.cancelLoanByPreTimeOut(orderId);
    }
}
5.补充.MqProperties
/**
 * MQ settings: the listener container factory bean name plus the exchange and queue names
 * injected from configuration properties.
 */
@Component
@Data
@ToString
public class MqProperties {

    /**
     * Bean name of the MQ listener container factory.
     */
    public static final String CONTAINER_FACTORY = "containerFactory";

    /**
     * Name of the order dead-letter (TTL) queue.
     */
    @Value("${mq.request.order.pre.dead.letter}")
    private String sendOrderPreDLX;

    /**
     * Name of the order dead-letter forwarding queue.
     */
    @Value("${mq.response.order.pre.repeat.trade}")
    private String queueOrderDRT;

    /**
     * Name of the exchange used by the order dead-letter forwarding queue.
     */
    @Value("${mq.exchange.order.pre}")
    private String exchangeOrderPre;
}
6.总结:
优点:
缺点:
欢迎补充
转载地址:http://dwrli.baihongyu.com/