分布式任务调度

在分布式架构下,一个服务会部署多个实例来运行业务;如果在这种分布式系统环境下运行任务调度,称为分布式任务调度。
1b9e288ea4a640adb8753d959b2e09c9.png
分布式任务调度框架:xxl-job

xxl-job环境搭建

本机

仓库源码:xxl-job
3a97e7e7a8324307b69c7fc01456b02e.png

  1. 初始化调度数据库
  2. 修改数据库连接信息
    615a6a48782e4dc79fecbbf8b646ad04.png

此时启动xxl-job-admin项目,在浏览器输入http://localhost:8080/xxl-job-admin即可看到调度中心页面

docker

docker安装xxl-job

docker run -d \
-e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.140.102:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=123" \
-p 8888:8080 -v /tmp:/data/applogs \
--name xxl-job-admin \
--restart=always \
xuxueli/xxl-job-admin:2.3.0

此时在浏览器输入http://192.168.140.102:8888/xxl-job-admin即可看到调度中心页面

项目集成xxl-job

  1. 导入xxl-job依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
  1. 添加配置
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.port}")
private int port;

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setPort(port);

return xxlJobSpringExecutor;
}
}

64cad677848d451e808468bc0c2bb4e4.png

xxl:
job:
admin:
addresses: http://192.168.140.102:8888/xxljob-admin
executor:
appname: xxl-job-executor-sample # 去“任务调度中心-执行器管理”里找
port: 9999
  1. 任务代码
    b637cbc732e54579b113eb1f16660532.png
@Component
public class HelloJob {
@XxlJob("demoJobHandler") // 去任务调度中心里查
public void helloJob() {
System.out.println("简单任务执行了");
}
}

任务详解

2be8cff18c93499ea2aa0de40cd1ab1a.png

执行器

任务绑定的执行器,任务触发调度时,会自动发现注册成功的执行器,实现任务自动发现功能。执行器也可以方便的进行任务分组,每个任务必须绑定一个执行器。

报警邮件

任务调度失败时邮件通知的邮箱地址

调度配置

50d0ee72b2aa4699ad89f5aeba0e550a.png

任务配置

092275beee7b40f382fdb6856156c3c0.png

  1. 运行模式:BEAN模式(任务以JobHandler方式维护在执行器端)
  2. JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值

路由策略

d2ef130a21b24fd5831b1321bede1670.png

  • FIRST(第一个):固定选择第一个机器;

  • LAST(最后一个):固定选择最后一个机器;

  • ROUND(轮询):每个微服务轮询的去执行任务

  • RANDOM(随机):随机选择在线的机器;

  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

  • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

  • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

  • FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

  • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

  • **SHARDING_BROADCAST(分片广播)**:广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

分片广播

任务路由策略选择“分片广播”的情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务。

在同一个时间点,如果需要同时执行大量的任务,肯定是需要用到集群的,此时可以给每台服务器分配多个任务。
45189dbd63b740a28c31ae30de04fa5f.png

假设每秒10000请求,
【轮询方式】:实例A在第一秒接受了全部10000个任务并处理,而实例B空闲,第二秒则是实例B接受并处理10000个任务,A空闲
【分片广播方式】:实例A和实例B两个服务同时执行10000个任务

@Component
public class HelloJob {
@XxlJob("shardingJobHandler")
public void shardingJobHandler() {
// 分片的参数
int shardIndex = XxlJobHelper.getShardIndex(); // 当前某个分片
int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数
// 业务逻辑
List<Integer> list = getList();
for(Integer i : list) {
if(i % shardTotal == shardIndex) {
System.out.println("当前第"+shardIndex+"分片执行了,任务项是:" + i);
}
}
}
public List getList() {
ArrayList<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
list.add(i);
}
return list;
}
}

应用:热点文章定时计算

spring传统的定时任务@Scheduled,但是存在一些问题:

  1. 集群任务的重复执行问题
  2. cron表达式定义在代码中,修改不方便
  3. 定时任务失败了,无法重试,也没有统计
  4. 如果任务量过大,不能有效地分片执行

