基于Redis实现分布式定时任务

基于Redis实现分布式定时任务

2023年7月21日发(作者:)

基于Redis实现分布式定时任务⽬录1. 技术背景2. 设计思想3. 总结常见问题附录1. 技术背景1.1. Redis Keyspace Notifications从Redis 2.8.0+开始Redis提供了Keyspace Notifications[^1]特性; 这⼀特性使得客户端可以通过发布/订阅来接收redis影响数据集相关事件, 例如:新建KEY对KEY执⾏了LPUSH操作KEY过期1.1.1 配置由于该特性会新增CPU消耗,

keyspance events notifications是默认关闭的, 可通过修改或CONFIG SET 配置notify-keyspace-events来开启,K Keyspace events, published with __keyspace@__ prefix.E Keyevent events, published with __keyevent@__ prefix.g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...$ String commandsl List commandss Set commandsh Hash commandsz Sorted set commandsx Expired events (events generated every time a key expires)e Evicted events (events generated when a key is evicted for maxmemory)A Alias for g$lshzxe, so that the "AKE" string means all the events.配置中⾄少需要出现K/E, 否则将不会接收到任何事件, 如果配置为KEA则会接收到任何可能的事件。# specify at least one of K or E, no events will be -keyspace-events "KEA"注意: Redis的发布/订阅阅后即焚是不⽀持持久化的, 故如果客户端断开重连则在这期间的消息将丢失!1.1.2 测试订阅事件:6379> PSUBSCRIBE __keyevent@*__: (press Ctrl-C to quit)1) "psubscribe"2) "__keyevent@*__:expired"3) (integer) 1过期⼀个KEYSET foo val EX 10收到通知1) "pmessage"2) "__keyevent@*__:expired"3) "__keyevent@0__:expired"4) "a"1.1.3 RedisKeyExpiredEventedisKeyExpiredEvent⽹上实际有很多其他⽅案, 在spring-data-redis中已提供了对上⾯特性的实现只是很少有⼈介绍到, 我推荐使⽤以下⽅案, 则每当有KEY失效则以下listener会收到消息:public @Bean ApplicationListener redisKeyExpiredEventListener() { return event -> { n(("A Received expire event for key=%s with value %s.", new String(rce()), ue())); }}实现原理是在irationEventMessageListener中订阅事件__keyevent@*__:expired如下:public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware { private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired"); @Override protected void doRegister(RedisMessageListenerContainer listenerContainer) { sageListener(this, KEYEVENT_EXPIRED_TOPIC); } ...

}1.2 Distributed Locks有多种⽅式去实现分布式锁, 关于使⽤Redis做分布式锁我推荐⼤家可以看看附录[^2]官⽅的⽂章, ⾥⾯详细介绍了官⽅推荐的正确的实现⽅式。1.2.1 RedisLockRegistry在Spring Integration[^3]中从4.0开始就提供了⼀种基于redis的分布式锁实现RedisLockRegistry, 可⽤过⽤obtain⽅法直接获取到也很简单:// 1. 创建对象public @Bean RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) { return new RedisLockRegistry(connectionFactory, "Foo-API");

}@Autowiredprivate RedisLockRegistry redisLockRegistry;// 并发⽅法public void foo() { lock = null; try { lock = (LockKey(trigger)); if (!k()) { // 未获取到锁 return; } // 已成功获取到分布式锁 } finally { // Unlock safely if (lock != null) try { (); } catch (Exception e) { /* NOTHING */ } }}1.2.3 根据实际的需求选择使⽤tryLock/lock来实现我们的具体场景, java中对该对象定义如下:public interface Lock { /** * Acquires the lock. * * If the lock is not available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until the * lock has been acquired. */ void lock(); /** * Acquires the lock unless the current thread is * {@linkplain Thread#interrupt interrupted}. * * Acquires the lock if it is available and returns immediately. * * If the lock is not available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: * *

* The lock is acquired by the current thread; or * Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of lock acquisition is supported. *

* * If the current thread: *

