Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
D
datax-cloud
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
黄营
datax-cloud
Commits
f1857ffe
Commit
f1857ffe
authored
May 15, 2020
by
yuwei
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
2.0.0项目初始化
parent
0c60e535
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
103 additions
and
22 deletions
+103
-22
datax-gateway-dev.yml
datax-config/src/main/resources/config/datax-gateway-dev.yml
+13
-0
datax-service-quartz-dev.yml
...ig/src/main/resources/config/datax-service-quartz-dev.yml
+7
-6
StartedUpRunner.java
.../java/cn/datax/service/quartz/config/StartedUpRunner.java
+12
-0
ScheduleUtil.java
...va/cn/datax/service/quartz/quartz/utils/ScheduleUtil.java
+71
-16
No files found.
datax-config/src/main/resources/config/datax-gateway-dev.yml
View file @
f1857ffe
...
@@ -107,4 +107,16 @@ spring:
...
@@ -107,4 +107,16 @@ spring:
-
name
:
Hystrix
-
name
:
Hystrix
args
:
args
:
name
:
dataSqlConsoleHystrix
name
:
dataSqlConsoleHystrix
fallbackUri
:
forward:/fallback
# 定时任务
-
id
:
datax-service-quartz
uri
:
lb://datax-service-quartz
predicates
:
-
Path=/quartz/**
filters
:
-
SwaggerHeaderFilter
-
StripPrefix=1
-
name
:
Hystrix
args
:
name
:
quartzHystrix
fallbackUri
:
forward:/fallback
fallbackUri
:
forward:/fallback
\ No newline at end of file
datax-config/src/main/resources/config/datax-service-quartz-dev.yml
View file @
f1857ffe
...
@@ -2,25 +2,25 @@
...
@@ -2,25 +2,25 @@
spring
:
spring
:
redis
:
redis
:
database
:
1
database
:
1
host
:
1
92.168.234.10
1
host
:
1
27.0.0.
1
port
:
6379
port
:
6379
password
:
1234@abcd
# 密码(默认为空)
password
:
# 密码(默认为空)
timeout
:
6000ms
# 连接超时时长(毫秒)
timeout
:
6000ms
# 连接超时时长(毫秒)
lettuce
:
lettuce
:
pool
:
pool
:
max-active
:
1000
# 连接池最大连接数(使用负值表示没有限制)
max-active
:
1000
# 连接池最大连接数(使用负值表示没有限制)
max-wait
:
-1ms
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait
:
-1ms
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle
:
10
# 连接池中的最大空闲连接
max-idle
:
10
# 连接池中的最大空闲连接
min-idle
:
5
# 连接池中的最小空闲连接
min-idle
:
5
# 连接池中的最小空闲连接
datasource
:
datasource
:
mysql
:
mysql
:
driver-class-name
:
com.p6spy.engine.spy.P6SpyDriver
driver-class-name
:
com.p6spy.engine.spy.P6SpyDriver
url
:
jdbc:p6spy:mysql://1
92.168.234.100
:3306/data_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
url
:
jdbc:p6spy:mysql://1
27.0.0.1
:3306/data_cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username
:
root
username
:
root
password
:
1234@abcd
password
:
1234@abcd
quartz
:
quartz
:
driver-class-name
:
com.mysql.cj.jdbc.Driver
driver-class-name
:
com.mysql.cj.jdbc.Driver
url
:
jdbc:mysql://1
92.168.234.100
:3306/data_cloud_quartz?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
url
:
jdbc:mysql://1
27.0.0.1
:3306/data_cloud_quartz?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username
:
root
username
:
root
password
:
1234@abcd
password
:
1234@abcd
type
:
com.zaxxer.hikari.HikariDataSource
type
:
com.zaxxer.hikari.HikariDataSource
...
@@ -69,6 +69,7 @@ spring:
...
@@ -69,6 +69,7 @@ spring:
isClustered
:
true
isClustered
:
true
clusterCheckinInterval
:
1000
clusterCheckinInterval
:
1000
useProperties
:
false
useProperties
:
false
misfireThreshold
:
5000
# 设置临界时间为5秒
threadPool
:
threadPool
:
class
:
org.quartz.simpl.SimpleThreadPool
class
:
org.quartz.simpl.SimpleThreadPool
threadCount
:
20
threadCount
:
20
...
...
datax-modules/quartz-service-parent/quartz-service/src/main/java/cn/datax/service/quartz/config/StartedUpRunner.java
View file @
f1857ffe
package
cn
.
datax
.
service
.
quartz
.
config
;
package
cn
.
datax
.
service
.
quartz
.
config
;
import
cn.datax.service.quartz.api.entity.QrtzJobEntity
;
import
cn.datax.service.quartz.quartz.utils.ScheduleUtil
;
import
cn.datax.service.quartz.service.QrtzJobService
;
import
lombok.RequiredArgsConstructor
;
import
lombok.RequiredArgsConstructor
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.ApplicationArguments
;
import
org.springframework.boot.ApplicationArguments
;
import
org.springframework.boot.ApplicationRunner
;
import
org.springframework.boot.ApplicationRunner
;
import
org.springframework.context.ConfigurableApplicationContext
;
import
org.springframework.context.ConfigurableApplicationContext
;
...
@@ -9,6 +13,7 @@ import org.springframework.stereotype.Component;
...
@@ -9,6 +13,7 @@ import org.springframework.stereotype.Component;
import
java.time.LocalDateTime
;
import
java.time.LocalDateTime
;
import
java.time.format.DateTimeFormatter
;
import
java.time.format.DateTimeFormatter
;
import
java.util.List
;
@Component
@Component
@RequiredArgsConstructor
@RequiredArgsConstructor
...
@@ -17,6 +22,9 @@ public class StartedUpRunner implements ApplicationRunner {
...
@@ -17,6 +22,9 @@ public class StartedUpRunner implements ApplicationRunner {
private
final
ConfigurableApplicationContext
context
;
private
final
ConfigurableApplicationContext
context
;
private
final
Environment
environment
;
private
final
Environment
environment
;
@Autowired
private
QrtzJobService
qrtzJobService
;
@Override
@Override
public
void
run
(
ApplicationArguments
args
)
{
public
void
run
(
ApplicationArguments
args
)
{
if
(
context
.
isActive
())
{
if
(
context
.
isActive
())
{
...
@@ -26,6 +34,10 @@ public class StartedUpRunner implements ApplicationRunner {
...
@@ -26,6 +34,10 @@ public class StartedUpRunner implements ApplicationRunner {
"端口号:"
+
environment
.
getProperty
(
"server.port"
)
+
"\n"
+
"端口号:"
+
environment
.
getProperty
(
"server.port"
)
+
"\n"
+
"-----------------------------------------"
;
"-----------------------------------------"
;
System
.
out
.
println
(
banner
);
System
.
out
.
println
(
banner
);
// 项目启动时,初始化定时器
List
<
QrtzJobEntity
>
list
=
qrtzJobService
.
list
();
ScheduleUtil
.
init
(
list
);
}
}
}
}
}
}
datax-modules/quartz-service-parent/quartz-service/src/main/java/cn/datax/service/quartz/quartz/utils/ScheduleUtil.java
View file @
f1857ffe
...
@@ -2,8 +2,11 @@ package cn.datax.service.quartz.quartz.utils;
...
@@ -2,8 +2,11 @@ package cn.datax.service.quartz.quartz.utils;
import
cn.datax.common.core.DataConstant
;
import
cn.datax.common.core.DataConstant
;
import
cn.datax.common.exception.DataException
;
import
cn.datax.common.exception.DataException
;
import
cn.datax.common.utils.ThrowableUtil
;
import
cn.datax.service.quartz.api.entity.QrtzJobEntity
;
import
cn.datax.service.quartz.api.entity.QrtzJobEntity
;
import
cn.datax.service.quartz.quartz.ScheduleJob
;
import
cn.datax.service.quartz.quartz.ScheduleJob
;
import
cn.hutool.core.collection.CollUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.quartz.CronScheduleBuilder
;
import
org.quartz.CronScheduleBuilder
;
import
org.quartz.CronTrigger
;
import
org.quartz.CronTrigger
;
import
org.quartz.JobBuilder
;
import
org.quartz.JobBuilder
;
...
@@ -14,7 +17,12 @@ import org.quartz.Scheduler;
...
@@ -14,7 +17,12 @@ import org.quartz.Scheduler;
import
org.quartz.SchedulerException
;
import
org.quartz.SchedulerException
;
import
org.quartz.TriggerBuilder
;
import
org.quartz.TriggerBuilder
;
import
org.quartz.TriggerKey
;
import
org.quartz.TriggerKey
;
import
org.quartz.impl.triggers.CronTriggerImpl
;
import
java.util.Date
;
import
java.util.List
;
@Slf4j
public
class
ScheduleUtil
{
public
class
ScheduleUtil
{
public
static
final
String
QUARTZ_TASK_KEY
=
"__QUARTZ_TASK_KEY__"
;
public
static
final
String
QUARTZ_TASK_KEY
=
"__QUARTZ_TASK_KEY__"
;
...
@@ -34,25 +42,32 @@ public class ScheduleUtil {
...
@@ -34,25 +42,32 @@ public class ScheduleUtil {
return
JobKey
.
jobKey
(
QUARTZ_TASK_KEY
+
jobId
);
return
JobKey
.
jobKey
(
QUARTZ_TASK_KEY
+
jobId
);
}
}
/**
* 获取触发器key
*/
public
static
TriggerKey
getTriggerKey
(
String
jobId
)
{
return
TriggerKey
.
triggerKey
(
QUARTZ_TASK_KEY
+
jobId
);
}
/**
/**
* 获取表达式触发器
* 获取表达式触发器
*/
*/
public
static
CronTrigger
getCronTrigger
(
String
jobId
)
{
public
static
CronTrigger
getCronTrigger
(
QrtzJobEntity
job
)
{
try
{
try
{
return
(
CronTrigger
)
scheduler
.
getTrigger
(
getTriggerKey
(
jobId
));
TriggerKey
triggerKey
=
getTriggerKey
(
job
.
getId
());
CronTrigger
trigger
=
(
CronTrigger
)
scheduler
.
getTrigger
(
triggerKey
);
// 如果不存在则创建一个定时任务
if
(
null
==
trigger
){
createScheduleJob
(
job
);
trigger
=
(
CronTrigger
)
scheduler
.
getTrigger
(
triggerKey
);
}
return
trigger
;
}
catch
(
SchedulerException
e
)
{
}
catch
(
SchedulerException
e
)
{
throw
new
DataException
(
"获取定时任务CronTrigger出现异常"
,
e
);
throw
new
DataException
(
"获取定时任务CronTrigger出现异常"
,
e
);
}
}
}
}
/**
/**
* 获取触发器key
*/
public
static
TriggerKey
getTriggerKey
(
String
jobId
)
{
return
TriggerKey
.
triggerKey
(
QUARTZ_TASK_KEY
+
jobId
);
}
/**
* 创建定时任务
* 创建定时任务
*/
*/
public
static
void
createScheduleJob
(
QrtzJobEntity
job
)
{
public
static
void
createScheduleJob
(
QrtzJobEntity
job
)
{
...
@@ -62,9 +77,23 @@ public class ScheduleUtil {
...
@@ -62,9 +77,23 @@ public class ScheduleUtil {
// 表达式调度构建器
// 表达式调度构建器
CronScheduleBuilder
cronScheduleBuilder
=
CronScheduleBuilder
.
cronSchedule
(
job
.
getCronExpression
());
CronScheduleBuilder
cronScheduleBuilder
=
CronScheduleBuilder
.
cronSchedule
(
job
.
getCronExpression
());
// 按新的cronExpression表达式构建一个新的trigger
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger
trigger
=
TriggerBuilder
.
newTrigger
().
withIdentity
(
getTriggerKey
(
job
.
getId
())).
withSchedule
(
cronScheduleBuilder
).
build
();
CronTrigger
trigger
=
TriggerBuilder
.
newTrigger
().
withIdentity
(
getTriggerKey
(
job
.
getId
()))
//withMisfireHandlingInstructionDoNothing
//——不触发立即执行
//——等待下次Cron触发频率到达时刻开始按照Cron频率依次执行
//withMisfireHandlingInstructionIgnoreMisfires
//——以错过的第一个频率时间立刻开始执行
//——重做错过的所有频率周期后
//——当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
//withMisfireHandlingInstructionFireAndProceed
//——以当前时间为触发频率立刻触发一次执行
//——然后按照Cron频率依次执行
.
withSchedule
(
cronScheduleBuilder
.
withMisfireHandlingInstructionDoNothing
())
.
build
();
// 放入参数,运行时的方法可以获取
// 放入参数,运行时的方法可以获取
jobDetail
.
getJobDataMap
().
put
(
JOB_DATA_MAP
,
job
);
jobDetail
.
getJobDataMap
().
put
(
JOB_DATA_MAP
,
job
);
// 重置启动时间
((
CronTriggerImpl
)
trigger
).
setStartTime
(
new
Date
());
// 交给scheduler去调度
// 交给scheduler去调度
scheduler
.
scheduleJob
(
jobDetail
,
trigger
);
scheduler
.
scheduleJob
(
jobDetail
,
trigger
);
// 暂停任务
// 暂停任务
...
@@ -82,13 +111,17 @@ public class ScheduleUtil {
...
@@ -82,13 +111,17 @@ public class ScheduleUtil {
public
static
void
updateScheduleJob
(
QrtzJobEntity
job
)
{
public
static
void
updateScheduleJob
(
QrtzJobEntity
job
)
{
try
{
try
{
TriggerKey
triggerKey
=
getTriggerKey
(
job
.
getId
());
TriggerKey
triggerKey
=
getTriggerKey
(
job
.
getId
());
CronTrigger
trigger
=
getCronTrigger
(
job
);
// 表达式调度构建器
// 表达式调度构建器
CronScheduleBuilder
cronScheduleBuilder
=
CronScheduleBuilder
.
cronSchedule
(
job
.
getCronExpression
());
CronScheduleBuilder
cronScheduleBuilder
=
CronScheduleBuilder
.
cronSchedule
(
job
.
getCronExpression
());
CronTrigger
trigger
=
getCronTrigger
(
job
.
getId
());
// 按新的cronExpression表达式重新构建trigger
// 按新的cronExpression表达式重新构建trigger
trigger
=
trigger
.
getTriggerBuilder
().
withIdentity
(
triggerKey
).
withSchedule
(
cronScheduleBuilder
).
build
();
trigger
=
trigger
.
getTriggerBuilder
().
withIdentity
(
triggerKey
)
.
withSchedule
(
cronScheduleBuilder
.
withMisfireHandlingInstructionDoNothing
())
.
build
();
// 参数
// 参数
trigger
.
getJobDataMap
().
put
(
JOB_DATA_MAP
,
job
);
trigger
.
getJobDataMap
().
put
(
JOB_DATA_MAP
,
job
);
// 重置启动时间
((
CronTriggerImpl
)
trigger
).
setStartTime
(
new
Date
());
scheduler
.
rescheduleJob
(
triggerKey
,
trigger
);
scheduler
.
rescheduleJob
(
triggerKey
,
trigger
);
// 暂停任务
// 暂停任务
if
(
job
.
getStatus
()
==
DataConstant
.
EnableState
.
DISABLE
.
getKey
())
{
if
(
job
.
getStatus
()
==
DataConstant
.
EnableState
.
DISABLE
.
getKey
())
{
...
@@ -139,17 +172,38 @@ public class ScheduleUtil {
...
@@ -139,17 +172,38 @@ public class ScheduleUtil {
*/
*/
public
static
void
deleteJob
(
String
id
)
{
public
static
void
deleteJob
(
String
id
)
{
try
{
try
{
TriggerKey
triggerKey
=
getTriggerKey
(
id
);
TriggerKey
triggerKey
=
getTriggerKey
(
id
);
// 停止触发器
// 停止触发器
scheduler
.
pauseTrigger
(
triggerKey
);
scheduler
.
pauseTrigger
(
triggerKey
);
// 移除触发器
// 移除触发器
scheduler
.
unscheduleJob
(
triggerKey
);
scheduler
.
unscheduleJob
(
triggerKey
);
// 暂停任务
scheduler
.
pauseJob
(
getJobKey
(
id
));
// 删除任务
// 删除任务
scheduler
.
deleteJob
(
getJobKey
(
id
));
scheduler
.
deleteJob
(
getJobKey
(
id
));
}
catch
(
SchedulerException
e
)
{
}
catch
(
SchedulerException
e
)
{
throw
new
DataException
(
"删除定时任务失败"
,
e
);
throw
new
DataException
(
"删除定时任务失败"
,
e
);
}
}
}
}
/**
* 项目启动时,初始化定时器
*/
public
static
void
init
(
List
<
QrtzJobEntity
>
list
)
{
if
(
CollUtil
.
isNotEmpty
(
list
)){
for
(
QrtzJobEntity
job
:
list
)
{
TriggerKey
triggerKey
=
getTriggerKey
(
job
.
getId
());
CronTrigger
trigger
=
null
;
try
{
trigger
=
(
CronTrigger
)
scheduler
.
getTrigger
(
triggerKey
);
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"全局异常信息ex={}, StackTrace={}"
,
e
.
getMessage
(),
ThrowableUtil
.
getStackTrace
(
e
));
}
// 如果不存在则创建一个定时任务
if
(
null
==
trigger
)
{
ScheduleUtil
.
createScheduleJob
(
job
);
}
else
{
ScheduleUtil
.
updateScheduleJob
(
job
);
}
}
}
}
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment