2023年7月21日发(作者:)
延时队列-基于Redisson实现的延时队列前⾔定时调度基本是每个项⽬都会遇到的业务场景,⼀般地,都会通过任务调度⼯具执⾏定时任务完成,定时任务有两点缺陷,⼀、定时任务执⾏频度限制,实际执⾏的时间可能会晚于理想的设定时间,例如,如果要通过定时任务实现在下单后15分钟仍未⽀付则取消订单的功能,假设定时任务的执⾏频度为每分钟执⾏⼀次,对于有些订单⽽⾔,其实际取消时间是介于15-16分钟之间,不够精确;⼆、定时任务执⾏需要时间,定时任务的执⾏也需要时间,如果业务场景的数据量较⼤,执⾏⼀次定时任务需要⾜够长的时间,进⼀步放⼤了缺点⼀。Redisson延时队列在上⼀篇⽂章中延时队列-基于RabbitMq延时消息插件实现的延时队列介绍了⼀种基于RabbitMq延时消息插件实现的延时队列,除此之外,另⼀种⽐较常见的实现⽅式是Redisson延时队列。在Redisson官⽹⽂档中,使⽤延时队列的代码如下:RQueue distinationQueue = ...RDelayedQueue delayedQueue = getDelayedQueue(distinationQueue);// 10秒钟以后将消息发送到指定队列("msg1", 10, S);// ⼀分钟以后将消息发送到指定队列("msg2", 1, S);当该延时队列不需要使⽤时,应当主动销毁RDelayedQueue delayedQueue = ...y();Redisson的延时队列是对另⼀个队列的再包装,使⽤时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,该延时消息才会进⾏进⼊到被包装队列中,因此,我们只需要对被包装队列进⾏监听即可。代码部分在上⼀篇⽂章中,我们定义了DelayMessageManager延时消息管理器并提供了⼀种基于RabbitMq的实现,本⽂中我们再提供⼀种基于Redisson延时队列的实现即可。package r;import Util;import essageHandler;import essage;import essageType;import 4j;import on;import ingQueue;import edQueue;import onClient;import ent;import ;import ;import rentHashMap;import static n;/** * @author * @date 2021/10/11 20:59 * @description 基于redisson实现的延时消息管理器 */@Component@Component@Slf4jpublic class RedissonDelayMessageManager implements DelayMessageManager { private RedissonClient redissonClient = (); private RBlockingQueue rBlockingQueue = ckingDeque("redisson-delay-message-queue"); private RDelayedQueue rDelayedQueue = ayedQueue(rBlockingQueue); private final Map handlerMap = new ConcurrentHashMap<>(16); @Override public void add(DelayMessage message) { if (ns(message)) { return; } ("redisson-delay-message-queue,add message = {}", message); (message, perties().getExpire(), perties().getTimeUnit()); } @Override public boolean remove(DelayMessage message) { return (message); } @Override public void destroy() { y(); } @Override public void afterPropertiesSet() { (()).forEach(delayMessageType -> (delayMessageType, getBean(dler()))); Thread thread = new Thread(() -> { while (true) { try { DelayMessage delayMessage = (); ("redisson-delay-message-queue,consume message = {}", delayMessage); (e()).handle(delayMessage); } catch (InterruptedException e) { tackTrace(); eep(1000); afterPropertiesSet(); } } }); mon(true); (); }}代码中,⾸先创建了⼀个Redisson实现的阻塞队列RBlockingQueue的实例rBlockingQueue,然后⼜使⽤该阻塞队列rBlockingQueue创建了⼀个延时队列RDelayedQueue的实例rDelayedQueue,按照前⽂中的描述,延时消息添加后并不是⽴即进⼊到阻塞队列rBlockingQueue中,⽽是到达了设定的延时时间之后才会从延时队列rDelayedQueue进⼊到阻塞队列rBlockingQueue;因此,延时消息的添加由延时队列rDelayedQueue完成⽽延时队列的消费则由阻塞队列rBlockingQueue完成,注意,这⾥如果直接对延时队列rDelayedQueue进⾏监听,则延时消息刚加⼊时就会被消费,达不到延时的效果。相⽐于Redisson官⽹⽂档延时队列中给出的代码⽰例,这⾥被包装队列使⽤阻塞队列RBlockingQueue的好处是()会⼀直阻塞直⾄队列内有可消费延时消息,避免⽆意义的循环占⽤CPU。测试单元测试代码与上⼀篇⽂章基本相同,只是将DelayMessageManager的实现换成了RedissonDelayMessageManagerpackage r;import nUtils;import eTask;import ;import essage;import essageType;import ocessingException;import ;import BootTest;import ce;import it;/** * @author * @date 2021/9/22 17:12 * @description 测试 */@SpringBootTest(webEnvironment = _PORT)class DelayMessageManagerTest { @Resource(type = ) DelayMessageManager delayMessageManager; @Test void add() throws JsonProcessingException { DelayMessage delayMessage = new DelayMessage(); QrCode qrCode = new QrCode(); figId("fadfdaf110"); (""); y(String(qrCode)); e(_QR_CODE); essageProperties properties = new essageProperties(); ire(10); eUnit(S); perties(properties); (delayMessage); DelayMessage delayMessage1 = new DelayMessage(); QrCode qrCode1 = new QrCode(); figId("fadfdaf1405"); (""); y(String(qrCode1)); e(_QR_CODE); essageProperties properties1 = new essageProperties(); ire(5); eUnit(S); perties(properties1); (delayMessage1); DelayMessage delayMessage2 = new DelayMessage(); ExecuteTask task = new ExecuteTask(); (1L); y(String(task)); e(E_TASK); essageProperties properties2 = new essageProperties(); ire(9); eUnit(S); perties(properties2); ialId(11235813L); (delayMessage2); //(delayMessage2); }}启动项⽬,并执⾏单元测试向延时队列中加⼊三条延时分别为5s 9s 10s 的延时消息,单元测试控制台输出如下:2021-10-12 22:26:40.690 INFO 21340 --- [ main] onDelayMessageManager : redisson-delay-message-queue,add message = DelayMessage(2021-10-12 22:26:40.693 INFO 21340 --- [ main] onDelayMessageManager : redisson-delay-message-queue,add message = DelayMessage(body=2021-10-12 22:26:40.698 INFO 21340 --- [ main] onDelayMessageManager : redisson-delay-message-queue,add message = DelayMessage(body=项⽬的控制台输出如下:2021-10-12 22:26:45.719 INFO 10452 --- [ Thread-11] onDelayMessageManager : redisson-delay-message-queue,consume message = DelayM2021-10-12 22:26:45.719 INFO 10452 --- [ Thread-11] DelayMessageHandler : ⼆维码延时消息处理中,message = DelayMessage(body={"url":"ww2021-10-12 22:26:49.719 INFO 10452 --- [ Thread-11] onDelayMessageManager : redisson-delay-message-queue,consume message = DelayMessag2021-10-12 22:26:49.719 INFO 10452 --- [ Thread-11] eTaskDelayMessageHandler : 任务延时消息处理中,message=DelayMessage(body={"id":1}, type=E2021-10-12 22:26:50.718 INFO 10452 --- [ Thread-11] onDelayMessageManager : redisson-delay-message-queue,consume message = DelayMessag2021-10-12 22:26:50.719 INFO 10452 --- [ Thread-11] DelayMessageHandler : ⼆维码延时消息处理中,message = DelayMessage(body={"url":"ww在经过5s 9s 10s后,对应的消息均成功被消费。总结在上⼀篇基于RabbitMq延时消息插件实现的延时队列之后,本⽂⼜提供了⼀种基于Redisson实现的延时队列,⼤体上能够满⾜各种业务场景的需求;希望⼤家在写业务代码时能够举⼀反三,不⽌于代码,不⽌于业务,才能在程序员的职业⽣涯⾥有更进⼀步的发展。⽰例代码
发布者:admin,转转请注明出处:http://www.yc00.com/web/1689930010a295208.html
评论列表(0条)