Commit 5383e3f4 by yuwei

项目初始化

parent d3ed5ad9
......@@ -31,7 +31,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -77,7 +76,7 @@ public class ModelServiceImpl extends BaseServiceImpl<ModelDao, ModelEntity> imp
public ModelEntity saveModel(ModelDto modelDto) {
ModelEntity model = modelMapstruct.toEntity(modelDto);
model.setIsSync(DataConstant.TrueOrFalse.FALSE.getKey());
model.setModelPhysicalTable("dynamic_" + DateUtil.format(new Date(), DatePattern.PURE_DATETIME_PATTERN));
model.setModelPhysicalTable("dynamic_" + DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN));
modelDao.insert(model);
String modelId = model.getId();
List<ModelColumnEntity> modelColumns = model.getModelColumns();
......
......@@ -15,6 +15,7 @@ public class SchedulingConfig {
taskScheduler.setPoolSize(5);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
taskScheduler.initialize();
return taskScheduler;
}
}
......@@ -3,8 +3,8 @@ package cn.datax.service.data.quality.config;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.service.ScheduleJobService;
import cn.datax.service.data.quality.task.CronTaskRegistrar;
import cn.datax.service.data.quality.task.SchedulingRunnable;
import cn.datax.service.data.quality.schedule.CronTaskRegistrar;
import cn.datax.service.data.quality.schedule.SchedulingRunnable;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
......
......@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
*/
@Api(tags = {"数据质量监控任务信息表"})
@RestController
@RequestMapping("/quality/scheduleJob")
@RequestMapping("/scheduleJobs")
public class ScheduleJobController extends BaseController {
@Autowired
......@@ -73,4 +73,30 @@ public class ScheduleJobController extends BaseController {
JsonPage<ScheduleJobVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
/**
* 暂停任务
* @param id
* @return
*/
@ApiOperation(value = "暂停任务", notes = "根据url的id来暂停指定任务")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@PostMapping("/pause/{id}")
public R pauseScheduleJobById(@PathVariable("id") String id) {
scheduleJobService.pauseScheduleJobById(id);
return R.ok();
}
/**
* 恢复任务
* @param id
* @return
*/
@ApiOperation(value = "恢复任务", notes = "根据url的id来恢复指定任务")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@PostMapping("/resume/{id}")
public R resumeScheduleJobById(@PathVariable("id") String id) {
scheduleJobService.resumeScheduleJobById(id);
return R.ok();
}
}
package cn.datax.service.data.quality.task;
package cn.datax.service.data.quality.schedule;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
......
package cn.datax.service.data.quality.task;
package cn.datax.service.data.quality.schedule;
import cn.datax.common.utils.SpringContextHolder;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
......@@ -31,20 +37,18 @@ public class SchedulingRunnable implements Runnable {
public void run() {
log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
long startTime = System.currentTimeMillis();
Map map = new HashMap();
String batch;
try {
Object target = SpringContextHolder.getBean(beanName);
Method method = null;
Method method = target.getClass().getDeclaredMethod(methodName, Map.class);
if (StrUtil.isNotEmpty(params)) {
method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
method = target.getClass().getDeclaredMethod(methodName);
map = new ObjectMapper().readValue(params, Map.class);
}
batch = DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN);
map.put("batch", batch);
ReflectionUtils.makeAccessible(method);
if (StrUtil.isNotEmpty(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
method.invoke(target, map);
} catch (Exception ex) {
log.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex);
}
......@@ -76,7 +80,6 @@ public class SchedulingRunnable implements Runnable {
if (params == null) {
return Objects.hash(beanName, methodName);
}
return Objects.hash(beanName, methodName, params);
}
}
package cn.datax.service.data.quality.schedule.task;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.service.CheckRuleService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component("qualityTask")
public class QualityTask {
@Autowired
private CheckRuleService checkRuleService;
public void task(Map map) {
System.out.println("执行批次:" + map);
// 获取可执行的核查规则
List<CheckRuleEntity> list = checkRuleService.list(Wrappers.<CheckRuleEntity>lambdaQuery().eq(CheckRuleEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
int poolSize = list.size();
// 定义固定长度的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(16),
new BasicThreadFactory.Builder().namingPattern("executor-schedule-pool-%d").daemon(true).build());
// 定义计数器
final CountDownLatch latch = new CountDownLatch(poolSize);
list.stream().forEach(s -> {
threadPoolExecutor.execute(() -> {
log.info(s.getRuleName() + ":" + LocalDateTime.now());
latch.countDown();
});
});
// 主线程阻塞,等待所有子线程执行完成
try {
latch.await();
} catch (InterruptedException e) {}
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
......@@ -14,4 +14,8 @@ import cn.datax.common.base.BaseService;
public interface ScheduleJobService extends BaseService<ScheduleJobEntity> {
ScheduleJobEntity getScheduleJobById(String id);
void pauseScheduleJobById(String id);
void resumeScheduleJobById(String id);
}
package cn.datax.service.data.quality.service.impl;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.schedule.CronTaskRegistrar;
import cn.datax.service.data.quality.schedule.SchedulingRunnable;
import cn.datax.service.data.quality.service.ScheduleJobService;
import cn.datax.service.data.quality.mapstruct.ScheduleJobMapper;
import cn.datax.service.data.quality.dao.ScheduleJobDao;
......@@ -28,9 +31,30 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJobDao, Sche
@Autowired
private ScheduleJobMapper scheduleJobMapper;
@Autowired
private CronTaskRegistrar cronTaskRegistrar;
@Override
public ScheduleJobEntity getScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
return scheduleJobEntity;
}
@Override
public void pauseScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
cronTaskRegistrar.removeCronTask(task);
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.FALSE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
}
@Override
public void resumeScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
cronTaskRegistrar.addCronTask(task, scheduleJobEntity.getCronExpression());
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.TRUE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
}
}
package cn.datax.service.data.quality.task;
import org.springframework.stereotype.Component;
@Component("myTask")
public class MyTask {
public void taskWithParams(String params) {
System.out.println("执行有参示例任务:" + params);
}
public void taskNoParams() {
System.out.println("执行无参示例任务");
}
}
......@@ -21,7 +21,7 @@ public class ScheduleJob extends QuartzJobBean {
@Autowired
private QrtzJobLogService qrtzJobLogService;
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16));
@Override
protected void executeInternal(JobExecutionContext context) {
......
import request from '@/utils/request'
export function pageCheckJob(data) {
return request({
url: '/data/quality/scheduleJobs/page',
method: 'get',
params: data
})
}
export function pauseCheckJob(id) {
return request({
url: '/data/quality/scheduleJobs/pause/' + id,
method: 'post'
})
}
export function resumeCheckJob(id) {
return request({
url: '/data/quality/scheduleJobs/resume/' + id,
method: 'post'
})
}
<template>
<div class="top-nav">
<div class="log">数据治理</div>
<div class="log">智数通</div>
<el-menu
:active-text-color="variables.menuActiveText"
:default-active="activeMenu"
......
<template>
<div class="app-container">
DataBlood
<el-card class="box-card" shadow="always">
<el-row>
<el-col :span="24">
<el-form ref="queryForm" :model="queryParams" :inline="true" class="demo-form-inline">
<el-form-item label="数据表名">
<el-input
v-model="queryParams.tableName"
placeholder="请输入数据表名"
clearable
size="small"
/>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="handleQuery">查询</el-button>
</el-form-item>
</el-form>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<el-table
:data="tableDataList"
stripe
border
:max-height="200"
style="width: 100%; margin: 15px 0;"
>
<el-table-column prop="subjectArea" label="数据主题域" align="center" show-overflow-tooltip />
<el-table-column prop="mappingName" label="映射名称" align="center" show-overflow-tooltip />
<el-table-column prop="sourceTable" label="源表" align="center" show-overflow-tooltip />
<el-table-column prop="targetTable" label="目标表" align="center" show-overflow-tooltip />
</el-table>
</el-col>
</el-row>
<el-row>
<el-col :span="24">
<div id="chart" style="width: 100%; height: 300px;" />
</el-col>
</el-row>
</el-card>
</div>
</template>
<script>
import echarts from 'echarts'
export default {
name: 'DataBlood'
name: 'DataBlood',
data: function() {
return {
queryParams: {
tableName: ''
},
chart: null,
tableDataList: []
}
},
created() {
},
mounted() {
this.chart = echarts.init(document.getElementById('chart'))
},
beforeDestroy() {
if (!this.chart) {
return false
}
this.chart.dispose()
this.chart = null
},
methods: {
handleQuery() {
this.tableDataList = [
{ subjectArea: 'DataCenter', mappingName: 'm_ts_test_table_inc', sourceTable: 'src_test_table', targetTable: 'ts_test_table' },
{ subjectArea: 'DataCenter', mappingName: 'm_ts_test_table_inc', sourceTable: 'ts_test_table', targetTable: 'th_test_table' },
{ subjectArea: 'DataCenter', mappingName: 'm_ts_test_table_inc', sourceTable: 'ts_test_table', targetTable: 'ti_test_table' },
{ subjectArea: 'DataCenter', mappingName: 'm_ods_test_table_inc', sourceTable: 'ti_test_table', targetTable: 't_test_table' }
]
let data = { nodes: [], links: [] }
const nodes = []
const links = []
const colors = ['#fbb4ae', '#b3cde3', '#ccebc5', '#decbe4']
this.tableDataList.forEach(item => {
nodes.push({
name: item.sourceTable,
itemStyle: {
normal: {
color: colors[Math.floor(Math.random() * colors.length)]
}
}
})
nodes.push({
name: item.targetTable,
itemStyle: {
normal: {
color: colors[Math.floor(Math.random() * colors.length)]
}
}
})
links.push({
source: item.sourceTable,
target: item.targetTable,
value: item.mappingName.length,
mapping: item.mappingName
})
})
// nodes数组去重
const res = new Map()
const nodes_uniq = nodes.filter((node) => !res.has(node.name) && res.set(node.name, 1))
data = {
nodes: nodes_uniq,
links: links
}
this.chart.clear()
this.chart.setOption({
title: {
text: '血缘流向'
},
tooltip: {
trigger: 'item',
triggerOn: 'mousemove',
formatter: function(x) {
return x.data.mapping
}
},
animation: false,
series: [
{
type: 'sankey',
focusNodeAdjacency: 'allEdges',
nodeAlign: 'left',
data: data.nodes,
links: data.links,
lineStyle: {
color: 'source',
curveness: 0.5
}
}
]
})
}
}
}
</script>
<style lang="scss" scoped>
.el-card ::v-deep .el-card__body {
height: calc(100vh - 170px);
}
</style>
<template>
<el-card class="box-card" shadow="always">
<el-table
v-loading="loading"
:data="tableDataList"
border
tooltip-effect="dark"
:height="tableHeight"
style="width: 100%;margin: 15px 0;"
>
<el-table-column type="selection" width="55" align="center" />
<el-table-column label="序号" width="55" align="center">
<template slot-scope="scope">
<span>{{ scope.$index +1 }}</span>
</template>
</el-table-column>
<template v-for="(item, index) in tableColumns">
<el-table-column
v-if="item.show"
:key="index"
:prop="item.prop"
:label="item.label"
:formatter="item.formatter"
align="center"
show-overflow-tooltip
/>
</template>
<el-table-column label="操作" align="center" class-name="small-padding fixed-width">
<template slot-scope="scope">
<el-popover
placement="left"
trigger="click"
>
<el-button
:disabled="scope.row.status === '0'"
size="mini"
type="text"
icon="el-icon-delete"
@click="handlePause(scope.row)"
>暂停任务</el-button>
<el-button
:disabled="scope.row.status === '1'"
size="mini"
type="text"
icon="el-icon-delete"
@click="resumePause(scope.row)"
>恢复任务</el-button>
<el-button slot="reference">操作</el-button>
</el-popover>
</template>
</el-table-column>
</el-table>
<el-pagination
:page-sizes="[10, 20, 50, 100]"
layout="total, sizes, prev, pager, next, jumper"
:current-page.sync="queryParams.pageNum"
:page-size.sync="queryParams.pageSize"
:total="total"
@size-change="handleSizeChange"
@current-change="handleCurrentChange"
/>
</el-card>
</template>
<script>
import { pageCheckJob, pauseCheckJob, resumeCheckJob } from '@/api/quality/checkjob'
export default {
name: 'CheckJobList',
data() {
return {
tableHeight: document.body.offsetHeight - 310 + 'px',
// 展示切换
showOptions: {
data: {},
showList: true
},
// 遮罩层
loading: true,
// 表格头
tableColumns: [
{ prop: 'jobName', label: '任务名称', show: true },
{ prop: 'beanName', label: 'bean名称', show: true },
{ prop: 'methodName', label: '方法名称', show: true },
{ prop: 'methodParams', label: '方法参数', show: true },
{ prop: 'cronExpression', label: 'cron表达式', show: true },
{
prop: 'status',
label: '状态',
show: true,
formatter: this.statusFormatter
}
],
// 状态数据字典
statusOptions: [],
// 数据集表格数据
tableDataList: [],
// 总数据条数
total: 0,
// 查询参数
queryParams: {
pageNum: 1,
pageSize: 20
}
}
},
created() {
this.getDicts('sys_job_status').then(response => {
if (response.success) {
this.statusOptions = response.data
}
})
this.getList()
},
methods: {
/** 查询数据集列表 */
getList() {
this.loading = true
pageCheckJob(this.queryParams).then(response => {
this.loading = false
if (response.success) {
const { data } = response
this.tableDataList = data.data
this.total = data.total
}
})
},
handlePause(row) {
this.$confirm('是否暂停该任务', '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning'
}).then(() => {
pauseCheckJob(row.id).then(response => {
if (response.success) {
this.$message.success('任务暂停成功')
this.getList()
}
})
}).catch(() => {
})
},
resumePause(row) {
this.$confirm('是否恢复该任务', '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning'
}).then(() => {
resumeCheckJob(row.id).then(response => {
if (response.success) {
this.$message.success('任务恢复成功')
this.getList()
}
})
}).catch(() => {
})
},
handleSizeChange(val) {
console.log(`每页 ${val} 条`)
this.queryParams.pageNum = 1
this.queryParams.pageSize = val
this.getList()
},
handleCurrentChange(val) {
console.log(`当前页: ${val}`)
this.queryParams.pageNum = val
this.getList()
},
statusFormatter(row, column, cellValue, index) {
const dictLabel = this.selectDictLabel(this.statusOptions, cellValue)
if (cellValue === '1') {
return <el-tag type='success'>{dictLabel}</el-tag>
} else {
return <el-tag type='warning'>{dictLabel}</el-tag>
}
}
}
}
</script>
<style lang="scss" scoped>
.right-toolbar {
float: right;
}
.el-card ::v-deep .el-card__body {
height: calc(100vh - 170px);
}
</style>
<template>
<div class="app-container">
CheckJob
<transition name="el-zoom-in-center">
<check-job-list v-if="options.showList" @showCard="showCard" />
</transition>
</div>
</template>
<script>
import CheckJobList from './CheckJobList'
export default {
name: 'CheckJob'
name: 'CheckJob',
components: { CheckJobList },
data() {
return {
options: {
data: {},
showList: true
}
}
},
methods: {
showCard(data) {
Object.assign(this.options, data)
}
}
}
</script>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment