分布式任务调度
在分布式架构下,一个服务会部署多个实例来运行业务;如果在这种分布式系统环境下运行任务调度,称为分布式任务调度。

分布式任务调度框架:xxl-job
xxl-job环境搭建
本机
仓库源码:xxl-job

- 初始化调度数据库
- 修改数据库连接信息

此时启动xxl-job-admin项目,在浏览器输入http://localhost:8080/xxl-job-admin
即可看到调度中心页面
docker
docker安装xxl-job
docker run -d \ -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.140.102:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \ --spring.datasource.username=root \ --spring.datasource.password=123" \ -p 8888:8080 -v /tmp:/data/applogs \ --name xxl-job-admin \ --restart=always \ xuxueli/xxl-job-admin:2.3.0
|
此时在浏览器输入http://192.168.140.102:8888/xxl-job-admin
即可看到调度中心页面
项目集成xxl-job
- 导入xxl-job依赖
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.0</version> </dependency>
|
- 添加配置
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}") private String adminAddresses;
@Value("${xxl.job.executor.appname}") private String appname;
@Value("${xxl.job.executor.port}") private int port; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setPort(port);
return xxlJobSpringExecutor; } }
|

xxl: job: admin: addresses: http://192.168.140.102:8888/xxljob-admin executor: appname: xxl-job-executor-sample port: 9999
|
- 任务代码

@Component public class HelloJob { @XxlJob("demoJobHandler") public void helloJob() { System.out.println("简单任务执行了"); } }
|
任务详解

执行器
任务绑定的执行器,任务触发调度时,会自动发现注册成功的执行器,实现任务自动发现功能。执行器也可以方便的进行任务分组,每个任务必须绑定一个执行器。
报警邮件
任务调度失败时邮件通知的邮箱地址
调度配置

任务配置

- 运行模式:BEAN模式(任务以JobHandler方式维护在执行器端)
- JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值
路由策略

FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):每个微服务轮询的去执行任务
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
**SHARDING_BROADCAST(分片广播)**:广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
分片广播
任务路由策略选择“分片广播”的情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务。
在同一个时间点,如果需要同时执行大量的任务,肯定是需要用到集群的,此时可以给每台服务器分配多个任务。

假设每秒10000请求,
【轮询方式】:实例A在第一秒接受了全部10000个任务并处理,而实例B空闲,第二秒则是实例B接受并处理10000个任务,A空闲
【分片广播方式】:实例A和实例B两个服务同时执行10000个任务
@Component public class HelloJob { @XxlJob("shardingJobHandler") public void shardingJobHandler() { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); List<Integer> list = getList(); for(Integer i : list) { if(i % shardTotal == shardIndex) { System.out.println("当前第"+shardIndex+"分片执行了,任务项是:" + i); } } } public List getList() { ArrayList<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } return list; } }
|
应用:热点文章定时计算
spring传统的定时任务@Scheduled,但是存在一些问题:
- 集群任务的重复执行问题
- cron表达式定义在代码中,修改不方便
- 定时任务失败了,无法重试,也没有统计
- 如果任务量过大,不能有效地分片执行
需求

计算热点文章分值业务代码
@Override public void computeHotArticle() { Date dayParam = DateTime.now().minusDays(5).toDate(); List<ApArticle> articleList = articleMapper.findArticleListByLast5days(dayParam); List<HotArticleVo> hotArticleVoList = computeArticleScore(articleList); cacheTagToRedis(hotArticleVoList); }
private List<HotArticleVo> computeArticleScore(List<ApArticle> articleList) { if (articleList == null || articleList.size() == 0) { return null; } List<HotArticleVo> hotArticleVoList = new ArrayList<>(); for (ApArticle apArticle : articleList) { int score = computeScore(apArticle); HotArticleVo hotArticleVo = new HotArticleVo(); BeanUtils.copyProperties(apArticle, hotArticleVo); hotArticleVo.setScore(score); hotArticleVoList.add(hotArticleVo); } return hotArticleVoList; }
private int computeScore(ApArticle apArticle) { Integer scere = 0; if(apArticle.getLikes() != null){ scere += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; } if(apArticle.getViews() != null){ scere += apArticle.getViews(); } if(apArticle.getComment() != null){ scere += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; } if(apArticle.getCollection() != null){ scere += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; } return scere; }
private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) { ResponseResult responseResult = wemediaClient.getChannels(); if(responseResult.getCode().equals(200)) { String channelJson = JSON.toJSONString(responseResult.getData()); List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class); if(wmChannels != null && wmChannels.size() > 0) { for(WmChannel wmChannel : wmChannels) { List<HotArticleVo> list = hotArticleVoList.stream() .filter(x -> wmChannel.getId() == x.getChannelId()) .limit(30) .sorted(Comparator.comparing(HotArticleVo::getScore).reversed()) .collect(Collectors.toList()); cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId(), JSON.toJSONString(list)); } } } List<HotArticleVo> list = hotArticleVoList.stream() .limit(30) .sorted(Comparator.comparing(HotArticleVo::getScore).reversed()) .collect(Collectors.toList()); cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, JSON.toJSONString(list)); }
|
设置定时任务(凌晨两点执行一次)
- 新增执行器

- 新增任务

- 引入依赖
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.0</version> </dependency>
|
- 添加配置
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.port}") private int port; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setPort(port);
return xxlJobSpringExecutor; } }
|
xxl: job: admin: addresses: http://192.168.140.102:8888/xxl-job-admin executor: appname: leadnews-hotarticle-executor port: 9999
|
- 创建任务
@Component @Slf4j @RequiredArgsConstructor public class ComputeHotArticleJob { private final HotArticleService hotArticleService; @XxlJob("computeHotArticleJob") public void handle() { log.info("热文章分值计算调度任务开始执行"); hotArticleService.computeHotArticle(); log.info("热文章分值计算调度任务执行结束"); } }
|
只要在查询文章列表的时候,先去判断redis中是否有数据,如果有数据,直接从redis中获取。(redis中存储的数据就是按照文章的热点分值排序后的)