定时任务解决方案 定时任务概述 在很多应用中我们都是需要执行一些定时任务的,比如定时发送短信,定时统计数据,在实际使用中我们使用什么定时任务框架来实现我们的业务,定时任务使用中会遇到哪些坑,如何最大化的提高定时任务的性能。
我们这里主要介绍单机和分布式两大类的解决方案,并且简要介绍两类方案中的常见的应用组件或者框架的应用场景和基本的实现原理,重点分析下单机的定时任务的实现原理和优缺点。
为什么需要定时任务
下面是几个常见的定时任务场景
某系统凌晨要进行数据备份。
某电商平台,用户下单半个小时未支付的情况下需要自动取消订单。
某媒体聚合平台,每 10 分钟动态抓取某某网站的数据为自己所用。
某博客平台,支持定时发送文章。
某基金平台,每晚定时计算用户当日收益情况并推送给用户最新的数据。
定时任务选型 单机定时任务
分布式的定时任务框架也是通过单机的原理而来,这里先介绍单机的几种实现方案,并且简单的对比分析
while+sleep方案
我们自己来实现一个定时任务,可以采用最简单的while循环加上一个sleep休眠方案,sleep是需要休眠的事件,下面就是我们事件的一个每隔5s休眠一次的案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Scheduled1 { private static final long timeInterval = 5000 ; public static void main (String[] args) { new Thread (()->{ while (true ){ System.out.println("定时任务每隔" +timeInterval+"毫秒执行一次" ); try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
实现还是很简单的,但是有一个问题,如果我们不仅仅只有定时5s的,还有3s、10s、20s的如何解决呢?不能每来一个不同的定时任务都需要新启动一个线程,这样会造成很多缺点:代码量巨大、开启线程很多占用内存、上下文切换频繁等
Time定时器
定时计划任务功能在Java中主要使用的就是Timer对象,它在内部使用多线程的方式进行处理,所以它和多线程技术还是有非常大的关联的。在JDK中Timer类主要负责计划任务的功能,也就是在指定的时间开始执行某一个任务,但封装任务的类却是TimerTask类
创建定时任务
通过继承 TimerTask 类 并实现 run() 方法来自定义要执行的任务
1 2 3 4 5 6 public class TimeTask1 extends TimerTask { @Override public void run () { System.out.println("定时任务运行了" ); } }
调度定时任务
通过执行Timer.schedule(TimerTask task,Date time) 在执行时间运行任务
1 2 3 4 5 6 7 8 public class TimeScheduled { private static final long timeInterval = 5000 ; public static void main (String[] args) { Timer timer = new Timer (); timer.schedule(new TimeTask1 (),10 ,timeInterval); } }
我们发现和我们第一个方案差不多,但是Timer的方案更加科学高效,我们发现他是可以支持延时执行,并且是可以支持定点执行
缺点 比如一个 Timer
一个线程,这就导致 Timer
的任务的执行只能串行执行,一个任务执行时间过长的话会影响其他任务
Timer
类上的有一段注释是这样写的
大概的意思就是:ScheduledThreadPoolExecutor
支持多线程执行定时任务并且功能更强大,是 Timer
的替代品。
线程池方式
ScheduledExecutorService
是一个接口,有多个实现类,比较常用的是 ScheduledThreadPoolExecutor
,
jdk自带的一个类是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响
代码案例 1 2 3 4 5 6 public class ScheduledExecutor { public static void main (String[] args) { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); service.scheduleAtFixedRate(() -> System.out.println("执行任务" ), 10 , 5 , TimeUnit.SECONDS); } }
不论是使用 Timer
还是 ScheduledExecutorService
都无法使用 Cron 表达式指定任务执行的具体时间。
springTask
SpringTask是Spring自主研发的轻量级定时任务工具,相比于Quartz更加简单方便,且不需要引入其他依赖即可使用
启动SpringTask
在配置类中添加一个@EnableScheduling注解即可开启SpringTask的定时任务
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableScheduling public class SpringTaskApplication { public static void main (String[] args) { SpringApplication.run(SpringTaskApplication.class); } }
创建任务类
我们直接通过 Spring 提供的 @Scheduled
注解即可定义定时任务,非常方便!
1 2 3 4 5 6 7 @Component public class SpringTask { @Scheduled(cron = "0/5 * * * * ?") public void testTask () throws InterruptedException { System.out.println("执行SpringTask任务,时间:" + LocalDateUtils.getLocalDateTimeStr()); } }
支持Cron表达式
Spring Task 支持 Cron 表达式
Cron 表达式主要用于定时作业(定时任务)系统定义执行时间或执行频率的表达式,非常厉害,你可以通过 Cron 表达式进行设置定时任务每天或者每个月什么时候执行等等操作
推荐一个在线 Cron 表达式生成器:http://cron.qqe2.com/
优缺点
优点:简单,轻量,支持 Cron 表达式
缺点 :功能单一
分布式定时任务
上面提到的一些定时任务的解决方案都是在单机下执行的,适用于比较简单的定时任务场景比如每天凌晨备份一次数据
如果我们需要一些高级特性比如支持任务在分布式场景下的分片和高可用的话,我们就需要用到分布式任务调度框架了
Quartz
Quartz是一个完全由Java编写的开源任务调度的框架,通过触发器设置作业定时运行规则,控制作业的运行时间,其中quartz集群通过故障切换和负载平衡的功能,能给调度器带来高可用性和伸缩性
quartz也是用的比较多的定时任务,很多分布式定时任务或者定制定时任务都是基于quartz来实现的,比如elastic-job就是借鉴quartz来实现的
优缺点
优点:可以与 Spring
集成,并且支持动态添加任务和集群。
缺点 :分布式支持不友好,没有内置 UI 管理控制台、使用麻烦(相比于其他同类型框架来说)
Elastic-Job
Elastic-job是当当网张亮主导开发的分布式任务调度框架,结合zookeeper技术解决quartz框架在分布式系统中重复的定时任务导致的不可预见的错误,功能丰富强大,实现任务高可用以及分片
Elastic-Job 中的定时调度都是由执行器自行触发,这种设计也被称为去中心化设计(调度和处理都是执行器单独完成)。
功能列表
ElasticJob
支持任务在分布式场景下的分片和高可用、任务可视化管理等功能
优缺点总结
优点 :可以与 Spring
集成、支持分布式、支持集群、性能不错
缺点 :依赖了额外的中间件比如 Zookeeper(复杂度增加,可靠性降低、维护成本变高)
XXL-JOB
XXL-JOB
于 2015 年开源,是一款优秀的轻量级分布式任务调度框架,支持任务可视化管理、弹性扩容缩容、任务失败重试和告警、任务分片等功能
功能列表
根据 XXL-JOB
官网介绍,其解决了很多 Quartz
的不足。
不同于 Elastic-Job
的去中心化设计, XXL-JOB
的采用了中心化设计(调度中心调度多个执行器执行任务)
和 Quzrtz
类似 XXL-JOB
也是基于数据库锁调度任务,存在性能瓶颈,不过,一般在任务量不是特别大的情况下,没有什么影响的,可以满足绝大部分公司的要求。
优缺点总结:
优点:开箱即用(学习成本比较低)、与 Spring 集成、支持分布式、支持集群、内置了 UI 管理控制台。
缺点:不支持动态添加任务(如果一定想要动态创建任务也是支持的)。
组件对比
下图是常见的几个分布式定时任务的对比
feature
quartz
elastic-job-lite
xxl-job
依赖
mysql
jdk1.7+, zookeeper 3.4.6+ ,maven3.0.4+
mysql ,jdk1.7+ , maven3.0+
HA
多节点部署,通过竞争数据库锁来保证只有一个节点执行任务
通过zookeeper的注册与发现,可以动态的添加服务器。支持水平扩容
集群部署
任务分片
—
支持
支持
文档完善
完善
完善
完善
管理界面
无
支持
支持
难易程度
简单
较复杂
简单
公司
OpenSymphony
当当网
个人
高级功能
—
弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持
弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化
缺点
没有管理界面,以及不支持任务分片等。不适用于分布式场景
需要引入zookeeper , mesos, 增加系统复杂度, 学习成本较高
调度中心通过获取 DB锁来保证集群中执行任务的唯一性, 如果短任务很多,随着调度中心集群数量增加,那么数据库的锁竞争会比较厉害,性能不好。
使用企业
大众化产品,对分布式调度要求不高的公司大面积使用
36氪,当当网,国美,金柚网,联想,唯品会,亚信,平安,猪八戒
大众点评,运满满,优信二手车,拍拍贷
Quartz框架
上面我们也介绍了quartz,纯Java编写的定时任务框架
1 ![img](./../../../../../blog/docs/03_微服务/12_开发辅助框架或库/C:/工作台/笔记/08_其他/定时调度/定时调度/quartz05.png)
quartz特点
Quartz是一个优秀的任务调度框架, 具有以下特点
强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
负载均衡
高可用
quartz 架构体系
Quartz 设计有四个核心类,分别是Scheduler(调度器)、Job(任务) 、Trigger(触发器)、JobDetail(任务详情),他们是使用Quartz的关键。
调度器作为作业的总指挥,触发器作为作业的操作者,作业为应用的功能模块,其关系如下图所示:
Job接口
定时任务的接口,具体定时任务需要实现该接口
定义需要执行的任务,该类是一个接口,只定义了一个方法execute(JobExecutionContext context)
,在实现类的execute
方法中编写所需要定时执行的Job(任务),JobExcutionContext
类提供了调度应用的一些信息。Job运行时的信息保存在JobDataMap实例中。
1 2 3 4 5 6 public class MyJob implements Job { @Override public void execute (JobExecutionContext jobExecutionContext) throws JobExecutionException { System.out.println("开始执行定时任务..." ); } }
Trigger接口
负责设置调度策略,该类是一个接口,描述触发job执行的时间触发规则
有以下这些子类,其中经常用到的是cronTigger
公共属性
triggerKey:表示Trigger身份的属性
jobKey:Trigger触发时被执行的Job的身份
startTime:Trigger第一次触发的时间
endTime:Trigger失效的时间点
优先级(priority):如果Trigger很多,或者Quartz线程池的工作线程太少,Quartz可能没有足够的资源同时触发所有的Trigger,这种情况下,如果希望某些Trigger优先被触发,就需要给它设置优先级,Trigger默认的优先级为5,优先级priority属性的值可以是任意整数,正数、负数都可以。(只有同时触发的Trigger之间才会比较优先级)
SimpleTrigger
指定从某一个时间开始,以一定时间间隔(单位:毫秒)执行的任务
关键属性
repeatInterval:重复间隔
repeatCount:重复次数,实际执行次数是repeatCount+1(因为在startTime的时候一定会执行一次)
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 TriggerBuilder.newTrigger() .withIdentity("my_job_tigger" , "my_job_tigger_group" ) .startAt(new Date (System.currentTimeMillis() + 5000 )) .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5 ).withRepeatCount(10 )) .endAt(new Date (System.currentTimeMillis() + 15000 )) .forJob("自定义JOB" ) .build();
CalendarIntervalTrigger
类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务
但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期
优点
更方便,比如每隔1小时执行,你不用自己去计算1小时等于多少毫秒
支持不是固定长度的间隔,比如间隔为月和年。但劣势是精度只能到秒
关键属性
interval 执行间隔:intervalUnit 执行间隔的单位(秒,分钟,小时,天,月,年,星期)
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 TriggerBuilder.newTrigger() .withIdentity("my_job_tigger" , "my_job_tigger_group" ) .startNow() .withSchedule(CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withInterval(10 , DateBuilder.IntervalUnit.SECOND)) .endAt(new Date (System.currentTimeMillis() + 15000 )) .forJob("calendar_tigger_test" ) .build();
DailyTimeIntervalTrigger
指定每天的某个时间段内,以一定的时间间隔执行任务,并且它可以支持指定星期
关键属性
startTimeOfDay:每天开始时间
endTimeOfDay:每天结束时间
daysOfWeek:需要执行的星期
代码案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 TriggerBuilder.newTrigger() .withIdentity("my_job_tigger" , "my_job_tigger_group" ) .startNow() .withSchedule(DailyTimeIntervalScheduleBuilder.dailyTimeIntervalSchedule() .startingDailyAt(TimeOfDay.hourAndMinuteOfDay(10 , 0 )) .endingDailyAt(TimeOfDay.hourAndMinuteOfDay(20 , 0 )) .onDaysOfTheWeek(DateBuilder.MONDAY, DateBuilder.TUESDAY, DateBuilder.WEDNESDAY, DateBuilder.THURSDAY) .withIntervalInHours(1 ) .withRepeatCount(10 ) ) .endAt(new Date (System.currentTimeMillis() + 15000 )) .forJob("calendar_tigger_test" ) .build();
CronTrigger
适合于更复杂的任务,它支持类型于Linux Cron的语法(并且更强大)
代码案例
1 2 3 4 5 6 7 8 9 10 11 12 TriggerBuilder.newTrigger() .withIdentity("my_job_tigger" , "my_job_tigger_group" ) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule("*/5 * * * * ?" )) .endAt(new Date (System.currentTimeMillis() + 15000 )) .forJob("calendar_tigger_test" ) .build();
JobDetail
描述Job的实现类及其它相关的静态信息,如:Job名字、描述、关联监听器等信息
Quartz在每次执行Job时,都重新创建一个Job实例,所以它不直接接受一个Job的实例,相反它接收一个Job实现类,以便运行时通过newInstance()的反射机制实例化Job。
因此需要通过一个类来描述Job的实现类及其它相关的静态信息,如Job名字、描述、关联监听器等信息,JobDetail承担了这一角色,JobDetail 用来保存我们作业的详细信息。
一个JobDetail可以有多个Trigger,但是一个Trigger只能对应一个JobDetail
1 JobBuilder.newJob(MyJob.class).withIdentity("MyJob_1", "JobGroup_1").build();
Scheduler
调度器就相当于一个容器,装载着任务和触发器
Scheduler负责管理Quartz的运行环境,Quartz它是基于多线程架构的,它启动的时候会初始化一套线程,这套线程会用来执行一些预置的作业。
Trigger和JobDetail可以注册到Scheduler中,Scheduler可以将Trigger绑定到某一JobDetail中,这样当Trigger触发时,对应的Job就被执行
Scheduler拥有一个SchedulerContext,它类似于ServletContext,保存着Scheduler上下文信息,Job和Trigger都可以访问SchedulerContext内的信息。Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提高运行效率
创建调度器 Scheduler接口有两个实现类,分别为StdScheduler(标准默认调度器)和RemoteScheduler(远程调度器),我们重点介绍下StdScheduler实例,StdScheduler只提供了一个带参构造方法,此构造需要传递QuartzScheduler和SchedulingContext两个实例参数
1 public StdScheduler(QuartzScheduler sched, SchedulingContext schedCtxt)
然而我们一般不使用构造方法去创建调度器,而是通过调度器工厂来创建,调度器工厂接口SchedulerFactory
提供了两种不同类型的工厂实现,分别是DirectSchedulerFactory
和StdSchedulerFactory
而DirectSchedulerFactory
一般用的比较少,更多的场景下我们使用StdSchedulerFactory
工厂来创建
创建方式
StdSchedulerFactory提供三种方式创建调度器实例
通过java.util.Properties属性实例
通过外部属性文件提供
通过有属性文件内容的 java.io.InputStream 文件流提供
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) { try { StdSchedulerFactory schedulerFactory = new StdSchedulerFactory (); Properties props = new Properties (); props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool" ); props.put("org.quartz.threadPool.threadCount" , 5 ); schedulerFactory.initialize(props); Scheduler scheduler = schedulerFactory.getScheduler(); } catch (Exception e) { e.printStackTrace(); } }
集群方案
上面的单机方案存在着单点问题,如果定时任务在多个服务器上运行,则会重复触发,为了解决这些问题,就需要使用quartz的集群方案
集群架构
一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。
这就意味着你必须对每个节点分别启动或停止,Quartz集群中,独立的Quartz节点并不与另一节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的。
初始化数据库 1 2 3 4 5 6 docker run -itd --name mysql-quartz -p 3306:3306 -v /opt/scheduleTask/quartz:/opt -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7 docker exec -it mysql-quartz bash mysql> create database quartz default charset 'utf8' ; mysql> use quartz; mysql> source /opt/tables_mysql.sql
因为Quartz集群依赖于数据库,所以必须首先创建Quartz数据库表,Quartz发布包中包括了所有被支持的数据库平台的SQL脚本
这些SQL脚本存放于/docs/dbTables 目录下找到对应数据库的SQL文件这里采用的是tables_mysql.sql
对应表简单含义如下
表明
功能
QRTZ_CALENDARS
以 Blob 类型存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS
存储 Cron Trigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS
存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
QRTZ_PAUSED_TRIGGER_GRPS
存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE
存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)
QRTZ_LOCKS
存储程序的悲观锁的信息(假如使用了悲观锁)
QRTZ_JOB_DETAILS
存储每一个已配置的 Job 的详细信息
QRTZ_SIMPLE_TRIGGERS
存储简单的 Trigger,包括重复次数,间隔,以及已触发的次数
QRTZ_BLOG_TRIGGERS
Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
QRTZ_TRIGGER_LISTENERS
存储已配置的 TriggerListener 的信息
QRTZ_TRIGGERS
存储已配置的 Trigger 的信息
引入pom
将需要的pom文件引入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <dependency > <groupId > org.quartz-scheduler</groupId > <artifactId > quartz</artifactId > <version > 2.2.1</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.38</version > </dependency > <dependency > <groupId > commons-lang</groupId > <artifactId > commons-lang</artifactId > <version > 2.6</version > </dependency >
编辑quartz.properties 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 org.quartz.scheduler.instanceName : DefaultQuartzScheduler org.quartz.scheduler.rmi.export : false org.quartz.scheduler.rmi.proxy : false org.quartz.scheduler.wrapJobExecutionInUserTransaction : false org.quartz.threadPool.class : org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount : 10 org.quartz.threadPool.threadPriority : 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread : true org.quartz.jobStore.misfireThreshold : 60000 org.quartz.jobStore.class :org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass :org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.useProperties :true org.quartz.jobStore.tablePrefix :qrtz_ org.quartz.jobStore.dataSource :qzDS org.quartz.dataSource.qzDS.driver :com.mysql.jdbc.Driver org.quartz.dataSource.qzDS.URL :jdbc:mysql://192.168.10.30:3306/quartz org.quartz.dataSource.qzDS.user :root org.quartz.dataSource.qzDS.password :123456 org.quartz.dataSource.qzDS.maxConnection :10
整合SpringBoot 注册Quartz注册工厂
该类是将quartz自己创建的类交给spring进行管理以及自动注入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class QuartzJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance (TriggerFiredBundle bundle) throws Exception { Object jobInstance = super .createJobInstance(bundle); capableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
注册调度工厂 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Configuration public class QuartzConfig { @Autowired private QuartzJobFactory jobFactory; @Bean public SchedulerFactoryBean schedulerFactoryBean () throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean (); propertiesFactoryBean.setLocation(new ClassPathResource ("/quartz.properties" )); propertiesFactoryBean.afterPropertiesSet(); SchedulerFactoryBean factory = new SchedulerFactoryBean (); factory.setQuartzProperties(propertiesFactoryBean.getObject()); factory.setJobFactory(jobFactory); factory.setApplicationContextSchedulerContextKey("applicationContextKey" ); factory.setWaitForJobsToCompleteOnShutdown(true ); factory.setOverwriteExistingJobs(false ); factory.setStartupDelay(10 ); return factory; } @Bean(name = "scheduler") public Scheduler scheduler () throws IOException, SchedulerException { Scheduler scheduler = schedulerFactoryBean().getScheduler(); return scheduler; } }
配置Quartz数据源
默认 Quartz 的数据连接池是 c3p0 ,由于性能不太稳定,不推荐使用,因此我们将其改成driud
数据连接池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 public class DruidConnectionProvider implements ConnectionProvider { public String driver; public String URL; public String user; public String password; public int maxConnection; public String validationQuery; private boolean validateOnCheckout; private int idleConnectionValidationSeconds; public String maxCachedStatementsPerConnection; private String discardIdleConnectionsSeconds; public static final int DEFAULT_DB_MAX_CONNECTIONS = 10 ; public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120 ; private DruidDataSource datasource; @Override public Connection getConnection () throws SQLException { return datasource.getConnection(); } @Override public void shutdown () throws SQLException { datasource.close(); } @Override public void initialize () throws SQLException { if (this .URL == null ) { throw new SQLException ("DBPool could not be created: DB URL cannot be null" ); } if (this .driver == null ) { throw new SQLException ("DBPool driver could not be created: DB driver class name cannot be null!" ); } if (this .maxConnection < 0 ) { throw new SQLException ("DBPool maxConnectins could not be created: Max connections must be greater than zero!" ); } datasource = new DruidDataSource (); try { datasource.setDriverClassName(this .driver); } catch (Exception e) { try { throw new SchedulerException ("Problem setting driver class name on datasource: " + e.getMessage(), e); } catch (SchedulerException e1) { } } datasource.setUrl(this .URL); datasource.setUsername(this .user); datasource.setPassword(this .password); datasource.setMaxActive(this .maxConnection); datasource.setMinIdle(1 ); datasource.setMaxWait(0 ); datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS); if (this .validationQuery != null ) { datasource.setValidationQuery(this .validationQuery); if (!this .validateOnCheckout) { datasource.setTestOnReturn(true ); } else { datasource.setTestOnBorrow(true ); } datasource.setValidationQueryTimeout(this .idleConnectionValidationSeconds); } } public String getDriver () { return driver; } public void setDriver (String driver) { this .driver = driver; } public String getURL () { return URL; } public void setURL (String URL) { this .URL = URL; } public String getUser () { return user; } public void setUser (String user) { this .user = user; } public String getPassword () { return password; } public void setPassword (String password) { this .password = password; } public int getMaxConnection () { return maxConnection; } public void setMaxConnection (int maxConnection) { this .maxConnection = maxConnection; } public String getValidationQuery () { return validationQuery; } public void setValidationQuery (String validationQuery) { this .validationQuery = validationQuery; } public boolean isValidateOnCheckout () { return validateOnCheckout; } public void setValidateOnCheckout (boolean validateOnCheckout) { this .validateOnCheckout = validateOnCheckout; } public int getIdleConnectionValidationSeconds () { return idleConnectionValidationSeconds; } public void setIdleConnectionValidationSeconds (int idleConnectionValidationSeconds) { this .idleConnectionValidationSeconds = idleConnectionValidationSeconds; } public DruidDataSource getDatasource () { return datasource; } public void setDatasource (DruidDataSource datasource) { this .datasource = datasource; } public String getDiscardIdleConnectionsSeconds () { return discardIdleConnectionsSeconds; } public void setDiscardIdleConnectionsSeconds (String discardIdleConnectionsSeconds) { this .discardIdleConnectionsSeconds = discardIdleConnectionsSeconds; } }
创建完成之后,还需要在quartz.properties
配置文件中设置以下数据源
1 2 #数据库连接池,将其设置为druid org.quartz.dataSource.qzDS.connectionProvider.class=cn.itcast.config.DruidConnectionProvider
1 2 3 4 5 6 7 8 9 { "jobName" : "myJob" , "groupName" : "default" , "jobClass" : "cn.itcast.quartz.MyJob" , "cronExpression" : "0/5 * * * * ?" , "param" : { "hello" : "world" } }
任务管理
默认quartz的功能是有限的,我们可以自己实现quartz的任务管理,比如添加、删除、暂停、运行定时任务
管理接口
该接口是定时任务的管理接口,可以对定时任务进行管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public interface QuartzJobService { void addJob (String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) ; void pauseJob (String jobName, String groupName) ; void resumeJob (String jobName, String groupName) ; void runOnce (String jobName, String groupName) ; void updateJob (String jobName, String groupName, String cronExp, Map<String, Object> param) ; void deleteJob (String jobName, String groupName) ; void startAllJobs () ; void pauseAllJobs () ; void resumeAllJobs () ; void shutdownAllJobs () ; }
管理实现类
该类是定时任务的具体实现,是实现了quartz的各种操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 @Service public class QuartzJobServiceImpl implements QuartzJobService { private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class); @Autowired private Scheduler scheduler; @Override public void addJob (String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) { try { Class<? extends Job > jobClass = (Class<? extends Job >) Class.forName(clazzName); JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build(); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp); CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build(); if (param != null ) { trigger.getJobDataMap().putAll(param); } scheduler.scheduleJob(jobDetail, trigger); } catch (Exception e) { log.error("创建任务失败" , e); } } @Override public void pauseJob (String jobName, String groupName) { try { scheduler.pauseJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("暂停任务失败" , e); } } @Override public void resumeJob (String jobName, String groupName) { try { scheduler.resumeJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("恢复任务失败" , e); } } @Override public void runOnce (String jobName, String groupName) { try { scheduler.triggerJob(JobKey.jobKey(jobName, groupName)); } catch (SchedulerException e) { log.error("立即运行一次定时任务失败" , e); } } @Override public void updateJob (String jobName, String groupName, String cronExp, Map<String, Object> param) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (cronExp != null ) { CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); } if (param != null ) { trigger.getJobDataMap().putAll(param); } scheduler.rescheduleJob(triggerKey, trigger); } catch (Exception e) { log.error("更新任务失败" , e); } } @Override public void deleteJob (String jobName, String groupName) { try { scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName)); scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName)); scheduler.deleteJob(JobKey.jobKey(jobName, groupName)); } catch (Exception e) { log.error("删除任务失败" , e); } } @Override public void startAllJobs () { try { scheduler.start(); } catch (Exception e) { log.error("开启所有的任务失败" , e); } } @Override public void pauseAllJobs () { try { scheduler.pauseAll(); } catch (Exception e) { log.error("暂停所有任务失败" , e); } } @Override public void resumeAllJobs () { try { scheduler.resumeAll(); } catch (Exception e) { log.error("恢复所有任务失败" , e); } } @Override public void shutdownAllJobs () { try { if (!scheduler.isShutdown()) { scheduler.shutdown(true ); } } catch (Exception e) { log.error("关闭所有的任务失败" , e); } } }
API接口
通过实现该接口可以通过外部API对定时任务进行管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 @RestController @RequestMapping("/quartz") public class QuartzController { private static final Logger log = LoggerFactory.getLogger(QuartzController.class); @Autowired private QuartzJobService quartzJobService; @RequestMapping("/addJob") public Object addJob (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam()); return HttpStatus.OK; } @RequestMapping("/pauseJob") public Object pauseJob (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName()); return HttpStatus.OK; } @RequestMapping("/resumeJob") public Object resumeJob (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName()); return HttpStatus.OK; } @RequestMapping("/runOnce") public Object runOnce (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName()); return HttpStatus.OK; } @RequestMapping("/updateJob") public Object updateJob (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam()); return HttpStatus.OK; } @RequestMapping("/deleteJob") public Object deleteJob (@RequestBody QuartzConfigDTO configDTO) { quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName()); return HttpStatus.OK; } @RequestMapping("/startAllJobs") public Object startAllJobs () { quartzJobService.startAllJobs(); return HttpStatus.OK; } @RequestMapping("/pauseAllJobs") public Object pauseAllJobs () { quartzJobService.pauseAllJobs(); return HttpStatus.OK; } @RequestMapping("/resumeAllJobs") public Object resumeAllJobs () { quartzJobService.resumeAllJobs(); return HttpStatus.OK; } @RequestMapping("/shutdownAllJobs") public Object shutdownAllJobs () { quartzJobService.shutdownAllJobs(); return HttpStatus.OK; } }
测试
可以通过Postman通过接口动态对定时任务进行管理
添加定时任务
通过PostMan添加任务
添加完成后,可以在控制台看到任务正在执行
ElasticJob
ElasticJob 诞生于 2015 年,当时业界虽然有 QuartZ 等出类拔萃的定时任务框架,但缺乏分布式方面的探索
分布式调度云平台产品的缺失,使得 ElasticJob 从出现便备受关注,它有效的弥补了作业在分布式领域的短板,并且提供了一站式的自动化运维管控端。
quartz的不足 我们上面详细的介绍了 Quartz 的架构原理以及应用实践,虽然 Quartz 也可以通过集群方式来保证服务高可用,但是它也有弊端,那就是服务节点数量的增加,并不能提升任务的执行效率,即不能实现水平扩展!
之所以产生这样的结果,是因为 Quartz 在分布式集群环境下是通过数据库锁方式来实现有且只有一个有效的服务节点来运行服务,从而保证服务在集群环境下定时任务不会被重复调用!
如果需要运行的定时任务很少的话,使用 Quartz 不会有太大的问题,但是如果 现在有这么一个需求,例如一个考勤系统,每天9点需要对昨天的用户打卡进行统计,计算是否正常出勤,涉及到各种审批,请假等等,如果一个公司有几十万员工,如果在一个实例上面运行,可能需要连续跑上好几天才能完成任务。
类似这样场景还有很多很多,很显然 Quartz 很难满足我们这种大批量、任务执行周期长的任务调度!
ElasticJob 在技术选型时,选择站在了巨人的肩膀上而不是重复制造轮子的理念,将定时任务事实标准的 QuartZ 与 分布式协调的利器 ZooKeeper 完美结合,快速而稳定的搭建了全新概念的分布式调度框架。
基本介绍
Elastic-Job提供了一种轻量级,无中心化解决方案。
没有统一的调度中心,集群的每个节点都是对等的, 节点之间通过注册中心进行分布式协调。E-Job 存在主节点的概念,但是主节点没有调度 的功能,而是用于处理一些集中式任务,如分片,清理运行时信息等。
Elastic-Job 最开始只有一个 elastic-job-core 的项目,在 2.X 版本以后主要分为 Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目
Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。而 Elastic-Job-Cloud 使用 Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,跟 Lite 的区别只是部署方式不同,他们使用相同的 API,只要开发一次
比较项
Elastic-Job-Lite
Elastic-Job-Cloud
无中心化
是
否
资源分配
不支持
支持
作业模式
常驻
常驻 + 瞬时
部署依赖
ZooKeeper
ZooKeeper + Mesos
功能列表
分布式调度协调
弹性扩容缩容
失效转移
错过执行作业重触发
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
自诊断并修复分布式不稳定造成的问题
支持并行调度
支持作业生命周期操作
丰富的作业类型
Spring整合以及命名空间提供
运维平台
整体架构图
App 应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件 ,其中执行任务需要实现ElasticJob接口完成,与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例
Elastic-Job-Lite Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生日志及任务调度记录。
无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调
Registry 以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息,同时Elastic-Job利用该 组件进行执行任务实例的选举
Console Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修 改全局配置,通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。
elastic-job、zk 和 quartz 关系如下
安装部署
参考文档
弹性调度
弹性调度是 ElasticJob 最重要的功能,能够让任务通过分片进行水平扩展的任务处理。
任务分片
任务的分片执行是指一个批量任务如果由一台服务执行速度会比较慢,那么对任务进行分片交给多台服务器进行执行,这样执行的效率就会得到提高。
ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片,随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载,如下图所示。
个性化分片参数
ElasticJob 可以设置分片项和自定义分片参数,个性化参数可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码
例如按照地区进行统计数据,北京=1,上海=2,广州=3,如果仅仅按照1、2、3进行分片,对开发者来说很不友好,需要了解具体数字所代表的含义,而使用个性化参数可以让代码的可读性更高,如果使用以下配置
1 shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
那么在代码中可以更清晰的理解具体分片键的含义,或者使用枚举类型来让业务代码的可读性更高
分片策略
框架默认提供了三种分片策略,所有的分片策略都是接口JobShardingStrategy
的实现
AverageAllocationJobShardingStrategy 策略说明
基于平均分配算法的分片策略,也是默认的分片策略,如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器
如果有3
台服务器,分成9
片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3
台服务器,分成8
片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
如果有3
台服务器,分成10
片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
OdevitySortByNameJobShardingStrategy 策略说明
根据作业名的哈希值奇偶数决定IP
升降序算法的分片策略,用于不同的作业平均分配负载至不同的服务器。
作业名的哈希值为奇数则IP
升序。
作业名的哈希值为偶数则IP
降序。
特点
AverageAllocationJobShardingStrategy
的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP
地址靠前的服务器,导致IP
地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy
则可以根据作业名称重新分配服务器负载。
如果有3
台服务器,分成2
片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]
如果有3
台服务器,分成2
片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]
RotateServerByNameJobShardingStrategy 策略说明
根据作业名的哈希值对服务器列表进行轮转的分片策略
动态调度
ElasticJob 可以根据节点的数量动态进行任务的分派,可以提高业务的执行效率以及提高吞吐量
当新增加作业服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片,如下图所示。
如果将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如三台服务器,分成 10 片,则分片项分配结果为服务器 A = 0,1,2;服务器 B = 3,4,5;服务器 C = 6,7,8,9
如果服务器 C 崩溃,则分片项分配结果为服务器 A = 0,1,2,3,4; 服务器 B = 5,6,7,8,9。 在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
高可用
当定时任务服务器宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用的效果
本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。如下图所示。
作用类型
elastic-job提供了三种类型的作业类型,分别是Simple
,Dataflow
,ScriptJob
Simple SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job
Dataflow
Dataflow类型用于处理数据流,需实现DataflowJob接口,适用于不间歇的数据处理。
该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据
注意事项
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。
代码案例
这里模拟定时处理订单状态,拉取订单然后进行处理订单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class MyDataFlowJob implements DataflowJob <Order> { Logger log = LoggerFactory.getLogger(MyDataFlowJob.class); private static List<Order> orders = new ArrayList <>(); { for (int i = 0 ; i < 100 ; i++) { Order order = new Order (); order.setOrderId(i); order.setStatus(0 ); orders.add(order); } } @Override public List<Order> fetchData (ShardingContext shardingContext) { List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0 ) .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) .collect(Collectors.toList()); List<Order> subList = null ; if (orderList != null && orderList.size() > 0 ) { int endIndex = 10 ; if (orderList.size() < 10 ) { endIndex = orderList.size() - 1 ; } subList = orderList.subList(0 , endIndex); } try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("分片项:{},我抓取的数据:{}" , shardingContext.getShardingItem(), subList); return subList; } @Override public void processData (ShardingContext shardingContext, List<Order> list) { list.forEach(o -> o.setStatus(1 )); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("分片项:{},处理中....." , shardingContext.getShardingItem()); } }
事件跟踪
在elastic-job中,有一块很重要的功能,与作业的执行密切相关但是又不能影响作业的执行,这就是事件跟踪,定时任务执行一个事件,需要记录定时任务执行时间,状态等等信息,这些都可以通过事件跟踪来完成
控制台
Elastic-job控制台能够对elasticjob的作业做运维工作,比如暂停定时任务,修改定时任务执行时间,分片策略等等
设计理念
elastic-job控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心数据修改全局配置。
控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。
支持功能
查看作业以及服务器状态
快捷的修改以及删除作业设置
启用和禁用作业
跨注册中心查看作业
查看作业运行轨迹和运行状态
安装部署 下载ElasticJob-UI
访问https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
地址,找到ElasticJob-UI 的tar包下载即可,注意下载ElasticJob-Lite-UI
的tar包
启动服务
解压后,找到bin
目录下的启动类,启动即可
访问测试
ElasticJob-UI 默认启动端口是8088,可以在application.properties
配置文件进行修改,启动后出现如下访问界面,用户名密码都是root
登录后就可以进入主界面
配置注册中心
ElasticJob-UI因为设计的时候就是和Elastic Job
服务相分离的,通过zk
来查看和控制作业的状态,所以使用elastic-job
第一步就是配置注册中心,如下图添加注册中心
配置项说明
注册中心名称:可以随意命名,注册中心的名称
注册中心地址:也就是zookeeper地址,我们的地址时localhost:2181
命名空间地址:这个一般是我们在elastic-job
配置的命名空间的地址我们配置的是elasticjob-lite-springboot
登录凭证:zk的登录凭证,默认没有配置可以忽略
作业操作
可以在作业操作栏目对作业进行操作,比如暂停以及触发任务,和修改任务的执行计划等等
xxl-job
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展,其中“XXL”是主要作者,大众点评许雪里 名字的缩写
简单来说,就是xxl-job调度web工程部署一个节点.然后executor可以有多个工程,分别部署在不同的节点,用来找到对应节点上的shell脚本。达到分布式的定时调度。
和ElasticJob的区别 相同点
E-Job和X-job都有普遍的用户基础和完整的技术文档,都能满足定时任务的基本功能需求
不同点
X-Job 侧重的业务实现的简单和管理的方便,学习成本简单,失败策略和路由策略丰富,推荐使用在“用户基数相对少,服务器数量在一定范围内”的情景下使用
E-Job 关注的是数据,增长了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源,可是学习成本相对高些,推荐在“数据量庞大,且部署服务器数量较多”时使用算法
X-Job采用了中心化思想的架构,而E-Job采用了无中心的架构
功能特性 简单灵活
提供Web页面对任务进行管理,管理系统支持用户管理、权限控制;
支持容器部署;
支持通过通用HTTP提供跨平台任务调度;
丰富的任务管理功能
支持页面对任务CRUD操作;
支持在页面编写脚本任务、命令行任务、Java代码任务并执行;
支持任务级联编排,父任务执行结束后触发子任务执行;
支持设置任务优先级;
支持设置指定任务执行节点路由策略,包括轮询、随机、广播、故障转移、忙碌转移等;
支持Cron方式、任务依赖、调度中心API接口方式触发任务执行
高性能
调度中心基于线程池多线程触发调度任务,快任务、慢任务基于线程池隔离调度,提供系统性能和稳定性;
任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰;
高可用
任务调度中心、任务执行节点均 集群部署,支持动态扩展、故障转移
支持任务配置路由故障转移策略,执行器节点不可用是自动转移到其他节点执行
支持任务超时控制、失败重试配置
支持任务处理阻塞策略:调度当任务执行节点忙碌时来不及执行任务的处理策略,包括:串行、抛弃、覆盖策略
易于监控运维
支持设置任务失败邮件告警,预留接口支持短信、钉钉告警;
支持实时查看任务执行运行数据统计图表、任务进度监控数据、任务完整执行日志;
整体架构设计
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心 ”负责发起调度请求;
将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器 ”负责接收调度请求并执行对应的JobHandler中业务逻辑,因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
调度模块(调度中心)
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码
调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块; 支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover
功能
任务管理:对调度的任务进行触发时间等配置
日志管理:查看调度的日志情况
执行器管理:管理接入的业务模块
其它,比如用户权限配置和运行统计报表等功能
执行模块(执行器)
负责接收调度请求并执行任务逻辑。
任务模块专注于任务的执行等操作,开发和维护更加简单和高效; 接收“调度中心”的执行请求、终止请求和日志请求等
工作原理
任务执行器根据配置的调度中心的地址,自动注册到调度中心
达到任务触发条件,调度中心下发任务
执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心
当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
部署调度中心 初始化数据库
https://gitee.com/xuxueli0323/xxl-job
地址下载项目源码并解压, SQL脚本位置在
1 /xxl-job/doc/db/tables_xxl_job.sql
创建用户并授权 1 2 3 mysql> source / opt/ tables_xxl_job.sql; mysql> CREATE USER 'xxl' @'%' IDENTIFIED BY 'xxl' ; mysql> GRANT ALL ON xxl_job.* TO 'xxl' @'%' IDENTIFIED BY 'xxl' WITH GRANT OPTION;
源码结构
解压源码,按照maven格式将源码导入IDE, 使用maven进行编译即可
1 2 3 4 5 xxl-job-admin:调度中心 xxl-job-core:公共依赖 xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器) :xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式; :xxl-job-executor-sample-frameless:无框架版本;
配置调度中心
调度中心项目是xxl-job-admin
,作用是统计管理调度平台上面的任务,负责触发以及执行调度任务,并且提供任务管理
修改application.properties
调度中心配置内容说明,可以按需进行修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 server.port =8080 server.servlet.context-path =/xxl-job-admin spring.datasource.url =jdbc:mysql://192.168.10.30:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username =xxl spring.datasource.password =xxl spring.datasource.driver-class-name =com.mysql.jdbc.Driver spring.mail.host =smtp.qq.com spring.mail.port =25 spring.mail.username =xxx@qq.com spring.mail.password =xxx spring.mail.properties.mail.smtp.auth =true spring.mail.properties.mail.smtp.starttls.enable =true spring.mail.properties.mail.smtp.starttls.required =true spring.mail.properties.mail.smtp.socketFactory.class =javax.net.ssl.SSLSocketFactory xxl.job.accessToken =xxl.job.i18n =zh_CN xxl.job.triggerpool.fast.max =200 xxl.job.triggerpool.slow.max =100 xxl.job.logretentiondays =30
部署项目
可以在本地直接运行xxl-job
服务,或者打包后在服务器上面运行
启动服务
运行项目后访问 http://192.168.10.30:8080/xxl-job-admin ,该地址执行器将会使用到,作为回调地,默认登录账号 admin/123456
, 登录后运行界面如下图所示
Docker部署 创建docker-comopose.yml 1 2 3 4 5 6 7 8 9 10 11 12 version: '3' services: xxl-job-admin: image: xuxueli/xxl-job-admin:2.3.0 restart: always container_name: xxl-job-admin environment: PARAMS: '--spring.datasource.url=jdbc:mysql://192.168.10.30:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 --spring.datasource.username=root --spring.datasource.password=root' ports: - 8080 :8080 volumes: - ./data/applogs:/data/applogs
启动服务
部署执行器 引入依赖
在项目pom文件中引入了 “xxl-job-core” 的maven依赖;
1 2 3 4 5 <dependency > <groupId > com.xuxueli</groupId > <artifactId > xxl-job-core</artifactId > <version > 2.3.0</version > </dependency >
执行器配置文件
如调度中心集群部署存在多个地址则用逗号分隔
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 xxl.job.admin.addresses =http://192.168.10.30:8080/xxl-job-admin xxl.job.accessToken =xxl.job.executor.appname =xxl-job-executor-task xxl.job.executor.address =xxl.job.executor.ip =xxl.job.executor.port =9999 xxl.job.executor.logpath =/data/applogs/xxl-job/jobhandler xxl.job.executor.logretentiondays =30
配置执行器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor () { logger.info(">>>>>>>>>>> xxl-job config init." ); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor (); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
启动执行器
本地执行器服务启动后,需要在调度中心配置执行器
注意AppName就是执行器项目配置的配置项,名称可以随意,机器地址采用自动注册就可以
作业详情
xxl-job支持很多种任务模式,下面我们挑几个常用的介绍下
BEAN模式
Bean模式任务,支持基于方法的开发方式,每个任务对应一个方法。
原理
每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。
任务类需要加“@JobHandler (value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。
编写执行器
为Job方法添加注解 “@XxlJob(value=”自定义jobhandler名称”, init = “JobHandler初始化方法”, destroy = “JobHandler销毁方法”)”,注解value值对应的是调度中心新建任务的JobHandler属性的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class XxlJobBeanMethodTask { private static final Logger logger = LoggerFactory.getLogger(XxlJobBeanMethodTask.class); @XxlJob("methodTask") public void methodTask () throws Exception { logger.info("methodTask定时任务启动,总分片:{},当前分片:{},参数:{}" ,XxlJobHelper.getShardIndex(),XxlJobHelper.getShardIndex(),XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-METHODTASK, methodTask定时任务启动" ); XxlJobHelper.handleSuccess(); } }
新建调度任务
在任务管理,选择对应的执行器新建任务
新增完成后,可以在列表点击执行一次或者点击启动启动定时任务
GLUE模式(Java)
任务以源代码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler
原理 每个 “GLUE模式(Java)” 任务的代码,实际上是“一个继承自“IJobHandler”的实现类的类代码”,“执行器”接收到“调度中心”的调度请求时,会通过Groovy类加载器加载此代码,实例化成Java对象,同时注入此代码中声明的Spring服务(请确保Glue代码中的服务和类引用在“执行器”项目中存在),然后调用该对象的execute方法,执行任务逻辑。
新建调度任务
新建一个任务,注意使用使用GLUE(JAVA)模式
在线编辑代码
添加任务后,选择刚刚添加的GLUE(JAVA)任务,选中指定任务后,点击该任务右侧“GLUE IDE”按钮,将会前往GLUE任务的Web IDE界面
在该界面支持对任务代码进行开发(也可以在IDE中开发完成后,复制粘贴到编辑中)
路由策略
路由策略属于XXLJob的高级功能,可以控制执行器执行的策略
准备工作
启动多台XXL执行器的服务,我们启动三台服务,并查看控制台的注册情况
第一个
当选择该策略时,会选择执行器注册地址的第一台机器执行,如果第一台机器出现故障,则调度任务失败
查看客户端日志
我们查看客户端日志,发现服务器已经调度了第一个节点
查看执行器日志
关闭客户端
我们通过执行器的注册节点发现,第一台服务器的执行器是8089
端口的服务器,我们将8089
服务器给关闭,然后查看执行器日志,我们发现服务执行失败
容错策略
但是并不会一直报错,等一段时间控制台会将已经挂掉的节点剔除出去,第一个节点已经变成了8189
就可以继续执行了
最后一个
当选择该策略时,会选择执行器注册地址的最后一台机器执行,如果最后一台机器出现故障,则调度任务失败,测试方式如上
轮询
当选择该策略时,会按照执行器注册地址轮询分配任务,如果其中一台机器出现故障,调度任务失败,任务不会转移
随机
当选择该策略时,会按照执行器注册地址随机分配任务,如果其中一台机器出现故障,调度任务失败,任务不会转移
一致性HASH
当选择该策略时,每个任务按照Hash算法固定选择某一台机器,如果那台机器出现故障,调度任务失败,任务不会转移。
最不经常使用
当选择该策略时,会优先选择使用频率最低的那台机器,如果其中一台机器出现故障,调度任务失败,任务不会转移(实践表明效果和轮询策略一致)
最近最久未使用
当选择该策略时,会优先选择最久未使用的机器,如果其中一台机器出现故障,调度任务失败,任务不会转移(实践表明效果和轮询策略一致)
故障转移
当选择该策略时,按照顺序依次进行心跳检测,如果其中一台机器出现故障,则会转移到下一个执行器,若心跳检测成功,会选定为目标执行器并发起调度
忙碌转移
当选择该策略时,按照顺序依次进行空闲检测,如果其中一台机器出现故障,则会转移到下一个执行器,若空闲检测成功,会选定为目标执行器并发起调度
分片广播
当选择该策略时,广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数,可根据分片参数开发分片任务,如果其中一台机器出现故障,则该执行器执行失败,不会影响其他执行器
XXL的分片是根据启动的客户端进行分片,
分片参数是调度中心自动传递的,不用我们手动传递,且集群中的每个index序号是固定的,即使集群中有项目宕机,也不影响其他项目的index序号,当重启宕机项目时,它的序号还是原先的
服务启动后会自动的进行服务分片,分片会根据不同的节点进行调度
节点1
节点2
节点3
阻塞策略
调度过于密集执行器来不及处理时的处理策略
准备工作
修改代码加入延时操作模拟定时任务执行超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component public class XxlJobBeanMethodTask { private static final Logger logger = LoggerFactory.getLogger(XxlJobBeanMethodTask.class); @XxlJob("methodTask") public void methodTask () throws Exception { logger.info("methodTask定时任务启动,总分片:{},当前分片:{},参数:{}" ,XxlJobHelper.getShardTotal(),XxlJobHelper.getShardIndex(),XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-METHODTASK, methodTask定时任务启动" ); Thread.sleep(10000 ); XxlJobHelper.handleSuccess(); } }
单机串行
调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
测试
将阻塞策略设置为单机串行
,当执行的任务耗时过长,同一个任务将进入执行队列等待执行,这个时候将任务停止掉,执行器还会将队列中的任务执行完成
但是执行器还在继续执行队列中的任务,直到所有任务执行完成
等一段时间后,我们发现定时任务状态都已经正常了
丢弃后续调度
调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
测试
将阻塞策略设置为丢弃后续调度
,执行器发现调度未完成则直接设置为失败
覆盖之前调度
调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
测试
将阻塞策略设置为覆盖之前调度
,执行器发现有任务正在调度直接将该任务设置为失败,任何进行后续调度任务
xxl-job常用cron表达式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 */5 * * * * ? 每隔5秒执行一次 0 */1 * * * ? 每隔1分钟执行一次 0 0 5-15 * * ? 每天5-15点整点触发 0 0/3 * * * ? 每三分钟触发一次 0 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发 0 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发 0 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发 0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时 0 0 10,14,16 * * ? 每天上午10点,下午2点,4点 0 0 12 ? * WED 表示每个星期三中午12点 0 0 17 ? * TUES,THUR,SAT 每周二、四、六下午五点 0 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发 0 15 10 ? * MON-FRI 周一至周五的上午10:15触发 0 0 23 L * ? 每月最后一天23点执行一次 0 15 10 L * ? 每月最后一日的上午10:15触发 0 15 10 ? * 6L 每月的最后一个星期五上午10:15触发 0 15 10 * * ? 2005 2005年的每天上午10:15触发 0 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发 0 15 10 ? * 6#3 每月的第三个星期五上午10:15触发 "30 * * * * ?" 每半分钟触发任务 "30 10 * * * ?" 每小时的10分30秒触发任务 "30 10 1 * * ?" 每天1点10分30秒触发任务 "30 10 1 20 * ?" 每月20号1点10分30秒触发任务 "30 10 1 20 10 ? *" 每年10月20号1点10分30秒触发任务 "30 10 1 20 10 ? 2011" 2011年10月20号1点10分30秒触发任务 "30 10 1 ? 10 * 2011" 2011年10月每天1点10分30秒触发任务 "30 10 1 ? 10 SUN 2011" 2011年10月每周日1点10分30秒触发任务 "15,30,45 * * * * ?" 每15秒,30秒,45秒时触发任务 "15-45 * * * * ?" 15到45秒内,每秒都触发任务 "15/5 * * * * ?" 每分钟的每15秒开始触发,每隔5秒触发一次 "15-30/5 * * * * ?" 每分钟的15秒到30秒之间开始触发,每隔5秒触发一次 "0 0/3 * * * ?" 每小时的第0分0秒开始,每三分钟触发一次 "0 15 10 ? * MON-FRI" 星期一到星期五的10点15分0秒触发任务 "0 15 10 L * ?" 每个月最后一天的10点15分0秒触发任务 "0 15 10 LW * ?" 每个月最后一个工作日的10点15分0秒触发任务 "0 15 10 ? * 5L" 每个月最后一个星期四的10点15分0秒触发任务 "0 15 10 ? * 5#3" 每个月第三周的星期四的10点15分0秒触发任务
xxl-job数据库更改为Oracle 前言 最近做的银行微服务项目引入了调度中心的概念,于是乎就开始琢磨了,经网上搜索很朋友推荐,就开始关注到XxlJob了,一款不错的分布式任务调度平台,还是开源的! 从官网下载、部署、运行后发现目前版本只支持Mysql数据库,而我们应用使用的Oracle数据库,没有mysql环境,只能适应性改造。 网上搜了些资料,有一些思路但不够全面,经过努力踩坑,总结了一个完整的改造思路,分享给大家! XxlJob的基础部署不赘述了,直接上干货。
资料 官方文档
后台登录用户:admin/123456
admin是负责调度的,excutor是具体的任务的工程,执行器。还可以通过admin后台写shell配置脚本调度
步骤1:建库脚本转Oracle 网上有MySQL转Oracle脚本的方式,比较麻烦,我转了一份,大家直接用即可。
tables_xxl_job_oracle.sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 drop table XXL_JOB_INFO;drop table XXL_JOB_LOG;drop table XXL_JOB_LOG_REPORT;drop table XXL_JOB_LOGGLUE;drop table XXL_JOB_REGISTRY;drop table XXL_JOB_GROUP;drop table XXL_JOB_USER;drop table XXL_JOB_LOCK;CREATE TABLE "XXL_JOB_INFO" ("ID" NUMBER(20 ,0 ) NOT NULL , "JOB_GROUP" NUMBER(20 ,0 ) NOT NULL , "JOB_CRON" VARCHAR2(128 ) NOT NULL , "JOB_DESC" VARCHAR2(510 ) NOT NULL , "ADD_TIME" DATE DEFAULT NULL NULL , "UPDATE_TIME" DATE DEFAULT NULL NULL , "AUTHOR" VARCHAR2(64 ) DEFAULT NULL NULL , "ALARM_EMAIL" VARCHAR2(255 ) DEFAULT NULL NULL , "EXECUTOR_ROUTE_STRATEGY" VARCHAR2(50 ) DEFAULT NULL NULL , "EXECUTOR_HANDLER" VARCHAR2(255 ) DEFAULT NULL NULL , "EXECUTOR_PARAM" VARCHAR2(1024 ) DEFAULT NULL NULL , "EXECUTOR_BLOCK_STRATEGY" VARCHAR2(100 ) DEFAULT NULL NULL , "EXECUTOR_TIMEOUT" NUMBER(11 ,0 ) DEFAULT 0 NOT NULL , "EXECUTOR_FAIL_RETRY_COUNT" NUMBER(11 ,0 ) DEFAULT 0 NOT NULL , "GLUE_TYPE" VARCHAR2(100 ) NOT NULL , "GLUE_SOURCE" CLOB DEFAULT NULL NULL , "GLUE_REMARK" VARCHAR2(256 ) DEFAULT NULL NULL , "GLUE_UPDATETIME" DATE DEFAULT NULL NULL , "CHILD_JOBID" VARCHAR2(255 ) DEFAULT NULL NULL , "TRIGGER_STATUS" NUMBER(4 ,0 ) DEFAULT 0 NOT NULL , "TRIGGER_LAST_TIME" NUMBER(20 ,0 ) DEFAULT 0 NOT NULL , "TRIGGER_NEXT_TIME" NUMBER(20 ,0 ) DEFAULT 0 NOT NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; COMMENT ON COLUMN "XXL_JOB_INFO"."JOB_GROUP" IS '执行器主键ID' ; COMMENT ON COLUMN "XXL_JOB_INFO"."JOB_CRON" IS '任务执行CRON' ; COMMENT ON COLUMN "XXL_JOB_INFO"."AUTHOR" IS '作者' ; COMMENT ON COLUMN "XXL_JOB_INFO"."ALARM_EMAIL" IS '报警邮件' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_ROUTE_STRATEGY" IS '执行器路由策略' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_HANDLER" IS '执行器任务handler' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_PARAM" IS '执行器任务参数' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_BLOCK_STRATEGY" IS '阻塞处理策略' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_TIMEOUT" IS '任务执行超时时间,单位秒' ; COMMENT ON COLUMN "XXL_JOB_INFO"."EXECUTOR_FAIL_RETRY_COUNT" IS '失败重试次数' ; COMMENT ON COLUMN "XXL_JOB_INFO"."GLUE_TYPE" IS 'GLUE类型' ; COMMENT ON COLUMN "XXL_JOB_INFO"."GLUE_SOURCE" IS 'GLUE源代码' ; COMMENT ON COLUMN "XXL_JOB_INFO"."GLUE_REMARK" IS 'GLUE备注' ; COMMENT ON COLUMN "XXL_JOB_INFO"."GLUE_UPDATETIME" IS 'GLUE更新时间' ; COMMENT ON COLUMN "XXL_JOB_INFO"."CHILD_JOBID" IS '子任务ID,多个逗号分隔' ; COMMENT ON COLUMN "XXL_JOB_INFO"."TRIGGER_STATUS" IS '调度状态:0-停止,1-运行' ; COMMENT ON COLUMN "XXL_JOB_INFO"."TRIGGER_LAST_TIME" IS '上次调度时间' ; COMMENT ON COLUMN "XXL_JOB_INFO"."TRIGGER_NEXT_TIME" IS '下次调度时间' ; CREATE TABLE "XXL_JOB_LOG" ("ID" NUMBER(20 ,0 ) NOT NULL , "JOB_GROUP" NUMBER(20 ,0 ) NOT NULL , "JOB_ID" NUMBER(20 ,0 ) NOT NULL , "EXECUTOR_ADDRESS" VARCHAR2(255 ) DEFAULT NULL NULL , "EXECUTOR_HANDLER" VARCHAR2(255 ) DEFAULT NULL NULL , "EXECUTOR_PARAM" VARCHAR2(1024 ) DEFAULT NULL NULL , "EXECUTOR_SHARDING_PARAM" VARCHAR2(40 ) DEFAULT NULL NULL , "EXECUTOR_FAIL_RETRY_COUNT" NUMBER(11 ,0 ) DEFAULT 0 NOT NULL , "TRIGGER_TIME" DATE DEFAULT NULL NULL , "TRIGGER_CODE" NUMBER(11 ,0 ) NOT NULL , "TRIGGER_MSG" CLOB DEFAULT NULL NULL , "HANDLE_TIME" DATE DEFAULT NULL NULL , "HANDLE_CODE" NUMBER(11 ,0 ) NOT NULL , "HANDLE_MSG" CLOB DEFAULT NULL NULL , "ALARM_STATUS" NUMBER(4 ,0 ) DEFAULT 0 NOT NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; CREATE INDEX "I_TRIGGER_TIME" ON "XXL_JOB_LOG" ("TRIGGER_TIME");CREATE INDEX "I_HANDLE_CODE" ON "XXL_JOB_LOG" ("HANDLE_CODE");COMMENT ON COLUMN "XXL_JOB_LOG"."JOB_GROUP" IS '执行器主键ID' ; COMMENT ON COLUMN "XXL_JOB_LOG"."JOB_ID" IS '任务,主键ID' ; COMMENT ON COLUMN "XXL_JOB_LOG"."EXECUTOR_ADDRESS" IS '执行器地址,本次执行的地址' ; COMMENT ON COLUMN "XXL_JOB_LOG"."EXECUTOR_HANDLER" IS '执行器任务handler' ; COMMENT ON COLUMN "XXL_JOB_LOG"."EXECUTOR_PARAM" IS '执行器任务参数' ; COMMENT ON COLUMN "XXL_JOB_LOG"."EXECUTOR_SHARDING_PARAM" IS '执行器任务分片参数,格式如 1/2' ; COMMENT ON COLUMN "XXL_JOB_LOG"."EXECUTOR_FAIL_RETRY_COUNT" IS '失败重试次数' ; COMMENT ON COLUMN "XXL_JOB_LOG"."TRIGGER_TIME" IS '调度-时间' ; COMMENT ON COLUMN "XXL_JOB_LOG"."TRIGGER_CODE" IS '调度-结果' ; COMMENT ON COLUMN "XXL_JOB_LOG"."TRIGGER_MSG" IS '调度-日志' ; COMMENT ON COLUMN "XXL_JOB_LOG"."HANDLE_TIME" IS '执行-时间' ; COMMENT ON COLUMN "XXL_JOB_LOG"."HANDLE_CODE" IS '执行-状态' ; COMMENT ON COLUMN "XXL_JOB_LOG"."HANDLE_MSG" IS '执行-日志' ; COMMENT ON COLUMN "XXL_JOB_LOG"."ALARM_STATUS" IS '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败' ; CREATE TABLE "XXL_JOB_LOG_REPORT" ( "ID" NUMBER(20 ,0 ) NOT NULL , "TRIGGER_DAY" DATE DEFAULT NULL NULL , "RUNNING_COUNT" NUMBER(20 ,0 ) DEFAULT 0 NOT NULL , "SUC_COUNT" NUMBER(20 ,0 ) DEFAULT 0 NOT NULL , "FAIL_COUNT" NUMBER(20 ,0 ) DEFAULT 0 NOT NULL , PRIMARY KEY ("ID") ) NOCOMPRESS NOPARALLEL ; CREATE UNIQUE INDEX "I_TRIGGER_DAY" ON "XXL_JOB_LOG_REPORT" ("TRIGGER_DAY");COMMENT ON COLUMN "XXL_JOB_LOG_REPORT"."ID" IS 'report主键ID' ; COMMENT ON COLUMN "XXL_JOB_LOG_REPORT"."TRIGGER_DAY" IS '调度-时间' ; COMMENT ON COLUMN "XXL_JOB_LOG_REPORT"."RUNNING_COUNT" IS '运行中-日志数量' ; COMMENT ON COLUMN "XXL_JOB_LOG_REPORT"."SUC_COUNT" IS '执行成功-日志数量' ; COMMENT ON COLUMN "XXL_JOB_LOG_REPORT"."FAIL_COUNT" IS '执行失败-日志数量' ; CREATE TABLE "XXL_JOB_LOGGLUE" ("ID" NUMBER(20 ,0 ) NOT NULL , "JOB_ID" NUMBER(20 ,0 ) NOT NULL , "GLUE_TYPE" VARCHAR2(100 ) DEFAULT NULL NULL , "GLUE_SOURCE" CLOB DEFAULT NULL NULL , "GLUE_REMARK" VARCHAR2(256 ) NOT NULL , "ADD_TIME" DATE DEFAULT NULL NULL , "UPDATE_TIME" DATE DEFAULT NULL NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; COMMENT ON COLUMN "XXL_JOB_LOGGLUE"."JOB_ID" IS '任务,主键ID' ; COMMENT ON COLUMN "XXL_JOB_LOGGLUE"."GLUE_TYPE" IS 'GLUE类型' ; COMMENT ON COLUMN "XXL_JOB_LOGGLUE"."GLUE_SOURCE" IS 'GLUE源代码' ; COMMENT ON COLUMN "XXL_JOB_LOGGLUE"."GLUE_REMARK" IS 'GLUE备注' ; CREATE TABLE "XXL_JOB_REGISTRY" ("ID" NUMBER(20 ,0 ) NOT NULL , "REGISTRY_GROUP" VARCHAR2(510 ) NOT NULL , "REGISTRY_KEY" VARCHAR2(510 ) NOT NULL , "REGISTRY_VALUE" VARCHAR2(510 ) NOT NULL , "UPDATE_TIME" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; CREATE INDEX "I_G_K_V" ON "XXL_JOB_REGISTRY" ("REGISTRY_GROUP", "REGISTRY_KEY", "REGISTRY_VALUE");CREATE TABLE "XXL_JOB_GROUP" ("ID" NUMBER(20 ,0 ) NOT NULL , "APP_NAME" VARCHAR2(128 ) NOT NULL , "TITLE" VARCHAR2(64 ) NOT NULL , "ORDER" NUMBER(11 ,0 ) DEFAULT 0 NOT NULL , "ADDRESS_TYPE" NUMBER(4 ,0 ) DEFAULT 0 NOT NULL , "ADDRESS_LIST" VARCHAR2(512 ) DEFAULT NULL NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; COMMENT ON COLUMN "XXL_JOB_GROUP"."APP_NAME" IS '执行器AppName' ; COMMENT ON COLUMN "XXL_JOB_GROUP"."TITLE" IS '执行器名称' ; COMMENT ON COLUMN "XXL_JOB_GROUP"."ORDER" IS '排序' ; COMMENT ON COLUMN "XXL_JOB_GROUP"."ADDRESS_TYPE" IS '执行器地址类型:0=自动注册、1=手动录入' ; COMMENT ON COLUMN "XXL_JOB_GROUP"."ADDRESS_LIST" IS '执行器地址列表,多地址逗号分隔' ; CREATE TABLE "XXL_JOB_USER" ("ID" NUMBER(20 ,0 ) NOT NULL , "USERNAME" VARCHAR2(50 ) NOT NULL , "PASSWORD" VARCHAR2(50 ) NOT NULL , "ROLE" NUMBER(4 ,0 ) NOT NULL , "PERMISSION" VARCHAR2(510 ) DEFAULT NULL NULL , PRIMARY KEY ("ID")) NOCOMPRESS NOPARALLEL ; CREATE UNIQUE INDEX "I_USERNAME" ON "XXL_JOB_USER" ("USERNAME");COMMENT ON COLUMN "XXL_JOB_USER"."USERNAME" IS '账号' ; COMMENT ON COLUMN "XXL_JOB_USER"."PASSWORD" IS '密码' ; COMMENT ON COLUMN "XXL_JOB_USER"."ROLE" IS '角色:0-普通用户、1-管理员' ; COMMENT ON COLUMN "XXL_JOB_USER"."PERMISSION" IS '权限:执行器ID列表,多个逗号分割' ; CREATE TABLE "XXL_JOB_LOCK" ("LOCK_NAME" VARCHAR2(50 ) NOT NULL , PRIMARY KEY ("LOCK_NAME")) NOCOMPRESS NOPARALLEL ; COMMENT ON COLUMN "XXL_JOB_LOCK"."LOCK_NAME" IS '锁名称' ; CREATE SEQUENCE XXL_JOB_GROUP_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_INFO_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_LOG_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_REPORT_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_LOGGLUE_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_REGISTRY_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_USER_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;CREATE SEQUENCE XXL_JOB_LOG_REPORT_SEQ MINVALUE 1 MAXVALUE 99999999999999999999 INCREMENT BY 1 START WITH 1000 NOCACHE;INSERT INTO xxl_job_group(id, app_name, title, "ORDER", address_type, address_list) VALUES (1 , 'xxl-job-executor-sample' , '示例执行器' , 1 , 0 , NULL );INSERT INTO xxl_job_info(id, job_group, job_cron, job_desc, add_time, update_time, author, alarm_email, executor_route_strategy, executor_handler, executor_param, executor_block_strategy, executor_timeout, executor_fail_retry_count, glue_type, glue_source, glue_remark, glue_updatetime, child_jobid) VALUES (1 , 1 , '0 0 0 * * ? *' , '测试任务1' , to_date('2018-11-03 22:21:31' ,'YYYY-MM-DD hh24:mi:ss' ), to_date('2018-11-03 22:21:31' ,'YYYY-MM-DD hh24:mi:ss' ), 'XXL' , '' , 'FIRST' , 'demoJobHandler' , '' , 'SERIAL_EXECUTION' , 0 , 0 , 'BEAN' , '' , 'GLUE代码初始化' , to_date('2018-11-03 22:21:31' ,'YYYY-MM-DD hh24:mi:ss' ), '' );INSERT INTO xxl_job_user(id, username, password, role, permission) VALUES (1 , 'admin' , 'e10adc3949ba59abbe56e057f20f883e' , 1 , NULL );INSERT INTO xxl_job_lock ( lock_name) VALUES ( 'schedule_lock' );commit ;
步骤2:配置Oracle数据源 在pom.xml中增加ojdbc6配置。
1 2 3 4 5 6 <dependency > <groupId > com.oracle</groupId > <artifactId > ojdbc6</artifactId > <version > 11.2.0.3</version > </dependency >
在application.properties中增加Oracle数据源配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring.datasource.url =jdbc:oracle:thin:@//localhost:1521/orcl spring.datasource.username =impo spring.datasource.password =impo123 spring.datasource.driver-class-name =oracle.jdbc.driver.OracleDriver spring.datasource.test-while-idle =true spring.datasource.test-on-borrow =false spring.datasource.test-on-return =false spring.datasource.validation-query =SELECT 1 FROM DUAL spring.datasource.time-between-eviction-runs-millis =300000 spring.datasource.min-evictable-idle-time-millis =1800000 spring.datasource.initial-size =5 spring.datasource.max-active =50 spring.datasource.max-wait =60000 spring.datasource.min-idle =5
MySQL的验证语句是SELECT 1 而Oracle的是SELECT 1 FROM DUAL 这个要记得改。
1 spring.datasource.hikari.connection-test-query =SELECT 1 FROM DUAL
步骤3:改造Mybatis的Mapper(关键点) 1、将所有 As t 替换为 t。
2、将所有`号和;号替换为空。
3、设置自动增长列返回值,需指定返回列keyColumn。
1 <insert id ="save" parameterType ="com.xxl.job.admin.core.model.XxlJobGroup" useGeneratedKeys ="true" keyColumn ="ID" keyProperty ="id" >
4、显式设置自动增长列,Oracle使用的SEQUENCE是有差异的。
1 2 INSERT INTO xxl_job_group (id, app_name, title, address_type, address_list)values (XXL_JOB_GROUP_SEQ.NEXTVAL, #{appname}, #{title}, #{addressType}, #{addressList})
注意、这里用到了序列seq
5、update改造,使用更规范的写法排除空值赋值异常,加入jdbcType。
1 2 3 4 5 6 7 UPDATE xxl_job_groupset app_name = #{appname,jdbcType= VARCHAR }, title = #{title,jdbcType= VARCHAR }, address_type = #{addressType,jdbcType= VARCHAR }, address_list = #{addressList,jdbcType= VARCHAR } WHERE id = #{id}
部分可能为null的值需要加上jdbcType,具体为在oracle驱动中,如果发现属性值为null则无法判断具体的类型需要手动指定,报错信息 无效列类型111
Mybatis JdbcType与Oracle、MySql数据类型对应列表,请参考博文:http://blog.csdn.net/loongshawn/article/details/50496460
时间字段用jdbcType=DATE,字符串用jdbcType=VARCHAR,整型用jdbcType=INTEGER。
6、分页改造,Oracle使用rownum,Mysql使用limit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <select id ="pageList" parameterType ="java.util.HashMap" resultMap ="XxlJobGroup" > SELECT * FROM ( SELECT TMP_PAGE.*, ROWNUM ROW_ID FROM ( SELECT <include refid ="Base_Column_List" /> FROM xxl_job_group t <trim prefix ="WHERE" prefixOverrides ="AND | OR" > <if test ="appname != null and appname != ''" > AND t.app_name like CONCAT(CONCAT('%', #{appname}), '%') </if > <if test ="title != null and title != ''" > AND t.title like CONCAT(CONCAT('%', #{title}), '%') </if > </trim > ORDER BY t.app_name, t.title, t.id ASC ) TMP_PAGE WHERE ROWNUM <![CDATA[ <= ]]> #{pagesize} ) WHERE ROW_ID <![CDATA[ > ]]> #{offset} </select >
这里应该是 #{pagesize} => #{pagesize}+#{offset}
7、Oracle模糊查询性能更优、更简洁的写法,使用INSTR
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <select id ="pageList" parameterType ="java.util.HashMap" resultMap ="XxlJobGroup" > SELECT * FROM ( SELECT TMP_PAGE.*, ROWNUM ROW_ID FROM ( SELECT <include refid ="Base_Column_List" /> FROM xxl_job_group t <trim prefix ="WHERE" prefixOverrides ="AND | OR" > <if test ="appname != null and appname != ''" > AND INSTR(t.app_name,#{appname}) <![CDATA[ > ]]> 0 </if > <if test ="title != null and title != ''" > AND INSTR(t.title,#{title}) <![CDATA[ > ]]> 0 </if > </trim > ORDER BY t.app_name, t.title, t.id ASC ) TMP_PAGE WHERE ROWNUM <![CDATA[ <= ]]> #{pagesize} ) WHERE ROW_ID <![CDATA[ > ]]> #{offset} </select >
8、查询写法,如果返回类型是Map时,非全大写的别名需增加""
号。
1 2 3 4 5 6 7 8 <select id ="findLogReport" resultType ="java.util.Map" > SELECT COUNT(handle_code) "triggerDayCount", SUM(CASE WHEN (trigger_code in (0, 200) and handle_code = 0) then 1 else 0 end) as "triggerDayCountRunning", SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as "triggerDayCountSuc" FROM xxl_job_log WHERE trigger_time BETWEEN #{from} and #{to} </select >
9、Mysql的!
写法要用not
替换,以及limit
用rownum
替换写法。
1 2 3 4 5 6 7 8 9 10 11 12 <select id ="findFailJobLogIds" resultType ="java.lang.Long" > SELECT id FROM( SELECT id FROM xxl_job_log WHERE not ( (trigger_code in (0, 200) and handle_code = 0) OR (handle_code = 200) ) AND alarm_status = 0 ORDER BY id ASC ) t1 where rownum <![CDATA[ <= ]]> #{pagesize} </select >
10、oracle中没有DATE_ADD()函数,此处需要改为numtodsinterval函数
1 2 3 SELECT * FROM xxl_job_registry tWHERE t.update_time < ! [CDATA[ > ]]> sysdate+ numtodsinterval( - #{timeout}, 'second' )
前端图标不展示 1 2 3 4 5 Failed to decode downloaded font: http://localhost:8080/fonts/fontawesome-webfont.woff?v=4.4.0 OTS parsing error: incorrect file size in WOFF header Failed to decode downloaded font: http://localhost:8080/fonts/fontawesome-webfont.ttf?v=4.4.0 OTS parsing error: incorrect entrySelector for table directory
解决方式,pom文件中增加如下内容,不对字体文件进行过滤即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-resources-plugin</artifactId > <configuration > <nonFilteredFileExtensions > <nonFilteredFileExtension > ttf</nonFilteredFileExtension > <nonFilteredFileExtension > woff</nonFilteredFileExtension > <nonFilteredFileExtension > woff2</nonFilteredFileExtension > </nonFilteredFileExtensions > </configuration > </plugin > </plugins > </build >
最后不要忘记重启项目!!清空浏览器缓存!!
新增执行器保存错误 值太大,实际21,最大18
数据库字段长度是18,使用的是UTF-8,3个字节,应该转换一下。18/2*3 = 27