需求

4a7ae49e31e841628392f11dd948a393.png

计算热点文章分值业务代码

/**
* 计算热点文章
*/
@Override
public void computeHotArticle() {
// 1. 查询前五天的文章数据
Date dayParam = DateTime.now().minusDays(5).toDate();
List<ApArticle> articleList = articleMapper.findArticleListByLast5days(dayParam);
// 2. 计算文章的分值
List<HotArticleVo> hotArticleVoList = computeArticleScore(articleList);
// 3. 为每个频道缓存30条分值较高的文章
cacheTagToRedis(hotArticleVoList);
}

/**
* 计算文章分值
* @param articleList
* @return
*/
private List<HotArticleVo> computeArticleScore(List<ApArticle> articleList) {
if (articleList == null || articleList.size() == 0) {
return null;
}
List<HotArticleVo> hotArticleVoList = new ArrayList<>();
for (ApArticle apArticle : articleList) {
int score = computeScore(apArticle);
HotArticleVo hotArticleVo = new HotArticleVo();
BeanUtils.copyProperties(apArticle, hotArticleVo);
hotArticleVo.setScore(score);
hotArticleVoList.add(hotArticleVo);
}
return hotArticleVoList;
}

/**
* 计算文章的具体分值
* @param apArticle
* @return
*/
private int computeScore(ApArticle apArticle) {
Integer scere = 0;
if(apArticle.getLikes() != null){
scere += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
}
if(apArticle.getViews() != null){
scere += apArticle.getViews();
}
if(apArticle.getComment() != null){
scere += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
}
if(apArticle.getCollection() != null){
scere += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
}
return scere;
}

/**
* 缓存到redis中
* @param hotArticleVoList
*/
private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {
// 1. 为每个频道缓存30条分值较高的文章
ResponseResult responseResult = wemediaClient.getChannels();
if(responseResult.getCode().equals(200)) {
// List<WmChannel> wmChannels = (List<WmChannel>)responseResult.getData();
String channelJson = JSON.toJSONString(responseResult.getData());
List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);
// 检索每个频道的文章
if(wmChannels != null && wmChannels.size() > 0) {
for(WmChannel wmChannel : wmChannels) {
// 当前频道的数据,降序排列,最多30条
List<HotArticleVo> list = hotArticleVoList.stream()
.filter(x -> wmChannel.getId() == x.getChannelId())
.limit(30) // 最多30条
.sorted(Comparator.comparing(HotArticleVo::getScore).reversed())// 给文章进行排序
.collect(Collectors.toList());
cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId(), JSON.toJSONString(list));
}
}
}
// 2. 设置推荐的数据
List<HotArticleVo> list = hotArticleVoList.stream()
.limit(30) // 最多30条
.sorted(Comparator.comparing(HotArticleVo::getScore).reversed())// 给文章进行排序
.collect(Collectors.toList());
cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, JSON.toJSONString(list));
}

设置定时任务(凌晨两点执行一次)

  1. 新增执行器
    70c94d0dea774b75ae5b36cc2263e0a1.png
  2. 新增任务
    405057c1cf6f48349554054187b71db5.png
  3. 引入依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
  1. 添加配置
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.port}")
private int port;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setPort(port);

return xxlJobSpringExecutor;
}
}
xxl:
job:
admin:
addresses: http://192.168.140.102:8888/xxl-job-admin
executor:
appname: leadnews-hotarticle-executor
port: 9999
  1. 创建任务
@Component
@Slf4j
@RequiredArgsConstructor
public class ComputeHotArticleJob {
private final HotArticleService hotArticleService;
@XxlJob("computeHotArticleJob")
public void handle() {
log.info("热文章分值计算调度任务开始执行");
hotArticleService.computeHotArticle(); // 调用“计算热点文章分值业务代码”
log.info("热文章分值计算调度任务执行结束");
}
}

只要在查询文章列表的时候,先去判断redis中是否有数据,如果有数据,直接从redis中获取。(redis中存储的数据就是按照文章的热点分值排序后的)