定时任务:有固定周期,有明确的触发时间
延迟任务:没有固定的开始时间,由一个事件触发,在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟执行。

场景1:订单下单30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消。
场景2:接口对接接口出现网络问题,1分钟后重试,如果失败,2分钟后重试,直到出现阈值终止。
实现方式
- DelayQueue,基于JVM
- RabbitMQ:TTL+死信队列
- Redis:zset(本项目实现)
实现思路
- 添加任务的时候会先放入数据库中
- list队列存储的是立即执行的任务,如果
执行时间<=当前时间
,则任务进入list队列中,立即执行。
- zset队列中存储的是未来要执行的任务,如果
执行时间<=当前时间+5min
(不需要把所有未来执行的任务都放入zset中,如果执行时间>当前时间+5min
,则还是放入数据库中),则任务进入zset队列,每分钟定时刷新把到期的任务再放入list队列中。

延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储到数据库中可以保证数据安全。
任务量过大以后,zset的性能会下降
如果任务数据特别大,为了防止阻塞,所以只需要把未来几分钟要执行的数据存入缓存即可,是一种优化的形式。
数据库自身解决并发的两种策略
悲观锁:每次拿数据的时候都认为别人会修改,所以在每次拿数据的时候都会上锁。
乐观锁:每次拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据(版本号)。
乐观锁的实现步骤
- 在版本号上加
@Version
注解
@Version private Integer version;
|
- 在mybatis-plus开启乐观锁支持
@Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; }
|
配置redis
- 安装redis:
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
|
- 项目中集成redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
|
spring: redis: host: 192.168.140.102 password: leadnews port: 6379
|
实现
1. 拉取任务
public long addTask(Task task) { boolean success = addTaskToDb(task); if (!success) { throw new RuntimeException("添加任务到数据库失败"); } addTaskToCache(task); return task.getTaskId(); }
private boolean addTaskToDb(Task task) { try { Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); task.setTaskId(taskinfo.getTaskId()); } catch (BeansException e) { e.printStackTrace(); return false; } return true; }
private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduledTime = calendar.getTimeInMillis(); if(task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); }else if(task.getExecuteTime() <= nextScheduledTime) { cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } }
|
- 在数据库中保存一份,要保存到任务表和任务数据表中
- 添加任务到redis中,如果≤当前时间的,要存入list中,如果≤预设时间的,要存入zset中。
2. 取消任务

@Override public boolean cancelTask(long taskId) { Task task = updateDb(taskId, ScheduleConstants.CANCELLED); if(task == null) { throw new RuntimeException("删除任务失败"); } removeTaskFromCache(task); return true; }
private void removeTaskFromCache(Task task) { try { String key = task.getTaskType() + "_" + task.getPriority(); if(task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task)); } } catch (Exception e) { throw new RuntimeException(e); } }
private Task updateDb(long taskId, int status) { Task task = null; try { taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs, task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); } catch (Exception e) { throw new RuntimeException(e); } return task; }
|
- 在数据库中,从任务表中删除taskId的任务,更新任务日志表中的日志状态
- 在缓存中,删除list或zset中的任务
3. 消费任务
从list中消费任务

public Task poll(int type, int priority) { Task task = null; try { String key = type + "_" + priority; String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(taskJson != null) { task = JSON.parseObject(taskJson, Task.class); updateDb(task.getTaskId(), ScheduleConstants.EXECUTED); } } catch (Exception e) { e.printStackTrace(); } return task; }
|
未来数据定时刷新
从zset中定时刷新任务到list中

获取zset中所有的key:
Set<String> keys = cacheService.keys("future_*");
|
- scan命令:是一个基于游标的迭代器,每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新的游标作为SCAN命令的游标参数。

Set<String> scan = cacheService.scan("future_*");
|
- 添加定时任务,每分钟刷新,从zset中取出符合条件的数据后放入redis管道中

@Scheduled(cron = "0 */1 * * * ?") public void refresh() { log.info("refresh..."); Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) { String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if(!tasks.isEmpty()) { cacheService.refreshWithPipeline(futureKey, topicKey, tasks); } } }
|
- 在启动类上开启定时任务
【目前存在的问题】:如果启动了两个服务,那么这两个服务会在同一时间去执行定时任务,会导致方法抢占的问题。但是我们的需求是只要时间到了,有一台服务在执行定时任务即可。
【解决办法】:分布式锁来解决方法抢占的问题
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

服务A在访问的Redis服务的时候,先加锁,此时只有A可以访问Redis服务,B访问Redis服务会失败。A释放锁后,B才可以访问。
A获取锁后,其他客户端不能操作,只能等待A释放锁以后,其他客户端才能操作。
- 加锁代码:
public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try {
Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }
|
- 修改refresh()方法:
@Scheduled(cron = "0 */1 * * * ?") public void refresh() { String token = cacheService.tryLock("FUTRUE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)) { } }
|
4. 数据库任务定时同步到redis中
@PostConstruct @Scheduled(cron = "0 */5 * * * ?") public void reloadData() { clearCache(); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduledTime = calendar.getTimeInMillis(); List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime())); if(taskinfoList != null && !taskinfoList.isEmpty()) { for (Taskinfo taskinfo : taskinfoList) { Task task = new Task(); BeanUtils.copyProperties(taskinfo, task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } log.info("数据库任务同步到redis中"); }
|