* has its interrupted status set on entry to this method; or * is {@linkplain Thread#interrupt interrupted} while acquiring the * lock, and interruption of lock acquisition is supported, *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is * interrupted while acquiring the lock (and interruption * of lock acquisition is supported) */ void lockInterruptibly() throws InterruptedException; /** /** * Acquires the lock only if it is free at the time of invocation. * * Acquires the lock if it is available and returns immediately * with the value {@code true}. * If the lock is not available then this method will return * immediately with the value {@code false}. * * A typical usage idiom for this method would be: * {@code * Lock lock = ...; * if (k()) { * try { * // manipulate protected state * } finally { * (); * } * } else { * // perform alternative actions * }} * * This usage ensures that the lock is unlocked if it was acquired, and * doesn't try to unlock if the lock was not acquired. * * @return {@code true} if the lock was acquired and * {@code false} otherwise */ boolean tryLock(); /** * Acquires the lock if it is free within the given waiting time and the * current thread has not been {@linkplain Thread#interrupt interrupted}. * * If the lock is available this method returns immediately * with the value {@code true}. * If the lock is not available then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: *

* The lock is acquired by the current thread; or * Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of lock acquisition is supported; or * The specified waiting time elapses *

* * If the lock is acquired then the value {@code true} is returned. * * If the current thread: *

* has its interrupted status set on entry to this method; or * is {@linkplain Thread#interrupt interrupted} while acquiring * the lock, and interruption of lock acquisition is supported, *

* then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * *

If the specified waiting time elapses then the value {@code false} * is returned. * If the time is * less than or equal to zero, the method will not wait at all. * * @param time the maximum time to wait for the lock * @param unit the time unit of the {@code time} argument * @return {@code true} if the lock was acquired and {@code false} * if the waiting time elapsed before the lock was acquired * * @throws InterruptedException if the current thread is interrupted * while acquiring the lock (and interruption of lock * acquisition is supported) */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** /** * Releases the lock. */ void unlock(); ...}2. 设计思想流程图2.1 任务管理定义任务管理服务, ⽤于受理其他服务程序通过RPC/DB/MQ等任务创建指令, 该服务根据任务等元数据(META DATA)判断任务是需要⽴即执⾏或是延时执⾏。⽴即执⾏ - ⽴即把任务交接给任务执⾏⽴即开始执⾏。延时执⾏ - 将任务数据存⼊Redis并设置TTL = (执⾏时间 - 当前时间)。2.2 执⾏任务根据不同等任务数据调⽤不⽤等任务具体实⽅法去执⾏任务, 例如执⾏⼀条SQL、执⾏⼀个RPC调⽤等, 执⾏成功则任务调度完成, 执⾏不成功则根据任务元数据(META DATA)来控制任务执⾏情况, 例如可约定以下数据:RETRY_INTERVAL = 3000 # 任务失败重试间隔MAX_RETRIES = 3 # 任务失败最⼤重试次数当任务执⾏失败且还满⾜可执⾏条件, 则根据配置RETRY_INTERVAL将任务数据放⼊Redis并设置TTL = RETRY_INTERVAL, 则任务则会在TTL之后重新被执⾏。根据前⾯技术背景中提到当Redis现有当特性, 以及前⾯我们根据KEY的TTL来控制任务的执⾏, 则收到KEY过期事件即代表任务达到执⾏时间了;但在分布式环境中, 多个JVM会同时监听到KEY过期, 为了防⽌任务重复执⾏, 所以在可执⾏任务前需要再结合分布式锁获取到锁的JVM⽅可执⾏任务, 否则直接忽略该事件, 因为其他JVM已经执⾏了该任务。3. 总结本⽂描述的⽅案主要结合了Redis两⼤特性:Keyspace Notifications[^1]基于Redis的分布式锁来实现来分布式任务调度, 都基于Redis来实现, 较⼤程度发挥了其⾃⾝优势, 相较于quartz[^4]更加轻量级。常见问题KEY过期没有触发失效事件检查redis中notify-keyspace-events配置情况, 或者直接通过redis-cli连接到redis执⾏MONITOR指令观察消息情况。附录

发布者:admin,转转请注明出处:http://www.yc00.com/web/1689930058a295211.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信