分为调度中心 + 执行器
调度中心:提供可视化界面,配置定时任务,定时去调用执行器
调度中心执行器管理:每个springboot作为执行器, 也就是执行器的标识
任务管理:选中执行器,创建改该执行器下的任务
分片广播:集群情况,并行处理任务
场景:大数据表并行处理任务,分散服务器压力
!!现在一张数据表里有大量数据需要某个服务端应用来处理,要求:
2能够并行处理;
3能够较灵活地控制并行任务数量。
4压力较均衡地分散到不同的服务器节点;
因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。
根据第 1、2 点要求,本来想通过对线程池的动态配置来实现,但结合第 3 点来考虑,服务器节点数量有可能会变化,节点之间相互无感知无通信,自己在应用内实现一套调度机制可能会很复杂。
如果有现成的独立于这些服务器节点之外的调度器就好了——顺着这个思路,就想到了已经接入的分布式任务调度平台 XXL-JOB,而在阅读其 官方文档 后发现「分片广播 & 动态分片」很贴合这种场景。
/**
* 2、分片广播任务
*/
@XxlJob(“shardingJobHandler”)
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log(“分片参数:当前分片序号 = {}, 总分片数 = {}”, shardIndex, shardTotal);
// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
XxlJobHelper.log(“第 {} 片, 命中分片开始处理”, i);
} else {
XxlJobHelper.log(“第 {} 片, 忽略”, i);
}
}
}
防止重复执行: 路由策略选择一致性hash,只会固定走1台机器
任务调度中心,去官网clone,跑起来即可springboot整合 执行器
pomorg.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test commons-lang commons-lang 2.6 com.xuxueli xxl-job-core 2.2.0
yml#执行器配置
xxl:job:accessToken: default_token #tokenadmin:# xxl-job后台管理界面的地址 调度中心地址addresses: http://127.0.0.1:8080/xxl-job-adminexecutor:# 此执行器的名称appname: exectorNameHaHa#执行器地址 http://本机ip:port/address: http://127.0.0.1:9997/# 此执行器的ip 执行器IP默认为空,表示自动获取IP 多网卡时可手动设置指定IP,手动设置IP时将会绑定Host。ip: 127.0.0.1# 此执行器的端口port: 9997# 此执行器的日志存放路径logpath: /data/applogs/xxl-job/jobhandler# 此执行器的日志保存时间logretentiondays: 7
server:port: 8082
配置文件@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;** 1、引入依赖:* * org.springframework.cloud * spring-cloud-commons * ${version} * ** 2、配置文件,或者容器启动变量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、获取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
执行定时业务逻辑@Component
public class JobToPrint extends IJobHandler {@Override@XxlJob("JobToPrint")public ReturnT execute(String param) throws Exception {try {System.out.println("测试job");return SUCCESS;} catch (Exception e){e.printStackTrace();return FAIL;}}
}
上一篇: 最美的瞬间作文500字
下一篇: 《体育之研究》读后感