Commit c1248219 by yuwei

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	datax-modules/data-market-service-parent/data-market-service-mapping/src/main/java/cn/datax/service/data/market/mapping/config/RabbitMqListenerConfig.java
parents eae9c125 f2f67696
...@@ -25,5 +25,5 @@ public class RabbitMqConstant { ...@@ -25,5 +25,5 @@ public class RabbitMqConstant {
/** /**
* TOPIC类型的路由键:工作流 {}占位符替换 * TOPIC类型的路由键:工作流 {}占位符替换
*/ */
public static final String TOPIC_WORKFLOW_KEY = "topic.workflow.key.{}"; public static final String TOPIC_WORKFLOW_KEY = "topic.workflow.key.";
} }
...@@ -12,6 +12,16 @@ spring: ...@@ -12,6 +12,16 @@ spring:
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 # 连接池中的最大空闲连接 max-idle: 10 # 连接池中的最大空闲连接
min-idle: 5 # 连接池中的最小空闲连接 min-idle: 5 # 连接池中的最小空闲连接
rabbitmq:
host: localhost
port: 5672
username: admin
password: 1234@abcd
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
datasource: datasource:
dynamic: dynamic:
primary: mysql primary: mysql
......
...@@ -12,6 +12,15 @@ spring: ...@@ -12,6 +12,15 @@ spring:
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 # 连接池中的最大空闲连接 max-idle: 10 # 连接池中的最大空闲连接
min-idle: 5 # 连接池中的最小空闲连接 min-idle: 5 # 连接池中的最小空闲连接
rabbitmq:
host: localhost
port: 5672
username: admin
password: 1234@abcd
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
datasource: datasource:
dynamic: dynamic:
primary: mysql primary: mysql
......
...@@ -30,7 +30,6 @@ public class RabbitMqListenerConfig { ...@@ -30,7 +30,6 @@ public class RabbitMqListenerConfig {
@Autowired @Autowired
private MappingHandlerMapping mappingHandlerMapping; private MappingHandlerMapping mappingHandlerMapping;
/** /**
* api发布与撤销 * api发布与撤销
* @param map type 1:发布 2:撤销 * @param map type 1:发布 2:撤销
...@@ -65,7 +64,7 @@ public class RabbitMqListenerConfig { ...@@ -65,7 +64,7 @@ public class RabbitMqListenerConfig {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else { }else {
//记录日志 //记录日志
log.error("消息消费失败处理:{}",e.getMessage()); log.error("消息消费失败处理:{}", e.getMessage());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列 //第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} }
......
package cn.datax.service.data.market.mapping.service;
public interface QueueHandlerService {
void handlerRelease(String id);
void handlerCancel(String id);
}
package cn.datax.service.data.market.mapping.service.impl;
import cn.datax.service.data.market.api.entity.DataApiEntity;
import cn.datax.service.data.market.api.feign.DataApiServiceFeign;
import cn.datax.service.data.market.mapping.handler.MappingHandlerMapping;
import cn.datax.service.data.market.mapping.service.QueueHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class QueueHandlerServiceImpl implements QueueHandlerService {
@Autowired
private DataApiServiceFeign dataApiServiceFeign;
@Autowired
private MappingHandlerMapping mappingHandlerMapping;
@Override
public void handlerRelease(String id) {
DataApiEntity dataApiEntity = dataApiServiceFeign.getDataApiById(id);
if (dataApiEntity != null) {
mappingHandlerMapping.registerMapping(dataApiEntity);
}
}
@Override
public void handlerCancel(String id) {
DataApiEntity dataApiEntity = dataApiServiceFeign.getDataApiById(id);
if (dataApiEntity != null) {
mappingHandlerMapping.unregisterMapping(dataApiEntity);
}
}
}
...@@ -363,7 +363,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit ...@@ -363,7 +363,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit
@Override @Override
public void releaseDataApi(String id) { public void releaseDataApi(String id) {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>(2);
map.put("id", id); map.put("id", id);
map.put("type", "1"); map.put("type", "1");
String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", map)).orElse(""); String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", map)).orElse("");
...@@ -377,7 +377,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit ...@@ -377,7 +377,7 @@ public class DataApiServiceImpl extends BaseServiceImpl<DataApiDao, DataApiEntit
@Override @Override
public void cancelDataApi(String id) { public void cancelDataApi(String id) {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>(2);
map.put("id", id); map.put("id", id);
map.put("type", "2"); map.put("type", "2");
String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", map)).orElse(""); String obj = (String) Optional.ofNullable(rabbitTemplate.convertSendAndReceive(RabbitMqConstant.FANOUT_API_QUEUE, "", map)).orElse("");
......
...@@ -2,6 +2,9 @@ package cn.datax.service.data.masterdata.config; ...@@ -2,6 +2,9 @@ package cn.datax.service.data.masterdata.config;
import cn.datax.common.rabbitmq.config.RabbitMqConstant; import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.ThrowableUtil; import cn.datax.common.utils.ThrowableUtil;
import cn.datax.service.data.masterdata.api.entity.ModelEntity;
import cn.datax.service.data.masterdata.dao.ModelDao;
import cn.datax.service.workflow.api.enums.VariablesEnum;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
...@@ -9,29 +12,41 @@ import org.springframework.amqp.rabbit.annotation.Exchange; ...@@ -9,29 +12,41 @@ import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.Map;
@Slf4j @Slf4j
@Configuration @Configuration
public class RabbitMqListenerConfig { public class RabbitMqListenerConfig {
@Autowired
private ModelDao modelDao;
/** /**
* 消费工作流 业务编码 6011 * 消费工作流 业务编码 5011
* @param id * @param map
* @param channel * @param channel
* @param message * @param message
* @return * @return
* @throws Exception * @throws Exception
*/ */
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, type = "topic", durable = "true", autoDelete = "false"), @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, type = "topic", durable = "true", autoDelete = "false"),
key = {""}, key = { RabbitMqConstant.TOPIC_WORKFLOW_KEY + "5011" },
value = @Queue(value = RabbitMqConstant.TOPIC_WORKFLOW_QUEUE, durable = "true", exclusive = "false", autoDelete = "false"))) value = @Queue(value = RabbitMqConstant.TOPIC_WORKFLOW_QUEUE, durable = "true", exclusive = "false", autoDelete = "false")))
public String fanoutQueueRelease(String id, Channel channel, Message message) throws Exception { public void fanoutQueueRelease(Map map, Channel channel, Message message) throws Exception {
try { try {
log.info("fanoutQueueRelease接收到了:{}", id); log.info("接收到了消息:{}", map);
String businessKey = (String) map.get(VariablesEnum.businessKey.toString());
String businessCode = (String) map.get(VariablesEnum.businessCode.toString());
String flowStatus = (String) map.get("flowStatus");
ModelEntity model = new ModelEntity();
model.setId(businessKey);
model.setFlowStatus(flowStatus);
modelDao.updateById(model);
// 手动确认 // 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return id;
}catch (Exception e){ }catch (Exception e){
log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e)); log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e));
if (message.getMessageProperties().getRedelivered()){ if (message.getMessageProperties().getRedelivered()){
...@@ -40,11 +55,10 @@ public class RabbitMqListenerConfig { ...@@ -40,11 +55,10 @@ public class RabbitMqListenerConfig {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}else { }else {
//记录日志 //记录日志
log.error("消息消费失败处理:{}",e.getMessage()); log.error("消息消费失败处理:{}", e.getMessage());
//第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列 //第一个参数为消息的index,第二个参数是是否批量处理,第三个参数为是否让被拒绝的消息重新入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} }
} }
return null;
} }
} }
...@@ -67,6 +67,7 @@ public class ModelController extends BaseController { ...@@ -67,6 +67,7 @@ public class ModelController extends BaseController {
public R getModelList() { public R getModelList() {
QueryWrapper<ModelEntity> queryWrapper = new QueryWrapper<>(); QueryWrapper<ModelEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("status", DataConstant.EnableState.ENABLE.getKey()); queryWrapper.eq("status", DataConstant.EnableState.ENABLE.getKey());
queryWrapper.eq("flow_status", DataConstant.AuditState.AGREE.getKey());
List<ModelEntity> list = modelService.list(queryWrapper); List<ModelEntity> list = modelService.list(queryWrapper);
List<ModelVo> collect = list.stream().map(modelMapstruct::toVO).collect(Collectors.toList()); List<ModelVo> collect = list.stream().map(modelMapstruct::toVO).collect(Collectors.toList());
return R.ok().setData(collect); return R.ok().setData(collect);
......
...@@ -31,7 +31,6 @@ import org.springframework.transaction.annotation.Propagation; ...@@ -31,7 +31,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -70,14 +69,14 @@ public class ModelServiceImpl extends BaseServiceImpl<ModelDao, ModelEntity> imp ...@@ -70,14 +69,14 @@ public class ModelServiceImpl extends BaseServiceImpl<ModelDao, ModelEntity> imp
private static String BIND_GB_CODE = "gb_code"; private static String BIND_GB_CODE = "gb_code";
private static String BIND_GB_NAME = "gb_name"; private static String BIND_GB_NAME = "gb_name";
private static String DEFAULT_BUSINESS_CODE = "6011"; private static String DEFAULT_BUSINESS_CODE = "5011";
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public ModelEntity saveModel(ModelDto modelDto) { public ModelEntity saveModel(ModelDto modelDto) {
ModelEntity model = modelMapstruct.toEntity(modelDto); ModelEntity model = modelMapstruct.toEntity(modelDto);
model.setIsSync(DataConstant.TrueOrFalse.FALSE.getKey()); model.setIsSync(DataConstant.TrueOrFalse.FALSE.getKey());
model.setModelPhysicalTable("dynamic_" + DateUtil.format(new Date(), DatePattern.PURE_DATETIME_PATTERN)); model.setModelPhysicalTable("dynamic_" + DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN));
modelDao.insert(model); modelDao.insert(model);
String modelId = model.getId(); String modelId = model.getId();
List<ModelColumnEntity> modelColumns = model.getModelColumns(); List<ModelColumnEntity> modelColumns = model.getModelColumns();
......
package cn.datax.service.data.metadata.api.entity; package cn.datax.service.data.metadata.api.entity;
import cn.datax.common.base.DataScopeBaseEntity; import cn.datax.common.base.DataScopeBaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
...@@ -55,12 +56,16 @@ public class MetadataChangeRecordEntity extends DataScopeBaseEntity { ...@@ -55,12 +56,16 @@ public class MetadataChangeRecordEntity extends DataScopeBaseEntity {
/** /**
* 数据源 * 数据源
*/ */
@TableField(exist = false)
private String sourceId; private String sourceId;
@TableField(exist = false)
private String sourceName; private String sourceName;
/** /**
* 数据库表 * 数据库表
*/ */
@TableField(exist = false)
private String tableId; private String tableId;
@TableField(exist = false)
private String tableName; private String tableName;
} }
...@@ -36,9 +36,9 @@ public class MetadataSourceEntity extends DataScopeBaseEntity { ...@@ -36,9 +36,9 @@ public class MetadataSourceEntity extends DataScopeBaseEntity {
private String sourceName; private String sourceName;
/** /**
* 元数据同步1是0否 * 元数据同步(0否,1是)
*/ */
private Integer sourceSync; private String isSync;
/** /**
* 数据源连接信息 * 数据源连接信息
......
...@@ -28,5 +28,5 @@ public class MetadataSourceVo implements Serializable { ...@@ -28,5 +28,5 @@ public class MetadataSourceVo implements Serializable {
private String dbType; private String dbType;
private String sourceName; private String sourceName;
private DbSchema dbSchema; private DbSchema dbSchema;
private Integer sourceSync; private String isSync;
} }
...@@ -85,7 +85,7 @@ public class AsyncTask { ...@@ -85,7 +85,7 @@ public class AsyncTask {
}); });
} }
} }
dataSource.setSourceSync(Integer.valueOf(DataConstant.TrueOrFalse.TRUE.getKey())); dataSource.setIsSync(DataConstant.TrueOrFalse.TRUE.getKey());
metadataSourceDao.updateById(dataSource); metadataSourceDao.updateById(dataSource);
log.info("异步任务执行完成!耗时{}秒", (System.currentTimeMillis() - start / 1000)); log.info("异步任务执行完成!耗时{}秒", (System.currentTimeMillis() - start / 1000));
} }
......
...@@ -84,7 +84,7 @@ public class MetadataSourceServiceImpl extends BaseServiceImpl<MetadataSourceDao ...@@ -84,7 +84,7 @@ public class MetadataSourceServiceImpl extends BaseServiceImpl<MetadataSourceDao
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void saveMetadataSource(MetadataSourceDto metadataSourceDto) { public void saveMetadataSource(MetadataSourceDto metadataSourceDto) {
MetadataSourceEntity dataSource = metadataSourceMapper.toEntity(metadataSourceDto); MetadataSourceEntity dataSource = metadataSourceMapper.toEntity(metadataSourceDto);
dataSource.setSourceSync(Integer.valueOf(DataConstant.TrueOrFalse.FALSE.getKey())); dataSource.setIsSync(DataConstant.TrueOrFalse.FALSE.getKey());
metadataSourceDao.insert(dataSource); metadataSourceDao.insert(dataSource);
} }
...@@ -158,7 +158,7 @@ public class MetadataSourceServiceImpl extends BaseServiceImpl<MetadataSourceDao ...@@ -158,7 +158,7 @@ public class MetadataSourceServiceImpl extends BaseServiceImpl<MetadataSourceDao
@Override @Override
public void syncMetadata(String id) { public void syncMetadata(String id) {
MetadataSourceEntity metadataSourceEntity = super.getById(id); MetadataSourceEntity metadataSourceEntity = super.getById(id);
if (Integer.valueOf(DataConstant.TrueOrFalse.TRUE.getKey()).equals(metadataSourceEntity.getSourceSync())) { if (DataConstant.TrueOrFalse.TRUE.getKey().equals(metadataSourceEntity.getIsSync())) {
throw new DataException("元数据已同步"); throw new DataException("元数据已同步");
} }
// 异步执行同步任务 // 异步执行同步任务
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<result column="remark" property="remark" /> <result column="remark" property="remark" />
<result column="db_type" property="dbType" /> <result column="db_type" property="dbType" />
<result column="source_name" property="sourceName" /> <result column="source_name" property="sourceName" />
<result column="source_sync" property="sourceSync" /> <result column="is_sync" property="isSync" />
</resultMap> </resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.metadata.api.entity.MetadataSourceEntity" extends="BaseResultMap"> <resultMap id="ExtendResultMap" type="cn.datax.service.data.metadata.api.entity.MetadataSourceEntity" extends="BaseResultMap">
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
update_by, update_by,
update_time, update_time,
remark, remark,
db_type, source_name, source_sync db_type, source_name, is_sync
</sql> </sql>
<sql id="Extend_Column_List"> <sql id="Extend_Column_List">
......
package cn.datax.service.data.quality.api.dto;
import cn.datax.common.validate.ValidationGroups;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* <p>
* 核查规则信息表 实体DTO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@ApiModel(value = "核查规则信息表Model")
@Data
public class CheckRuleDto implements Serializable {
private static final long serialVersionUID=1L;
@ApiModelProperty(value = "主键ID")
@NotBlank(message = "主键ID不能为空", groups = {ValidationGroups.Update.class})
private String id;
@ApiModelProperty(value = "规则名称")
private String ruleName;
@ApiModelProperty(value = "规则类型")
private String ruleTypeId;
@ApiModelProperty(value = "规则级别(3高、2中、1低)")
private String ruleLevel;
@ApiModelProperty(value = "数据源主键")
private String ruleSourceId;
@ApiModelProperty(value = "数据源")
private String ruleSource;
@ApiModelProperty(value = "数据表主键")
private String ruleTableId;
@ApiModelProperty(value = "数据表")
private String ruleTable;
@ApiModelProperty(value = "数据表名称")
private String ruleTableComment;
@ApiModelProperty(value = "核查字段主键")
private String ruleColumnId;
@ApiModelProperty(value = "核查字段")
private String ruleColumn;
@ApiModelProperty(value = "核查字段名称")
private String ruleColumnComment;
@ApiModelProperty(value = "核查脚本")
private String ruleSql;
@ApiModelProperty(value = "状态")
@NotNull(message = "状态不能为空", groups = {ValidationGroups.Insert.class, ValidationGroups.Update.class})
private String status;
@ApiModelProperty(value = "备注")
private String remark;
}
package cn.datax.service.data.quality.api.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* <p>
* 核查报告信息表
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@Accessors(chain = true)
@TableName(value = "quality_check_report", autoResultMap = true)
public class CheckReportEntity implements Serializable {
private static final long serialVersionUID=1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private String id;
/**
* 核查规则主键
*/
private String checkRuleId;
/**
* 核查时间
*/
private LocalDateTime checkDate;
/**
* 核查结果
*/
private String checkResult;
/**
* 核查数量
*/
private Integer checkTotalCount;
/**
* 报错数量
*/
private Integer checkErrorCount;
/**
* 核查批次号
*/
private String checkBatch;
/**
* 规则名称
*/
@TableField(exist = false)
private String ruleName;
/**
* 规则类型
*/
@TableField(exist = false)
private String ruleType;
/**
* 数据源
*/
@TableField(exist = false)
private String ruleSource;
/**
* 数据表
*/
@TableField(exist = false)
private String ruleTable;
/**
* 核查字段
*/
@TableField(exist = false)
private String ruleColumn;
}
package cn.datax.service.data.quality.api.entity;
import cn.datax.common.base.DataScopeBaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* <p>
* 核查规则信息表
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName(value = "quality_check_rule", autoResultMap = true)
public class CheckRuleEntity extends DataScopeBaseEntity {
private static final long serialVersionUID=1L;
/**
* 规则名称
*/
private String ruleName;
/**
* 规则类型主键
*/
private String ruleTypeId;
/**
* 规则类型
*/
@TableField(exist = false)
private String ruleType;
/**
* 规则级别(3高、2中、1低)
*/
private String ruleLevel;
/**
* 数据源主键
*/
private String ruleSourceId;
/**
* 数据源
*/
private String ruleSource;
/**
* 数据表主键
*/
private String ruleTableId;
/**
* 数据表
*/
private String ruleTable;
/**
* 数据表名称
*/
private String ruleTableComment;
/**
* 核查字段主键
*/
private String ruleColumnId;
/**
* 核查字段
*/
private String ruleColumn;
/**
* 核查字段名称
*/
private String ruleColumnComment;
/**
* 核查脚本
*/
private String ruleSql;
/**
* 最近核查批次号(关联确定唯一核查报告)
*/
private String lastCheckBatch;
}
package cn.datax.service.data.quality.api.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* <p>
* 规则类型信息表
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@Accessors(chain = true)
@TableName("quality_rule_type")
public class RuleTypeEntity implements Serializable {
private static final long serialVersionUID=1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private String id;
/**
* 类型名称
*/
private String name;
}
package cn.datax.service.data.quality.api.entity;
import lombok.Data;
@Data
public class RuleTypeReportEntity extends RuleTypeEntity {
private static final long serialVersionUID=1L;
/**
* 报错数量
*/
private Integer checkErrorCount;
}
package cn.datax.service.data.quality.api.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* <p>
* 数据质量监控任务信息表
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Data
@Accessors(chain = true)
@TableName("quality_schedule_job")
public class ScheduleJobEntity implements Serializable {
private static final long serialVersionUID=1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.ASSIGN_ID)
private String id;
/**
* 任务名称
*/
private String jobName;
/**
* bean名称
*/
private String beanName;
/**
* 方法名称
*/
private String methodName;
/**
* 方法参数
*/
private String methodParams;
/**
* cron表达式
*/
private String cronExpression;
/**
* 状态(1运行 0暂停)
*/
private String status;
}
package cn.datax.service.data.quality.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 核查报告信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class CheckReportQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
private String ruleTypeId;
private String ruleName;
private String ruleSource;
private String ruleTable;
private String ruleColumn;
}
package cn.datax.service.data.quality.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 核查规则信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class CheckRuleQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
private String ruleTypeId;
private String ruleName;
private String ruleSource;
private String ruleTable;
private String ruleColumn;
}
package cn.datax.service.data.quality.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 规则类型信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class RuleTypeQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
private String name;
}
package cn.datax.service.data.quality.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 数据质量监控任务信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ScheduleJobQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
}
package cn.datax.service.data.quality.api.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 核查报告信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
public class CheckReportVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String checkRuleId;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime checkDate;
private String checkResult;
private Integer checkTotalCount;
private Integer checkErrorCount;
private String ruleName;
private String ruleType;
private String ruleSource;
private String ruleTable;
private String ruleColumn;
}
package cn.datax.service.data.quality.api.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 核查规则信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
public class CheckRuleVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String status;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime;
private String remark;
private String ruleName;
private String ruleTypeId;
private String ruleType;
private String ruleLevel;
private String ruleSourceId;
private String ruleSource;
private String ruleTableId;
private String ruleTable;
private String ruleTableComment;
private String ruleColumnId;
private String ruleColumn;
private String ruleColumnComment;
private String ruleSql;
}
package cn.datax.service.data.quality.api.vo;
import lombok.Data;
import java.io.Serializable;
/**
* <p>
* 规则类型信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
public class RuleTypeVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String name;
}
package cn.datax.service.data.quality.api.vo;
import lombok.Data;
import java.io.Serializable;
/**
* <p>
* 数据质量监控任务信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Data
public class ScheduleJobVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String status;
private String jobName;
private String beanName;
private String methodName;
private String methodParams;
private String cronExpression;
}
package cn.datax.service.data.quality.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class SchedulingConfig {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 定时任务执行线程池核心线程数
taskScheduler.setPoolSize(5);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
taskScheduler.initialize();
return taskScheduler;
}
}
package cn.datax.service.data.quality.config; package cn.datax.service.data.quality.config;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.service.ScheduleJobService;
import cn.datax.service.data.quality.schedule.CronTaskRegistrar;
import cn.datax.service.data.quality.schedule.SchedulingRunnable;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
...@@ -9,7 +18,9 @@ import org.springframework.stereotype.Component; ...@@ -9,7 +18,9 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List;
@Slf4j
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class StartedUpRunner implements ApplicationRunner { public class StartedUpRunner implements ApplicationRunner {
...@@ -17,6 +28,12 @@ public class StartedUpRunner implements ApplicationRunner { ...@@ -17,6 +28,12 @@ public class StartedUpRunner implements ApplicationRunner {
private final ConfigurableApplicationContext context; private final ConfigurableApplicationContext context;
private final Environment environment; private final Environment environment;
@Autowired
private CronTaskRegistrar cronTaskRegistrar;
@Autowired
private ScheduleJobService scheduleJobService;
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
if (context.isActive()) { if (context.isActive()) {
...@@ -26,6 +43,15 @@ public class StartedUpRunner implements ApplicationRunner { ...@@ -26,6 +43,15 @@ public class StartedUpRunner implements ApplicationRunner {
"端口号:" + environment.getProperty("server.port") + "\n" + "端口号:" + environment.getProperty("server.port") + "\n" +
"-----------------------------------------"; "-----------------------------------------";
System.out.println(banner); System.out.println(banner);
List<ScheduleJobEntity> list = scheduleJobService.list(Wrappers.<ScheduleJobEntity>lambdaQuery().eq(ScheduleJobEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
if (CollUtil.isNotEmpty(list)) {
list.stream().forEach(job -> {
SchedulingRunnable task = new SchedulingRunnable(job.getBeanName(), job.getMethodName(), job.getMethodParams());
cronTaskRegistrar.addCronTask(task, job.getCronExpression());
});
}
log.info("定时任务已加载完毕...");
} }
} }
} }
package cn.datax.service.data.quality.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.api.vo.CheckReportVo;
import cn.datax.service.data.quality.api.query.CheckReportQuery;
import cn.datax.service.data.quality.mapstruct.CheckReportMapper;
import cn.datax.service.data.quality.service.CheckReportService;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 核查报告信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Api(tags = {"核查报告信息表"})
@RestController
@RequestMapping("/checkReports")
public class CheckReportController extends BaseController {
@Autowired
private CheckReportService checkReportService;
@Autowired
private CheckReportMapper checkReportMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getCheckReportById(@PathVariable String id) {
CheckReportEntity checkReportEntity = checkReportService.getCheckReportById(id);
return R.ok().setData(checkReportMapper.toVO(checkReportEntity));
}
/**
* 分页查询信息
*
* @param checkReportQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "checkReportQuery", value = "查询实体checkReportQuery", required = true, dataTypeClass = CheckReportQuery.class)
})
@GetMapping("/page")
public R getCheckReportPage(CheckReportQuery checkReportQuery) {
QueryWrapper<CheckReportEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq(StrUtil.isNotBlank(checkReportQuery.getRuleTypeId()), "r.rule_type_id", checkReportQuery.getRuleTypeId());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleName()), "r.rule_name", checkReportQuery.getRuleName());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleSource()), "r.rule_source", checkReportQuery.getRuleSource());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleTable()), "r.rule_table", checkReportQuery.getRuleTable());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleColumn()), "r.rule_column", checkReportQuery.getRuleColumn());
// 确定唯一核查报告
queryWrapper.eq("c.check_batch", "r.last_check_batch");
IPage<CheckReportEntity> page = checkReportService.page(new Page<>(checkReportQuery.getPageNum(), checkReportQuery.getPageSize()), queryWrapper);
List<CheckReportVo> collect = page.getRecords().stream().map(checkReportMapper::toVO).collect(Collectors.toList());
JsonPage<CheckReportVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
}
package cn.datax.service.data.quality.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.common.validate.ValidationGroups;
import cn.datax.service.data.quality.api.dto.CheckRuleDto;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.vo.CheckRuleVo;
import cn.datax.service.data.quality.api.query.CheckRuleQuery;
import cn.datax.service.data.quality.mapstruct.CheckRuleMapper;
import cn.datax.service.data.quality.service.CheckRuleService;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 核查规则信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Api(tags = {"核查规则信息表"})
@RestController
@RequestMapping("/checkRules")
public class CheckRuleController extends BaseController {
@Autowired
private CheckRuleService checkRuleService;
@Autowired
private CheckRuleMapper checkRuleMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getCheckRuleById(@PathVariable String id) {
CheckRuleEntity checkRuleEntity = checkRuleService.getCheckRuleById(id);
return R.ok().setData(checkRuleMapper.toVO(checkRuleEntity));
}
/**
* 分页查询信息
*
* @param checkRuleQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "checkRuleQuery", value = "查询实体checkRuleQuery", required = true, dataTypeClass = CheckRuleQuery.class)
})
@GetMapping("/page")
public R getCheckRulePage(CheckRuleQuery checkRuleQuery) {
QueryWrapper<CheckRuleEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq(StrUtil.isNotBlank(checkRuleQuery.getRuleTypeId()), "r.rule_type_id", checkRuleQuery.getRuleTypeId());
queryWrapper.like(StrUtil.isNotBlank(checkRuleQuery.getRuleName()), "r.rule_name", checkRuleQuery.getRuleName());
queryWrapper.like(StrUtil.isNotBlank(checkRuleQuery.getRuleSource()), "r.rule_source", checkRuleQuery.getRuleSource());
queryWrapper.like(StrUtil.isNotBlank(checkRuleQuery.getRuleTable()), "r.rule_table", checkRuleQuery.getRuleTable());
queryWrapper.like(StrUtil.isNotBlank(checkRuleQuery.getRuleColumn()), "r.rule_column", checkRuleQuery.getRuleColumn());
IPage<CheckRuleEntity> page = checkRuleService.page(new Page<>(checkRuleQuery.getPageNum(), checkRuleQuery.getPageSize()), queryWrapper);
List<CheckRuleVo> collect = page.getRecords().stream().map(checkRuleMapper::toVO).collect(Collectors.toList());
JsonPage<CheckRuleVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
/**
* 添加
* @param checkRule
* @return
*/
@ApiOperation(value = "添加信息", notes = "根据checkRule对象添加信息")
@ApiImplicitParam(name = "checkRule", value = "详细实体checkRule", required = true, dataType = "CheckRuleDto")
@PostMapping()
public R saveCheckRule(@RequestBody @Validated({ValidationGroups.Insert.class}) CheckRuleDto checkRule) {
CheckRuleEntity checkRuleEntity = checkRuleService.saveCheckRule(checkRule);
return R.ok().setData(checkRuleMapper.toVO(checkRuleEntity));
}
/**
* 修改
* @param checkRule
* @return
*/
@ApiOperation(value = "修改信息", notes = "根据url的id来指定修改对象,并根据传过来的信息来修改详细信息")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path"),
@ApiImplicitParam(name = "checkRule", value = "详细实体checkRule", required = true, dataType = "CheckRuleDto")
})
@PutMapping("/{id}")
public R updateCheckRule(@PathVariable String id, @RequestBody @Validated({ValidationGroups.Update.class}) CheckRuleDto checkRule) {
CheckRuleEntity checkRuleEntity = checkRuleService.updateCheckRule(checkRule);
return R.ok().setData(checkRuleMapper.toVO(checkRuleEntity));
}
/**
* 删除
* @param id
* @return
*/
@ApiOperation(value = "删除", notes = "根据url的id来指定删除对象")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@DeleteMapping("/{id}")
public R deleteCheckRuleById(@PathVariable String id) {
checkRuleService.deleteCheckRuleById(id);
return R.ok();
}
/**
* 批量删除
* @param ids
* @return
*/
@ApiOperation(value = "批量删除角色", notes = "根据url的ids来批量删除对象")
@ApiImplicitParam(name = "ids", value = "ID集合", required = true, dataType = "List", paramType = "path")
@DeleteMapping("/batch/{ids}")
public R deleteCheckRuleBatch(@PathVariable List<String> ids) {
checkRuleService.deleteCheckRuleBatch(ids);
return R.ok();
}
}
package cn.datax.service.data.quality.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.service.data.quality.api.entity.RuleTypeEntity;
import cn.datax.service.data.quality.api.entity.RuleTypeReportEntity;
import cn.datax.service.data.quality.api.vo.RuleTypeVo;
import cn.datax.service.data.quality.api.query.RuleTypeQuery;
import cn.datax.service.data.quality.mapstruct.RuleTypeMapper;
import cn.datax.service.data.quality.service.RuleTypeService;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 规则类型信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Api(tags = {"规则类型信息表"})
@RestController
@RequestMapping("/ruleTypes")
public class RuleTypeController extends BaseController {
@Autowired
private RuleTypeService ruleTypeService;
@Autowired
private RuleTypeMapper ruleTypeMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getRuleTypeById(@PathVariable String id) {
RuleTypeEntity ruleTypeEntity = ruleTypeService.getRuleTypeById(id);
return R.ok().setData(ruleTypeMapper.toVO(ruleTypeEntity));
}
@ApiOperation(value = "获取列表", notes = "")
@GetMapping("/list")
public R getRuleTypeList() {
List<RuleTypeEntity> list = ruleTypeService.list(Wrappers.emptyWrapper());
return R.ok().setData(list);
}
@ApiOperation(value = "获取列表", notes = "")
@GetMapping("/report/list")
public R getRuleTypeListForReport() {
List<RuleTypeReportEntity> list = ruleTypeService.getRuleTypeListForReport();
return R.ok().setData(list);
}
/**
* 分页查询信息
*
* @param ruleTypeQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "ruleTypeQuery", value = "查询实体ruleTypeQuery", required = true, dataTypeClass = RuleTypeQuery.class)
})
@GetMapping("/page")
public R getRuleTypePage(RuleTypeQuery ruleTypeQuery) {
QueryWrapper<RuleTypeEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.like(StrUtil.isNotBlank(ruleTypeQuery.getName()), "name", ruleTypeQuery.getName());
IPage<RuleTypeEntity> page = ruleTypeService.page(new Page<>(ruleTypeQuery.getPageNum(), ruleTypeQuery.getPageSize()), queryWrapper);
List<RuleTypeVo> collect = page.getRecords().stream().map(ruleTypeMapper::toVO).collect(Collectors.toList());
JsonPage<RuleTypeVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
}
package cn.datax.service.data.quality.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.api.vo.ScheduleJobVo;
import cn.datax.service.data.quality.api.query.ScheduleJobQuery;
import cn.datax.service.data.quality.mapstruct.ScheduleJobMapper;
import cn.datax.service.data.quality.service.ScheduleJobService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 数据质量监控任务信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Api(tags = {"数据质量监控任务信息表"})
@RestController
@RequestMapping("/scheduleJobs")
public class ScheduleJobController extends BaseController {
@Autowired
private ScheduleJobService scheduleJobService;
@Autowired
private ScheduleJobMapper scheduleJobMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getScheduleJobById(@PathVariable String id) {
ScheduleJobEntity scheduleJobEntity = scheduleJobService.getScheduleJobById(id);
return R.ok().setData(scheduleJobMapper.toVO(scheduleJobEntity));
}
/**
* 分页查询信息
*
* @param scheduleJobQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "scheduleJobQuery", value = "查询实体scheduleJobQuery", required = true, dataTypeClass = ScheduleJobQuery.class)
})
@GetMapping("/page")
public R getScheduleJobPage(ScheduleJobQuery scheduleJobQuery) {
QueryWrapper<ScheduleJobEntity> queryWrapper = new QueryWrapper<>();
IPage<ScheduleJobEntity> page = scheduleJobService.page(new Page<>(scheduleJobQuery.getPageNum(), scheduleJobQuery.getPageSize()), queryWrapper);
List<ScheduleJobVo> collect = page.getRecords().stream().map(scheduleJobMapper::toVO).collect(Collectors.toList());
JsonPage<ScheduleJobVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
/**
* 暂停任务
* @param id
* @return
*/
@ApiOperation(value = "暂停任务", notes = "根据url的id来暂停指定任务")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@PostMapping("/pause/{id}")
public R pauseScheduleJobById(@PathVariable("id") String id) {
scheduleJobService.pauseScheduleJobById(id);
return R.ok();
}
/**
* 恢复任务
* @param id
* @return
*/
@ApiOperation(value = "恢复任务", notes = "根据url的id来恢复指定任务")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@PostMapping("/resume/{id}")
public R resumeScheduleJobById(@PathVariable("id") String id) {
scheduleJobService.resumeScheduleJobById(id);
return R.ok();
}
}
package cn.datax.service.data.quality.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* <p>
* 核查报告信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper
public interface CheckReportDao extends BaseDao<CheckReportEntity> {
@Override
<E extends IPage<CheckReportEntity>> E selectPage(E page, @Param(Constants.WRAPPER) Wrapper<CheckReportEntity> queryWrapper);
}
package cn.datax.service.data.quality.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* <p>
* 核查规则信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper
public interface CheckRuleDao extends BaseDao<CheckRuleEntity> {
@Override
<E extends IPage<CheckRuleEntity>> E selectPage(E page, @Param(Constants.WRAPPER) Wrapper<CheckRuleEntity> queryWrapper);
}
package cn.datax.service.data.quality.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.quality.api.entity.RuleTypeEntity;
import cn.datax.service.data.quality.api.entity.RuleTypeReportEntity;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* <p>
* 规则类型信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper
public interface RuleTypeDao extends BaseDao<RuleTypeEntity> {
List<RuleTypeReportEntity> selectListForReport();
}
package cn.datax.service.data.quality.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* 数据质量监控任务信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Mapper
public interface ScheduleJobDao extends BaseDao<ScheduleJobEntity> {
}
package cn.datax.service.data.quality.mapstruct;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.api.vo.CheckReportVo;
import org.mapstruct.Mapper;
import java.util.List;
/**
* <p>
* 核查报告信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper(componentModel = "spring")
public interface CheckReportMapper {
/**
* 将源对象转换为VO对象
* @param e
* @return D
*/
CheckReportVo toVO(CheckReportEntity e);
/**
* 将源对象集合转换为VO对象集合
* @param es
* @return List<D>
*/
List<CheckReportVo> toVO(List<CheckReportEntity> es);
}
package cn.datax.service.data.quality.mapstruct;
import cn.datax.common.mapstruct.EntityMapper;
import cn.datax.service.data.quality.api.dto.CheckRuleDto;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.vo.CheckRuleVo;
import org.mapstruct.Mapper;
/**
* <p>
* 核查规则信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper(componentModel = "spring")
public interface CheckRuleMapper extends EntityMapper<CheckRuleDto, CheckRuleEntity, CheckRuleVo> {
}
package cn.datax.service.data.quality.mapstruct;
import cn.datax.service.data.quality.api.entity.RuleTypeEntity;
import cn.datax.service.data.quality.api.vo.RuleTypeVo;
import org.mapstruct.Mapper;
import java.util.List;
/**
* <p>
* 规则类型信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper(componentModel = "spring")
public interface RuleTypeMapper {
/**
* 将源对象转换为VO对象
* @param e
* @return D
*/
RuleTypeVo toVO(RuleTypeEntity e);
/**
* 将源对象集合转换为VO对象集合
* @param es
* @return List<D>
*/
List<RuleTypeVo> toVO(List<RuleTypeEntity> es);
}
package cn.datax.service.data.quality.mapstruct;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.api.vo.ScheduleJobVo;
import org.mapstruct.Mapper;
import java.util.List;
/**
* <p>
* 数据质量监控任务信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Mapper(componentModel = "spring")
public interface ScheduleJobMapper {
/**
* 将源对象转换为VO对象
* @param e
* @return D
*/
ScheduleJobVo toVO(ScheduleJobEntity e);
/**
* 将源对象集合转换为VO对象集合
* @param es
* @return List<D>
*/
List<ScheduleJobVo> toVO(List<ScheduleJobEntity> es);
}
package cn.datax.service.data.quality.schedule;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(8);
@Autowired
private TaskScheduler taskScheduler;
public TaskScheduler getScheduler() {
return this.taskScheduler;
}
public void addCronTask(Runnable task, String cronExpression) {
addCronTask(new CronTask(task, cronExpression));
}
public void addCronTask(CronTask cronTask) {
if (cronTask != null) {
Runnable task = cronTask.getRunnable();
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(task);
}
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
}
public void removeCronTask(Runnable task) {
ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
if (scheduledTask != null) {
scheduledTask.cancel();
}
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
@Override
public void destroy() {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}
package cn.datax.service.data.quality.schedule;
import java.util.concurrent.ScheduledFuture;
public final class ScheduledTask {
volatile ScheduledFuture<?> future;
/**
* 取消定时任务
*/
public void cancel() {
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
}
}
}
package cn.datax.service.data.quality.schedule;
import cn.datax.common.utils.SpringContextHolder;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
public class SchedulingRunnable implements Runnable {
private String beanName;
private String methodName;
private String params;
public SchedulingRunnable(String beanName, String methodName) {
this(beanName, methodName, null);
}
public SchedulingRunnable(String beanName, String methodName, String params) {
this.beanName = beanName;
this.methodName = methodName;
this.params = params;
}
@Override
public void run() {
log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
long startTime = System.currentTimeMillis();
Map map = new HashMap();
String batch;
try {
Object target = SpringContextHolder.getBean(beanName);
Method method = target.getClass().getDeclaredMethod(methodName, Map.class);
if (StrUtil.isNotEmpty(params)) {
map = new ObjectMapper().readValue(params, Map.class);
}
batch = DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN);
map.put("batch", batch);
ReflectionUtils.makeAccessible(method);
method.invoke(target, map);
} catch (Exception ex) {
log.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex);
}
long times = System.currentTimeMillis() - startTime;
log.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchedulingRunnable that = (SchedulingRunnable) o;
if (params == null) {
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
that.params == null;
}
return beanName.equals(that.beanName) &&
methodName.equals(that.methodName) &&
params.equals(that.params);
}
@Override
public int hashCode() {
if (params == null) {
return Objects.hash(beanName, methodName);
}
return Objects.hash(beanName, methodName, params);
}
}
package cn.datax.service.data.quality.schedule.task;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.service.CheckRuleService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component("qualityTask")
public class QualityTask {
@Autowired
private CheckRuleService checkRuleService;
public void task(Map map) {
System.out.println("执行批次:" + map);
// 获取可执行的核查规则
List<CheckRuleEntity> list = checkRuleService.list(Wrappers.<CheckRuleEntity>lambdaQuery().eq(CheckRuleEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
int poolSize = list.size();
// 定义固定长度的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(16),
new BasicThreadFactory.Builder().namingPattern("executor-schedule-pool-%d").daemon(true).build());
// 定义计数器
final CountDownLatch latch = new CountDownLatch(poolSize);
list.stream().forEach(s -> {
threadPoolExecutor.execute(() -> {
log.info(s.getRuleName() + ":" + LocalDateTime.now());
latch.countDown();
});
});
// 主线程阻塞,等待所有子线程执行完成
try {
latch.await();
} catch (InterruptedException e) {}
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
package cn.datax.service.data.quality.service;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.common.base.BaseService;
/**
* <p>
* 核查报告信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
public interface CheckReportService extends BaseService<CheckReportEntity> {
CheckReportEntity getCheckReportById(String id);
}
package cn.datax.service.data.quality.service;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.dto.CheckRuleDto;
import cn.datax.common.base.BaseService;
import java.util.List;
/**
* <p>
* 核查规则信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
public interface CheckRuleService extends BaseService<CheckRuleEntity> {
CheckRuleEntity saveCheckRule(CheckRuleDto checkRule);
CheckRuleEntity updateCheckRule(CheckRuleDto checkRule);
CheckRuleEntity getCheckRuleById(String id);
void deleteCheckRuleById(String id);
void deleteCheckRuleBatch(List<String> ids);
}
package cn.datax.service.data.quality.service;
import cn.datax.service.data.quality.api.entity.RuleTypeEntity;
import cn.datax.common.base.BaseService;
import cn.datax.service.data.quality.api.entity.RuleTypeReportEntity;
import java.util.List;
/**
* <p>
* 规则类型信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
public interface RuleTypeService extends BaseService<RuleTypeEntity> {
RuleTypeEntity getRuleTypeById(String id);
List<RuleTypeReportEntity> getRuleTypeListForReport();
}
package cn.datax.service.data.quality.service;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.common.base.BaseService;
/**
* <p>
* 数据质量监控任务信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
public interface ScheduleJobService extends BaseService<ScheduleJobEntity> {
ScheduleJobEntity getScheduleJobById(String id);
void pauseScheduleJobById(String id);
void resumeScheduleJobById(String id);
}
package cn.datax.service.data.quality.service.impl;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.service.CheckReportService;
import cn.datax.service.data.quality.mapstruct.CheckReportMapper;
import cn.datax.service.data.quality.dao.CheckReportDao;
import cn.datax.common.base.BaseServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
* <p>
* 核查报告信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class CheckReportServiceImpl extends BaseServiceImpl<CheckReportDao, CheckReportEntity> implements CheckReportService {
@Autowired
private CheckReportDao checkReportDao;
@Autowired
private CheckReportMapper checkReportMapper;
@Override
public CheckReportEntity getCheckReportById(String id) {
CheckReportEntity checkReportEntity = super.getById(id);
return checkReportEntity;
}
}
package cn.datax.service.data.quality.service.impl;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.dto.CheckRuleDto;
import cn.datax.service.data.quality.service.CheckRuleService;
import cn.datax.service.data.quality.mapstruct.CheckRuleMapper;
import cn.datax.service.data.quality.dao.CheckRuleDao;
import cn.datax.common.base.BaseServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* <p>
* 核查规则信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class CheckRuleServiceImpl extends BaseServiceImpl<CheckRuleDao, CheckRuleEntity> implements CheckRuleService {
@Autowired
private CheckRuleDao checkRuleDao;
@Autowired
private CheckRuleMapper checkRuleMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public CheckRuleEntity saveCheckRule(CheckRuleDto checkRuleDto) {
CheckRuleEntity checkRule = checkRuleMapper.toEntity(checkRuleDto);
checkRuleDao.insert(checkRule);
return checkRule;
}
@Override
@Transactional(rollbackFor = Exception.class)
public CheckRuleEntity updateCheckRule(CheckRuleDto checkRuleDto) {
CheckRuleEntity checkRule = checkRuleMapper.toEntity(checkRuleDto);
checkRuleDao.updateById(checkRule);
return checkRule;
}
@Override
public CheckRuleEntity getCheckRuleById(String id) {
CheckRuleEntity checkRuleEntity = super.getById(id);
return checkRuleEntity;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteCheckRuleById(String id) {
checkRuleDao.deleteById(id);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteCheckRuleBatch(List<String> ids) {
checkRuleDao.deleteBatchIds(ids);
}
}
package cn.datax.service.data.quality.service.impl;
import cn.datax.service.data.quality.api.entity.RuleTypeEntity;
import cn.datax.service.data.quality.api.entity.RuleTypeReportEntity;
import cn.datax.service.data.quality.service.RuleTypeService;
import cn.datax.service.data.quality.mapstruct.RuleTypeMapper;
import cn.datax.service.data.quality.dao.RuleTypeDao;
import cn.datax.common.base.BaseServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* <p>
* 规则类型信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class RuleTypeServiceImpl extends BaseServiceImpl<RuleTypeDao, RuleTypeEntity> implements RuleTypeService {
@Autowired
private RuleTypeDao ruleTypeDao;
@Autowired
private RuleTypeMapper ruleTypeMapper;
@Override
public RuleTypeEntity getRuleTypeById(String id) {
RuleTypeEntity ruleTypeEntity = super.getById(id);
return ruleTypeEntity;
}
@Override
public List<RuleTypeReportEntity> getRuleTypeListForReport() {
List<RuleTypeReportEntity> list = ruleTypeDao.selectListForReport();
return list;
}
}
package cn.datax.service.data.quality.service.impl;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.quality.api.entity.ScheduleJobEntity;
import cn.datax.service.data.quality.schedule.CronTaskRegistrar;
import cn.datax.service.data.quality.schedule.SchedulingRunnable;
import cn.datax.service.data.quality.service.ScheduleJobService;
import cn.datax.service.data.quality.mapstruct.ScheduleJobMapper;
import cn.datax.service.data.quality.dao.ScheduleJobDao;
import cn.datax.common.base.BaseServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
* <p>
* 数据质量监控任务信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-29
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJobDao, ScheduleJobEntity> implements ScheduleJobService {
@Autowired
private ScheduleJobDao scheduleJobDao;
@Autowired
private ScheduleJobMapper scheduleJobMapper;
@Autowired
private CronTaskRegistrar cronTaskRegistrar;
@Override
public ScheduleJobEntity getScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
return scheduleJobEntity;
}
@Override
public void pauseScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
cronTaskRegistrar.removeCronTask(task);
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.FALSE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
}
@Override
public void resumeScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
cronTaskRegistrar.addCronTask(task, scheduleJobEntity.getCronExpression());
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.TRUE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.quality.dao.CheckReportDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.quality.api.entity.CheckReportEntity">
<result column="id" property="id" />
<result column="check_rule_id" property="checkRuleId" />
<result column="check_date" property="checkDate" />
<result column="check_result" property="checkResult" />
<result column="check_total_count" property="checkTotalCount" />
<result column="check_error_count" property="checkErrorCount" />
<result column="check_batch" property="checkBatch" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.quality.api.entity.CheckReportEntity" extends="BaseResultMap">
<result column="rule_name" property="ruleName" />
<result column="rule_type" property="ruleType" />
<result column="rule_source" property="ruleSource" />
<result column="rule_table" property="ruleTable" />
<result column="rule_column" property="ruleColumn" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
check_rule_id, check_date, check_result, check_total_count, check_error_count, check_batch
</sql>
<sql id="Report_Column_List">
${alias}.id,
${alias}.check_rule_id, ${alias}.check_date, ${alias}.check_result, ${alias}.check_total_count, ${alias}.check_error_count, ${alias}.check_batch
</sql>
<select id="selectPage" resultMap="ExtendResultMap">
SELECT r.rule_name, t.name as rule_type, r.rule_source, r.rule_table, r.rule_column,
<include refid="Report_Column_List"><property name="alias" value="c"/></include>
FROM quality_check_report c
LEFT JOIN quality_check_rule r ON r.id = c.check_rule_id
LEFT JOIN quality_rule_type t ON t.id = r.rule_type_id
${ew.customSqlSegment}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.quality.dao.CheckRuleDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.quality.api.entity.CheckRuleEntity">
<result column="id" property="id" />
<result column="status" property="status" />
<result column="create_by" property="createBy" />
<result column="create_time" property="createTime" />
<result column="create_dept" property="createDept" />
<result column="update_by" property="updateBy" />
<result column="update_time" property="updateTime" />
<result column="remark" property="remark" />
<result column="rule_name" property="ruleName" />
<result column="rule_type_id" property="ruleTypeId" />
<result column="rule_type" property="ruleType" />
<result column="rule_level" property="ruleLevel" />
<result column="rule_source_id" property="ruleSourceId" />
<result column="rule_source" property="ruleSource" />
<result column="rule_table_id" property="ruleTableId" />
<result column="rule_table" property="ruleTable" />
<result column="rule_table_comment" property="ruleTableComment" />
<result column="rule_column_id" property="ruleColumnId" />
<result column="rule_column" property="ruleColumn" />
<result column="rule_column_comment" property="ruleColumnComment" />
<result column="rule_sql" property="ruleSql" />
<result column="last_check_batch" property="lastCheckBatch" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
status,
create_by,
create_time,
create_dept,
update_by,
update_time,
remark,
rule_name, rule_type_id, rule_level, rule_source_id, rule_source, rule_table_id, rule_table, rule_table_comment
rule_column_id, rule_column, rule_column_comment, rule_sql, last_check_batch
</sql>
<sql id="Rule_Column_List">
${alias}.id,
${alias}.status,
${alias}.create_by,
${alias}.create_time,
${alias}.create_dept,
${alias}.update_by,
${alias}.update_time,
${alias}.remark,
${alias}.rule_name, ${alias}.rule_type_id, ${alias}.rule_level, ${alias}.rule_source_id, ${alias}.rule_source,
${alias}.rule_table_id, ${alias}.rule_table, ${alias}.rule_table_comment, ${alias}.rule_column_id, ${alias}.rule_column, ${alias}.rule_column_comment, ${alias}.rule_sql, ${alias}.last_check_batch
</sql>
<select id="selectPage" resultMap="BaseResultMap">
SELECT t.name as rule_type,
<include refid="Rule_Column_List"><property name="alias" value="r"/></include>
FROM quality_check_rule r
LEFT JOIN quality_rule_type t ON t.id = r.rule_type_id
${ew.customSqlSegment}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.quality.dao.RuleTypeDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.quality.api.entity.RuleTypeEntity">
<result column="id" property="id" />
<result column="name" property="name" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.quality.api.entity.RuleTypeReportEntity" extends="BaseResultMap">
<result column="check_error_count" property="checkErrorCount" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
name
</sql>
<select id="selectListForReport" resultMap="ExtendResultMap">
SELECT t.id, t.name,
(SELECT COALESCE(SUM(c.check_error_count), 0) FROM quality_check_rule r
LEFT JOIN quality_check_report c ON c.check_rule_id = r.id AND c.check_batch = r.last_check_batch
WHERE r.rule_type_id = t.id AND r.status = 1) AS check_error_count
FROM quality_rule_type t
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.quality.dao.ScheduleJobDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.quality.api.entity.ScheduleJobEntity">
<result column="id" property="id" />
<result column="status" property="status" />
<result column="job_name" property="jobName" />
<result column="bean_name" property="beanName" />
<result column="method_name" property="methodName" />
<result column="method_params" property="methodParams" />
<result column="cron_expression" property="cronExpression" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
status,
job_name, bean_name, method_name, method_params, cron_expression
</sql>
</mapper>
package cn.datax.service.data.standard.api.dto;
import cn.datax.common.validate.ValidationGroups;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* <p>
* 字典对照信息表 实体DTO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@ApiModel(value = "字典对照信息表Model")
@Data
public class ContrastDictDto implements Serializable {
private static final long serialVersionUID=1L;
@ApiModelProperty(value = "主键ID")
@NotBlank(message = "主键ID不能为空", groups = {ValidationGroups.Update.class})
private String id;
@ApiModelProperty(value = "字典对照主键")
private String contrastId;
@ApiModelProperty(value = "字典编码")
private String colCode;
@ApiModelProperty(value = "字典名称")
private String colName;
@ApiModelProperty(value = "状态")
@NotNull(message = "状态不能为空", groups = {ValidationGroups.Insert.class, ValidationGroups.Update.class})
private String status;
@ApiModelProperty(value = "备注")
private String remark;
}
package cn.datax.service.data.standard.api.dto;
import cn.datax.common.validate.ValidationGroups;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import java.io.Serializable;
/**
* <p>
* 对照表信息表 实体DTO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@ApiModel(value = "对照表信息表Model")
@Data
public class ContrastDto implements Serializable {
private static final long serialVersionUID=1L;
@ApiModelProperty(value = "主键ID")
@NotBlank(message = "主键ID不能为空", groups = {ValidationGroups.Update.class})
private String id;
@ApiModelProperty(value = "数据源主键")
private String sourceId;
@ApiModelProperty(value = "数据源")
private String sourceName;
@ApiModelProperty(value = "数据表主键")
private String tableId;
@ApiModelProperty(value = "数据表")
private String tableName;
@ApiModelProperty(value = "数据表名称")
private String tableComment;
@ApiModelProperty(value = "对照字段主键")
private String columnId;
@ApiModelProperty(value = "对照字段")
private String columnName;
@ApiModelProperty(value = "对照字段名称")
private String columnComment;
@ApiModelProperty(value = "标准类别主键")
private String gbTypeId;
@ApiModelProperty(value = "绑定标准字段")
private String bindGbColumn;
}
package cn.datax.service.data.standard.api.entity;
import cn.datax.common.base.DataScopeBaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* <p>
* 字典对照信息表
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName(value = "standard_contrast_dict", autoResultMap = true)
public class ContrastDictEntity extends DataScopeBaseEntity {
private static final long serialVersionUID=1L;
/**
* 字典对照主键
*/
private String contrastId;
/**
* 数据源
*/
@TableField(exist = false)
private String sourceName;
/**
* 数据表
*/
@TableField(exist = false)
private String tableName;
/**
* 对照字段
*/
@TableField(exist = false)
private String columnName;
/**
* 标准类别编码
*/
@TableField(exist = false)
private String gbTypeCode;
/**
* 标准类别名称
*/
@TableField(exist = false)
private String gbTypeName;
/**
* 字典编码
*/
private String colCode;
/**
* 字典名称
*/
private String colName;
/**
* 对照的标准字典
*/
private String contrastGbId;
/**
* 对照的标准编码
*/
@TableField(exist = false)
private String contrastGbCode;
/**
* 对照的标准名称
*/
@TableField(exist = false)
private String contrastGbName;
}
package cn.datax.service.data.standard.api.entity;
import cn.datax.common.base.DataScopeBaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* <p>
* 对照表信息表
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName(value = "standard_contrast", autoResultMap = true)
public class ContrastEntity extends DataScopeBaseEntity {
private static final long serialVersionUID=1L;
/**
* 数据源主键
*/
private String sourceId;
/**
* 数据源
*/
private String sourceName;
/**
* 数据表主键
*/
private String tableId;
/**
* 数据表
*/
private String tableName;
/**
* 数据表名称
*/
private String tableComment;
/**
* 对照字段主键
*/
private String columnId;
/**
* 对照字段
*/
private String columnName;
/**
* 对照字段名称
*/
private String columnComment;
/**
* 标准类别主键
*/
private String gbTypeId;
/**
* 标准类别编码
*/
@TableField(exist = false)
private String gbTypeCode;
/**
* 标准类别名称
*/
@TableField(exist = false)
private String gbTypeName;
/**
* 绑定标准字段
*/
private String bindGbColumn;
}
package cn.datax.service.data.standard.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 字典对照信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ContrastDictQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
private String contrastId;
private String colCode;
private String colName;
}
package cn.datax.service.data.standard.api.query;
import cn.datax.common.base.BaseQueryParams;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* <p>
* 对照表信息表 查询实体
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ContrastQuery extends BaseQueryParams {
private static final long serialVersionUID=1L;
}
package cn.datax.service.data.standard.api.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 字典对照信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
public class ContrastDictVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String status;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime;
private String remark;
private String contrastId;
private String colCode;
private String colName;
private String contrastGbId;
private String contrastGbCode;
private String contrastGbName;
private String sourceName;
private String tableName;
private String columnName;
private String gbTypeCode;
private String gbTypeName;
}
package cn.datax.service.data.standard.api.vo;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class ContrastTreeVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String label;
private String name;
/**
* 数据
*/
private Object data;
private List<ContrastTreeVo> children;
}
package cn.datax.service.data.standard.api.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 对照表信息表 实体VO
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Data
public class ContrastVo implements Serializable {
private static final long serialVersionUID=1L;
private String id;
private String status;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime;
private String remark;
private String sourceId;
private String sourceName;
private String tableId;
private String tableName;
private String tableComment;
private String columnId;
private String columnName;
private String columnComment;
private String gbTypeId;
private String gbTypeCode;
private String gbTypeName;
private String bindGbColumn;
}
...@@ -23,6 +23,7 @@ public class DictVo implements Serializable { ...@@ -23,6 +23,7 @@ public class DictVo implements Serializable {
private String status; private String status;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime; private LocalDateTime createTime;
private String remark;
private String typeId; private String typeId;
private String gbTypeCode; private String gbTypeCode;
private String gbTypeName; private String gbTypeName;
......
...@@ -23,7 +23,7 @@ public class TypeVo implements Serializable { ...@@ -23,7 +23,7 @@ public class TypeVo implements Serializable {
private String status; private String status;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime createTime; private LocalDateTime createTime;
private String createDept; private String remark;
private String gbTypeCode; private String gbTypeCode;
private String gbTypeName; private String gbTypeName;
} }
package cn.datax.service.data.standard.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.common.validate.ValidationGroups;
import cn.datax.service.data.standard.api.dto.ContrastDto;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import cn.datax.service.data.standard.api.vo.ContrastTreeVo;
import cn.datax.service.data.standard.api.vo.ContrastVo;
import cn.datax.service.data.standard.api.query.ContrastQuery;
import cn.datax.service.data.standard.mapstruct.ContrastMapper;
import cn.datax.service.data.standard.service.ContrastService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 对照表信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Api(tags = {"对照表信息表"})
@RestController
@RequestMapping("/contrasts")
public class ContrastController extends BaseController {
@Autowired
private ContrastService contrastService;
@Autowired
private ContrastMapper contrastMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getContrastById(@PathVariable String id) {
ContrastEntity contrastEntity = contrastService.getContrastById(id);
return R.ok().setData(contrastMapper.toVO(contrastEntity));
}
/**
* 分页查询信息
*
* @param contrastQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "contrastQuery", value = "查询实体contrastQuery", required = true, dataTypeClass = ContrastQuery.class)
})
@GetMapping("/page")
public R getContrastPage(ContrastQuery contrastQuery) {
QueryWrapper<ContrastEntity> queryWrapper = new QueryWrapper<>();
IPage<ContrastEntity> page = contrastService.page(new Page<>(contrastQuery.getPageNum(), contrastQuery.getPageSize()), queryWrapper);
List<ContrastVo> collect = page.getRecords().stream().map(contrastMapper::toVO).collect(Collectors.toList());
JsonPage<ContrastVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
/**
* 添加
* @param contrast
* @return
*/
@ApiOperation(value = "添加信息", notes = "根据contrast对象添加信息")
@ApiImplicitParam(name = "contrast", value = "详细实体contrast", required = true, dataType = "ContrastDto")
@PostMapping()
public R saveContrast(@RequestBody @Validated({ValidationGroups.Insert.class}) ContrastDto contrast) {
ContrastEntity contrastEntity = contrastService.saveContrast(contrast);
return R.ok().setData(contrastMapper.toVO(contrastEntity));
}
/**
* 修改
* @param contrast
* @return
*/
@ApiOperation(value = "修改信息", notes = "根据url的id来指定修改对象,并根据传过来的信息来修改详细信息")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path"),
@ApiImplicitParam(name = "contrast", value = "详细实体contrast", required = true, dataType = "ContrastDto")
})
@PutMapping("/{id}")
public R updateContrast(@PathVariable String id, @RequestBody @Validated({ValidationGroups.Update.class}) ContrastDto contrast) {
ContrastEntity contrastEntity = contrastService.updateContrast(contrast);
return R.ok().setData(contrastMapper.toVO(contrastEntity));
}
/**
* 删除
* @param id
* @return
*/
@ApiOperation(value = "删除", notes = "根据url的id来指定删除对象")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@DeleteMapping("/{id}")
public R deleteContrastById(@PathVariable String id) {
contrastService.deleteContrastById(id);
return R.ok();
}
/**
* 批量删除
* @param ids
* @return
*/
@ApiOperation(value = "批量删除角色", notes = "根据url的ids来批量删除对象")
@ApiImplicitParam(name = "ids", value = "ID集合", required = true, dataType = "List", paramType = "path")
@DeleteMapping("/batch/{ids}")
public R deleteContrastBatch(@PathVariable List<String> ids) {
contrastService.deleteContrastBatch(ids);
return R.ok();
}
@GetMapping("/tree")
public R getContrastTree() {
List<ContrastTreeVo> list = contrastService.getContrastTree();
return R.ok().setData(list);
}
}
package cn.datax.service.data.standard.controller;
import cn.datax.common.core.JsonPage;
import cn.datax.common.core.R;
import cn.datax.common.validate.ValidationGroups;
import cn.datax.service.data.standard.api.dto.ContrastDictDto;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import cn.datax.service.data.standard.api.vo.ContrastDictVo;
import cn.datax.service.data.standard.api.query.ContrastDictQuery;
import cn.datax.service.data.standard.mapstruct.ContrastDictMapper;
import cn.datax.service.data.standard.service.ContrastDictService;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>
* 字典对照信息表 前端控制器
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Api(tags = {"字典对照信息表"})
@RestController
@RequestMapping("/contrastDicts")
public class ContrastDictController extends BaseController {
@Autowired
private ContrastDictService contrastDictService;
@Autowired
private ContrastDictMapper contrastDictMapper;
/**
* 通过ID查询信息
*
* @param id
* @return
*/
@ApiOperation(value = "获取详细信息", notes = "根据url的id来获取详细信息")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@GetMapping("/{id}")
public R getContrastDictById(@PathVariable String id) {
ContrastDictEntity contrastDictEntity = contrastDictService.getContrastDictById(id);
return R.ok().setData(contrastDictMapper.toVO(contrastDictEntity));
}
/**
* 分页查询信息
*
* @param contrastDictQuery
* @return
*/
@ApiOperation(value = "分页查询", notes = "")
@ApiImplicitParams({
@ApiImplicitParam(name = "contrastDictQuery", value = "查询实体contrastDictQuery", required = true, dataTypeClass = ContrastDictQuery.class)
})
@GetMapping("/page")
public R getContrastDictPage(ContrastDictQuery contrastDictQuery) {
QueryWrapper<ContrastDictEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq(StrUtil.isNotBlank(contrastDictQuery.getContrastId()), "d.contrast_id", contrastDictQuery.getContrastId());
queryWrapper.like(StrUtil.isNotBlank(contrastDictQuery.getColCode()), "d.col_code", contrastDictQuery.getColCode());
queryWrapper.like(StrUtil.isNotBlank(contrastDictQuery.getColName()), "d.col_name", contrastDictQuery.getColName());
IPage<ContrastDictEntity> page = contrastDictService.page(new Page<>(contrastDictQuery.getPageNum(), contrastDictQuery.getPageSize()), queryWrapper);
List<ContrastDictVo> collect = page.getRecords().stream().map(contrastDictMapper::toVO).collect(Collectors.toList());
JsonPage<ContrastDictVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);
return R.ok().setData(jsonPage);
}
/**
* 添加
* @param contrastDict
* @return
*/
@ApiOperation(value = "添加信息", notes = "根据contrastDict对象添加信息")
@ApiImplicitParam(name = "contrastDict", value = "详细实体contrastDict", required = true, dataType = "ContrastDictDto")
@PostMapping()
public R saveContrastDict(@RequestBody @Validated({ValidationGroups.Insert.class}) ContrastDictDto contrastDict) {
ContrastDictEntity contrastDictEntity = contrastDictService.saveContrastDict(contrastDict);
return R.ok().setData(contrastDictMapper.toVO(contrastDictEntity));
}
/**
* 修改
* @param contrastDict
* @return
*/
@ApiOperation(value = "修改信息", notes = "根据url的id来指定修改对象,并根据传过来的信息来修改详细信息")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path"),
@ApiImplicitParam(name = "contrastDict", value = "详细实体contrastDict", required = true, dataType = "ContrastDictDto")
})
@PutMapping("/{id}")
public R updateContrastDict(@PathVariable String id, @RequestBody @Validated({ValidationGroups.Update.class}) ContrastDictDto contrastDict) {
ContrastDictEntity contrastDictEntity = contrastDictService.updateContrastDict(contrastDict);
return R.ok().setData(contrastDictMapper.toVO(contrastDictEntity));
}
/**
* 删除
* @param id
* @return
*/
@ApiOperation(value = "删除", notes = "根据url的id来指定删除对象")
@ApiImplicitParam(name = "id", value = "ID", required = true, dataType = "String", paramType = "path")
@DeleteMapping("/{id}")
public R deleteContrastDictById(@PathVariable String id) {
contrastDictService.deleteContrastDictById(id);
return R.ok();
}
/**
* 批量删除
* @param ids
* @return
*/
@ApiOperation(value = "批量删除角色", notes = "根据url的ids来批量删除对象")
@ApiImplicitParam(name = "ids", value = "ID集合", required = true, dataType = "List", paramType = "path")
@DeleteMapping("/batch/{ids}")
public R deleteContrastDictBatch(@PathVariable List<String> ids) {
contrastDictService.deleteContrastDictBatch(ids);
return R.ok();
}
}
package cn.datax.service.data.standard.controller;
import cn.datax.common.base.BaseController;
import cn.datax.common.core.R;
import cn.datax.service.data.standard.service.DictMappingService;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@Api(tags = {"字典对照映射"})
@RestController
@RequestMapping("/mappings")
public class DictMappingController extends BaseController {
@Autowired
private DictMappingService dictMappingService;
@GetMapping("/{id}")
public R getDictMapping(@PathVariable String id) {
Map<String, Object> map = dictMappingService.getDictMapping(id);
return R.ok().setData(map);
}
@PostMapping("/auto/{id}")
public R dictAutoMapping(@PathVariable String id) {
dictMappingService.dictAutoMapping(id);
return R.ok();
}
@PostMapping("/manual/{id}")
public R dictManualMapping(@PathVariable String id) {
dictMappingService.dictManualMapping(id);
return R.ok();
}
}
package cn.datax.service.data.standard.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import org.apache.ibatis.annotations.Mapper;
import java.io.Serializable;
/**
* <p>
* 对照表信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper
public interface ContrastDao extends BaseDao<ContrastEntity> {
@Override
ContrastEntity selectById(Serializable id);
}
package cn.datax.service.data.standard.dao;
import cn.datax.common.base.BaseDao;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* <p>
* 字典对照信息表 Mapper 接口
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper
public interface ContrastDictDao extends BaseDao<ContrastDictEntity> {
@Override
List<ContrastDictEntity> selectList(@Param(Constants.WRAPPER) Wrapper<ContrastDictEntity> queryWrapper);
@Override
<E extends IPage<ContrastDictEntity>> E selectPage(E page, @Param(Constants.WRAPPER) Wrapper<ContrastDictEntity> queryWrapper);
}
package cn.datax.service.data.standard.mapstruct;
import cn.datax.common.mapstruct.EntityMapper;
import cn.datax.service.data.standard.api.dto.ContrastDictDto;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import cn.datax.service.data.standard.api.vo.ContrastDictVo;
import org.mapstruct.Mapper;
/**
* <p>
* 字典对照信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper(componentModel = "spring")
public interface ContrastDictMapper extends EntityMapper<ContrastDictDto, ContrastDictEntity, ContrastDictVo> {
}
package cn.datax.service.data.standard.mapstruct;
import cn.datax.common.mapstruct.EntityMapper;
import cn.datax.service.data.standard.api.dto.ContrastDto;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import cn.datax.service.data.standard.api.vo.ContrastVo;
import org.mapstruct.Mapper;
/**
* <p>
* 对照表信息表 Mapper 实体映射
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Mapper(componentModel = "spring")
public interface ContrastMapper extends EntityMapper<ContrastDto, ContrastEntity, ContrastVo> {
}
package cn.datax.service.data.standard.service;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import cn.datax.service.data.standard.api.dto.ContrastDictDto;
import cn.datax.common.base.BaseService;
import java.util.List;
/**
* <p>
* 字典对照信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
public interface ContrastDictService extends BaseService<ContrastDictEntity> {
ContrastDictEntity saveContrastDict(ContrastDictDto contrastDict);
ContrastDictEntity updateContrastDict(ContrastDictDto contrastDict);
ContrastDictEntity getContrastDictById(String id);
void deleteContrastDictById(String id);
void deleteContrastDictBatch(List<String> ids);
}
package cn.datax.service.data.standard.service;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import cn.datax.service.data.standard.api.dto.ContrastDto;
import cn.datax.common.base.BaseService;
import cn.datax.service.data.standard.api.vo.ContrastTreeVo;
import java.util.List;
/**
* <p>
* 对照表信息表 服务类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
public interface ContrastService extends BaseService<ContrastEntity> {
ContrastEntity saveContrast(ContrastDto contrast);
ContrastEntity updateContrast(ContrastDto contrast);
ContrastEntity getContrastById(String id);
void deleteContrastById(String id);
void deleteContrastBatch(List<String> ids);
List<ContrastTreeVo> getContrastTree();
}
package cn.datax.service.data.standard.service;
import java.util.Map;
public interface DictMappingService {
Map<String, Object> getDictMapping(String id);
void dictAutoMapping(String id);
void dictManualMapping(String id);
}
package cn.datax.service.data.standard.service.impl;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import cn.datax.service.data.standard.api.dto.ContrastDictDto;
import cn.datax.service.data.standard.service.ContrastDictService;
import cn.datax.service.data.standard.mapstruct.ContrastDictMapper;
import cn.datax.service.data.standard.dao.ContrastDictDao;
import cn.datax.common.base.BaseServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* <p>
* 字典对照信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ContrastDictServiceImpl extends BaseServiceImpl<ContrastDictDao, ContrastDictEntity> implements ContrastDictService {
@Autowired
private ContrastDictDao contrastDictDao;
@Autowired
private ContrastDictMapper contrastDictMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public ContrastDictEntity saveContrastDict(ContrastDictDto contrastDictDto) {
ContrastDictEntity contrastDict = contrastDictMapper.toEntity(contrastDictDto);
contrastDictDao.insert(contrastDict);
return contrastDict;
}
@Override
@Transactional(rollbackFor = Exception.class)
public ContrastDictEntity updateContrastDict(ContrastDictDto contrastDictDto) {
ContrastDictEntity contrastDict = contrastDictMapper.toEntity(contrastDictDto);
contrastDictDao.updateById(contrastDict);
return contrastDict;
}
@Override
public ContrastDictEntity getContrastDictById(String id) {
ContrastDictEntity contrastDictEntity = super.getById(id);
return contrastDictEntity;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteContrastDictById(String id) {
contrastDictDao.deleteById(id);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteContrastDictBatch(List<String> ids) {
contrastDictDao.deleteBatchIds(ids);
}
}
package cn.datax.service.data.standard.service.impl;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import cn.datax.service.data.standard.api.dto.ContrastDto;
import cn.datax.service.data.standard.api.vo.ContrastTreeVo;
import cn.datax.service.data.standard.service.ContrastService;
import cn.datax.service.data.standard.mapstruct.ContrastMapper;
import cn.datax.service.data.standard.dao.ContrastDao;
import cn.datax.common.base.BaseServiceImpl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* <p>
* 对照表信息表 服务实现类
* </p>
*
* @author yuwei
* @since 2020-09-27
*/
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ContrastServiceImpl extends BaseServiceImpl<ContrastDao, ContrastEntity> implements ContrastService {
@Autowired
private ContrastDao contrastDao;
@Autowired
private ContrastMapper contrastMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public ContrastEntity saveContrast(ContrastDto contrastDto) {
ContrastEntity contrast = contrastMapper.toEntity(contrastDto);
contrastDao.insert(contrast);
return contrast;
}
@Override
@Transactional(rollbackFor = Exception.class)
public ContrastEntity updateContrast(ContrastDto contrastDto) {
ContrastEntity contrast = contrastMapper.toEntity(contrastDto);
contrastDao.updateById(contrast);
return contrast;
}
@Override
public ContrastEntity getContrastById(String id) {
ContrastEntity contrastEntity = super.getById(id);
return contrastEntity;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteContrastById(String id) {
contrastDao.deleteById(id);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteContrastBatch(List<String> ids) {
contrastDao.deleteBatchIds(ids);
}
@Override
public List<ContrastTreeVo> getContrastTree() {
List<ContrastTreeVo> list = new ArrayList<>();
List<ContrastEntity> contrastEntityList = contrastDao.selectList(Wrappers.emptyWrapper());
Map<String, List<ContrastEntity>> sourceMap = contrastEntityList.stream().collect(Collectors.groupingBy(ContrastEntity::getSourceId));
Iterator<Map.Entry<String, List<ContrastEntity>>> sourceIterator = sourceMap.entrySet().iterator();
while (sourceIterator.hasNext()) {
Map.Entry<String, List<ContrastEntity>> sourceEntry = sourceIterator.next();
String sourceId = sourceEntry.getKey();
List<ContrastEntity> sourceList = sourceEntry.getValue();
String sourceName = sourceList.get(0).getSourceName();
ContrastTreeVo sourceTree = new ContrastTreeVo();
sourceTree.setId(sourceId);
sourceTree.setLabel(sourceName);
Map<String, List<ContrastEntity>> tableMap = sourceList.stream().collect(Collectors.groupingBy(ContrastEntity::getTableId));
Iterator<Map.Entry<String, List<ContrastEntity>>> tableIterator = tableMap.entrySet().iterator();
List<ContrastTreeVo> tableTreeList = new ArrayList<>();
while (tableIterator.hasNext()) {
Map.Entry<String, List<ContrastEntity>> tableEntry = tableIterator.next();
String tableId = tableEntry.getKey();
List<ContrastEntity> tableList = tableEntry.getValue();
String tableName = tableList.get(0).getTableName();
String tableComment = tableList.get(0).getTableComment();
ContrastTreeVo tableTree = new ContrastTreeVo();
tableTree.setId(tableId);
tableTree.setLabel(tableName);
tableTree.setName(tableComment);
List<ContrastTreeVo> columnTreeList = tableList.stream().map(s -> {
ContrastTreeVo columnTree = new ContrastTreeVo();
columnTree.setId(s.getId());
columnTree.setLabel(s.getColumnName());
columnTree.setName(s.getColumnComment());
columnTree.setData(s);
return columnTree;
}).collect(Collectors.toList());
tableTree.setChildren(columnTreeList);
tableTreeList.add(tableTree);
}
sourceTree.setChildren(tableTreeList);
list.add(sourceTree);
}
return list;
}
}
package cn.datax.service.data.standard.service.impl;
import cn.datax.common.core.DataConstant;
import cn.datax.service.data.standard.api.entity.ContrastDictEntity;
import cn.datax.service.data.standard.api.entity.ContrastEntity;
import cn.datax.service.data.standard.api.entity.DictEntity;
import cn.datax.service.data.standard.api.vo.ContrastDictVo;
import cn.datax.service.data.standard.api.vo.DictVo;
import cn.datax.service.data.standard.dao.ContrastDao;
import cn.datax.service.data.standard.dao.ContrastDictDao;
import cn.datax.service.data.standard.dao.DictDao;
import cn.datax.service.data.standard.mapstruct.ContrastDictMapper;
import cn.datax.service.data.standard.mapstruct.DictMapper;
import cn.datax.service.data.standard.service.DictMappingService;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@Service
public class DictMappingServiceImpl implements DictMappingService {
@Autowired
private ContrastDao contrastDao;
@Autowired
private ContrastDictDao contrastDictDao;
@Autowired
private ContrastDictMapper contrastDictMapper;
@Autowired
private DictDao dictDao;
@Autowired
private DictMapper dictMapper;
private static String BIND_GB_CODE = "gb_code";
private static String BIND_GB_NAME = "gb_name";
@Override
public Map<String, Object> getDictMapping(String id) {
ContrastEntity contrastEntity = contrastDao.selectById(id);
String contrastId = contrastEntity.getId();
String gbTypeId = contrastEntity.getGbTypeId();
QueryWrapper<ContrastDictEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("d.contrast_id", contrastId);
List<ContrastDictEntity> contrastDictEntityList = contrastDictDao.selectList(queryWrapper);
List<ContrastDictVo> contrastDictList = contrastDictEntityList.stream().map(contrastDictMapper::toVO).collect(Collectors.toList());
List<DictEntity> dictEntityList = dictDao.selectList(Wrappers.<DictEntity>lambdaQuery().eq(DictEntity::getTypeId, gbTypeId).eq(DictEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
List<DictVo> dictList = dictEntityList.stream().map(dictMapper::toVO).collect(Collectors.toList());
Map<String, Object> map = new HashMap<>(4);
String tableName = StrUtil.isBlank(contrastEntity.getTableComment()) ? contrastEntity.getTableName() : contrastEntity.getTableName() + "(" + contrastEntity.getTableComment() + ")";
String columnName = StrUtil.isBlank(contrastEntity.getColumnComment()) ? contrastEntity.getTableName() : contrastEntity.getColumnName() + "(" + contrastEntity.getColumnComment() + ")";
long contrastTotal = contrastDictList.stream().count();
long unContrastTotal = contrastDictList.stream().filter(s -> DataConstant.TrueOrFalse.FALSE.getKey().equals(s.getStatus())).count();
map.put("title", "数据源: " + contrastEntity.getSourceName() + " 数据表: " + tableName + " 对照字段: " + columnName + " 标准类别编码: " + contrastEntity.getGbTypeCode() + " 标准类别名称: " + contrastEntity.getGbTypeName());
map.put("description", "总数: " + contrastTotal + " 未对照: " + unContrastTotal + " 已对照: " + (contrastTotal - unContrastTotal));
map.put("left", contrastDictList);
map.put("right", dictList);
return map;
}
@Override
public void dictAutoMapping(String id) {
ContrastEntity contrastEntity = contrastDao.selectById(id);
String contrastId = contrastEntity.getId();
String gbTypeId = contrastEntity.getGbTypeId();
String bindGbColumn = contrastEntity.getBindGbColumn();
// 查询未对照数据
QueryWrapper<ContrastDictEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("d.contrast_id", contrastId);
queryWrapper.eq("d.status", DataConstant.TrueOrFalse.FALSE.getKey());
List<ContrastDictEntity> contrastDictEntityList = contrastDictDao.selectList(queryWrapper);
// 查询标准字典数据
List<DictEntity> dictEntityList = dictDao.selectList(Wrappers.<DictEntity>lambdaQuery().eq(DictEntity::getTypeId, gbTypeId).eq(DictEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
contrastDictEntityList.stream().forEach(c -> {
dictEntityList.stream().filter(d -> {
if (BIND_GB_CODE.equals(bindGbColumn)) {
return Objects.equals(c.getColCode(), d.getGbCode());
} else {
return Objects.equals(c.getColName(), d.getGbName());
}
}).forEach(s -> {
// 更新对照结果
String contrastGbId = s.getId();
c.setStatus(DataConstant.TrueOrFalse.TRUE.getKey());
c.setContrastGbId(contrastGbId);
contrastDictDao.updateById(c);
});
});
}
@Override
public void dictManualMapping(String id) {
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.standard.dao.ContrastDictDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.standard.api.entity.ContrastDictEntity">
<result column="id" property="id" />
<result column="status" property="status" />
<result column="create_by" property="createBy" />
<result column="create_time" property="createTime" />
<result column="create_dept" property="createDept" />
<result column="update_by" property="updateBy" />
<result column="update_time" property="updateTime" />
<result column="remark" property="remark" />
<result column="contrast_id" property="contrastId" />
<result column="col_code" property="colCode" />
<result column="col_name" property="colName" />
<result column="contrast_gb_id" property="contrastGbId" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.standard.api.entity.ContrastDictEntity" extends="BaseResultMap">
<result column="source_name" property="sourceName" />
<result column="table_name" property="tableName" />
<result column="column_name" property="columnName" />
<result column="gb_type_code" property="gbTypeCode" />
<result column="gb_type_name" property="gbTypeName" />
<result column="contrast_gb_code" property="contrastGbCode" />
<result column="contrast_gb_name" property="contrastGbName" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
status,
create_by,
create_time,
create_dept,
update_by,
update_time,
remark,
contrast_id, col_code, col_name, contrast_gb_id
</sql>
<sql id="Dict_Column_List">
${alias}.id,
${alias}.status,
${alias}.create_by,
${alias}.create_time,
${alias}.create_dept,
${alias}.update_by,
${alias}.update_time,
${alias}.remark,
${alias}.contrast_id, ${alias}.col_code, ${alias}.col_name, ${alias}.contrast_gb_id
</sql>
<select id="selectList" resultMap="ExtendResultMap">
SELECT c.source_name, c.table_name, c.column_name, t.gb_type_code, t.gb_type_name,
sd.gb_code AS contrast_gb_code, sd.gb_name AS contrast_gb_name,
<include refid="Dict_Column_List"><property name="alias" value="d"/></include>
FROM standard_contrast_dict d
LEFT JOIN standard_contrast c ON c.id = d.contrast_id
LEFT JOIN standard_type t ON t.id = c.gb_type_id
LEFT JOIN standard_dict sd ON sd.id = d.contrast_gb_id
${ew.customSqlSegment}
</select>
<select id="selectPage" resultMap="ExtendResultMap">
SELECT c.source_name, c.table_name, c.column_name, t.gb_type_code, t.gb_type_name,
sd.gb_code AS contrast_gb_code, sd.gb_name AS contrast_gb_name,
<include refid="Dict_Column_List"><property name="alias" value="d"/></include>
FROM standard_contrast_dict d
LEFT JOIN standard_contrast c ON c.id = d.contrast_id
LEFT JOIN standard_type t ON t.id = c.gb_type_id
LEFT JOIN standard_dict sd ON sd.id = d.contrast_gb_id
${ew.customSqlSegment}
</select>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.datax.service.data.standard.dao.ContrastDao">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.datax.service.data.standard.api.entity.ContrastEntity">
<result column="id" property="id" />
<result column="status" property="status" />
<result column="create_by" property="createBy" />
<result column="create_time" property="createTime" />
<result column="create_dept" property="createDept" />
<result column="update_by" property="updateBy" />
<result column="update_time" property="updateTime" />
<result column="remark" property="remark" />
<result column="source_id" property="sourceId" />
<result column="source_name" property="sourceName" />
<result column="table_id" property="tableId" />
<result column="table_name" property="tableName" />
<result column="table_comment" property="tableComment" />
<result column="column_id" property="columnId" />
<result column="column_name" property="columnName" />
<result column="column_comment" property="columnComment" />
<result column="gb_type_id" property="gbTypeId" />
<result column="bind_gb_column" property="bindGbColumn" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.standard.api.entity.ContrastEntity" extends="BaseResultMap">
<result column="gb_type_code" property="gbTypeCode" />
<result column="gb_type_name" property="gbTypeName" />
</resultMap>
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
status,
create_by,
create_time,
create_dept,
update_by,
update_time,
remark,
source_id, source_name, table_id, table_name, table_comment, column_id, column_name, column_comment, gb_type_id, bind_gb_column
</sql>
<sql id="Contrast_Column_List">
${alias}.id,
${alias}.status,
${alias}.create_by,
${alias}.create_time,
${alias}.create_dept,
${alias}.update_by,
${alias}.update_time,
${alias}.remark,
${alias}.source_id, ${alias}.source_name, ${alias}.table_id, ${alias}.table_name, ${alias}.table_comment, ${alias}.column_id,
${alias}.column_name, ${alias}.column_comment, ${alias}.gb_type_id, ${alias}.bind_gb_column
</sql>
<select id="selectById" resultMap="ExtendResultMap">
SELECT t.gb_type_code, t.gb_type_name,
<include refid="Contrast_Column_List"><property name="alias" value="c"/></include>
FROM standard_contrast c
LEFT JOIN standard_type t ON t.id = c.gb_type_id
WHERE 1 = 1 AND c.id = #{id}
</select>
</mapper>
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
<result column="type_id" property="typeId" /> <result column="type_id" property="typeId" />
<result column="gb_code" property="gbCode" /> <result column="gb_code" property="gbCode" />
<result column="gb_name" property="gbName" /> <result column="gb_name" property="gbName" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.standard.api.entity.DictEntity" extends="BaseResultMap">
<result column="gb_type_code" property="gbTypeCode" /> <result column="gb_type_code" property="gbTypeCode" />
<result column="gb_type_name" property="gbTypeName" /> <result column="gb_type_name" property="gbTypeName" />
</resultMap> </resultMap>
...@@ -42,7 +45,7 @@ ...@@ -42,7 +45,7 @@
${alias}.remark, ${alias}.type_id, ${alias}.gb_code, ${alias}.gb_name ${alias}.remark, ${alias}.type_id, ${alias}.gb_code, ${alias}.gb_name
</sql> </sql>
<select id="selectById" resultMap="BaseResultMap"> <select id="selectById" resultMap="ExtendResultMap">
SELECT t.gb_type_code, t.gb_type_name, SELECT t.gb_type_code, t.gb_type_name,
<include refid="Dict_Column_List"><property name="alias" value="d"/></include> <include refid="Dict_Column_List"><property name="alias" value="d"/></include>
FROM standard_dict d FROM standard_dict d
...@@ -50,7 +53,7 @@ ...@@ -50,7 +53,7 @@
WHERE 1 = 1 AND d.id = #{id} WHERE 1 = 1 AND d.id = #{id}
</select> </select>
<select id="selectPage" resultMap="BaseResultMap"> <select id="selectPage" resultMap="ExtendResultMap">
SELECT t.gb_type_code, t.gb_type_name, SELECT t.gb_type_code, t.gb_type_name,
<include refid="Dict_Column_List"><property name="alias" value="d"/></include> <include refid="Dict_Column_List"><property name="alias" value="d"/></include>
FROM standard_dict d FROM standard_dict d
......
...@@ -21,7 +21,7 @@ public class ScheduleJob extends QuartzJobBean { ...@@ -21,7 +21,7 @@ public class ScheduleJob extends QuartzJobBean {
@Autowired @Autowired
private QrtzJobLogService qrtzJobLogService; private QrtzJobLogService qrtzJobLogService;
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16));
@Override @Override
protected void executeInternal(JobExecutionContext context) { protected void executeInternal(JobExecutionContext context) {
......
package cn.datax.service.workflow.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
}
package cn.datax.service.workflow.flowable; package cn.datax.service.workflow.flowable;
import cn.datax.common.core.DataConstant; import cn.datax.common.core.DataConstant;
import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.SpringContextHolder;
import cn.datax.service.workflow.api.enums.VariablesEnum; import cn.datax.service.workflow.api.enums.VariablesEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ExecutionListener; import org.flowable.engine.delegate.ExecutionListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
...@@ -22,10 +26,17 @@ public class EndTaskListener implements ExecutionListener { ...@@ -22,10 +26,17 @@ public class EndTaskListener implements ExecutionListener {
String businessKey = (String) variables.get(VariablesEnum.businessKey.toString()); String businessKey = (String) variables.get(VariablesEnum.businessKey.toString());
String businessCode = (String) variables.get(VariablesEnum.businessCode.toString()); String businessCode = (String) variables.get(VariablesEnum.businessCode.toString());
log.info("业务结束:{},{}", businessKey, businessCode); log.info("业务结束:{},{}", businessKey, businessCode);
Map<String, Object> map = new HashMap<>(4);
map.put(VariablesEnum.businessKey.toString(), businessKey);
map.put(VariablesEnum.businessCode.toString(), businessCode);
if (delegateExecution.getCurrentActivityId().equals(VariablesEnum.approveEnd.toString())) { if (delegateExecution.getCurrentActivityId().equals(VariablesEnum.approveEnd.toString())) {
log.info("业务结束状态:{}", DataConstant.AuditState.AGREE.getKey()); log.info("业务结束状态:{}", DataConstant.AuditState.AGREE.getKey());
map.put("flowStatus", DataConstant.AuditState.AGREE.getKey());
} else if (delegateExecution.getCurrentActivityId().equals(VariablesEnum.rejectEnd.toString())) { } else if (delegateExecution.getCurrentActivityId().equals(VariablesEnum.rejectEnd.toString())) {
log.info("业务结束状态:{}", DataConstant.AuditState.REJECT.getKey()); log.info("业务结束状态:{}", DataConstant.AuditState.REJECT.getKey());
map.put("flowStatus", DataConstant.AuditState.REJECT.getKey());
} }
RabbitTemplate rabbitTemplate = SpringContextHolder.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, RabbitMqConstant.TOPIC_WORKFLOW_KEY + businessCode, map);
} }
} }
package cn.datax.service.workflow.flowable; package cn.datax.service.workflow.flowable;
import cn.datax.common.core.DataConstant;
import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.SpringContextHolder; import cn.datax.common.utils.SpringContextHolder;
import cn.datax.service.workflow.api.enums.VariablesEnum; import cn.datax.service.workflow.api.enums.VariablesEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.TaskService; import org.flowable.engine.TaskService;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.flowable.task.service.delegate.TaskListener; import org.flowable.task.service.delegate.TaskListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
...@@ -28,6 +32,13 @@ public class InitialAuditCompleteTaskListener implements TaskListener { ...@@ -28,6 +32,13 @@ public class InitialAuditCompleteTaskListener implements TaskListener {
String businessKey = (String) variables.get(VariablesEnum.businessKey.toString()); String businessKey = (String) variables.get(VariablesEnum.businessKey.toString());
String businessCode = (String) variables.get(VariablesEnum.businessCode.toString()); String businessCode = (String) variables.get(VariablesEnum.businessCode.toString());
log.info("业务退回:{},{}", businessKey, businessCode); log.info("业务退回:{},{}", businessKey, businessCode);
log.info("业务退回状态:{}", DataConstant.AuditState.BACK.getKey());
RabbitTemplate rabbitTemplate = SpringContextHolder.getBean(RabbitTemplate.class);
Map<String, Object> map = new HashMap<>(4);
map.put(VariablesEnum.businessKey.toString(), businessKey);
map.put(VariablesEnum.businessCode.toString(), businessCode);
map.put("flowStatus", DataConstant.AuditState.BACK.getKey());
rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, RabbitMqConstant.TOPIC_WORKFLOW_KEY + businessCode, map);
} }
log.info("退出初审节点用户任务完成监听器"); log.info("退出初审节点用户任务完成监听器");
} }
......
package cn.datax.service.workflow.flowable; package cn.datax.service.workflow.flowable;
import cn.datax.common.core.DataConstant; import cn.datax.common.core.DataConstant;
import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.SpringContextHolder;
import cn.datax.service.workflow.api.enums.VariablesEnum; import cn.datax.service.workflow.api.enums.VariablesEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.task.service.delegate.DelegateTask; import org.flowable.task.service.delegate.DelegateTask;
import org.flowable.task.service.delegate.TaskListener; import org.flowable.task.service.delegate.TaskListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
...@@ -25,5 +29,11 @@ public class SubmitCompleteTaskListener implements TaskListener { ...@@ -25,5 +29,11 @@ public class SubmitCompleteTaskListener implements TaskListener {
log.info("业务审核中:{},{}", businessKey, businessCode); log.info("业务审核中:{},{}", businessKey, businessCode);
log.info("业务审核中状态:{}", DataConstant.AuditState.AUDIT.getKey()); log.info("业务审核中状态:{}", DataConstant.AuditState.AUDIT.getKey());
log.info("退出提交节点用户任务完成监听器"); log.info("退出提交节点用户任务完成监听器");
RabbitTemplate rabbitTemplate = SpringContextHolder.getBean(RabbitTemplate.class);
Map<String, Object> map = new HashMap<>(4);
map.put(VariablesEnum.businessKey.toString(), businessKey);
map.put(VariablesEnum.businessCode.toString(), businessCode);
map.put("flowStatus", DataConstant.AuditState.AUDIT.getKey());
rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, RabbitMqConstant.TOPIC_WORKFLOW_KEY + businessCode, map);
} }
} }
package cn.datax.service.workflow.service.impl; package cn.datax.service.workflow.service.impl;
import cn.datax.common.core.DataConstant;
import cn.datax.common.rabbitmq.config.RabbitMqConstant;
import cn.datax.common.utils.SecurityUtil; import cn.datax.common.utils.SecurityUtil;
import cn.datax.service.workflow.api.dto.ProcessInstanceCreateRequest; import cn.datax.service.workflow.api.dto.ProcessInstanceCreateRequest;
import cn.datax.service.workflow.api.enums.VariablesEnum; import cn.datax.service.workflow.api.enums.VariablesEnum;
...@@ -27,12 +29,14 @@ import org.flowable.engine.runtime.ProcessInstanceQuery; ...@@ -27,12 +29,14 @@ import org.flowable.engine.runtime.ProcessInstanceQuery;
import org.flowable.engine.task.Comment; import org.flowable.engine.task.Comment;
import org.flowable.image.impl.DefaultProcessDiagramGenerator; import org.flowable.image.impl.DefaultProcessDiagramGenerator;
import org.flowable.task.api.Task; import org.flowable.task.api.Task;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -52,6 +56,9 @@ public class FlowInstanceServiceImpl implements FlowInstanceService { ...@@ -52,6 +56,9 @@ public class FlowInstanceServiceImpl implements FlowInstanceService {
@Autowired @Autowired
private TaskService taskService; private TaskService taskService;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String IMAGE_TYPE = "png"; private static final String IMAGE_TYPE = "png";
private static final String FONT_NAME = "宋体"; private static final String FONT_NAME = "宋体";
...@@ -103,13 +110,19 @@ public class FlowInstanceServiceImpl implements FlowInstanceService { ...@@ -103,13 +110,19 @@ public class FlowInstanceServiceImpl implements FlowInstanceService {
@Override @Override
public void deleteProcessInstance(String processInstanceId) { public void deleteProcessInstance(String processInstanceId) {
// 发送消息队列 // 发送消息队列
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult(); ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().includeProcessVariables().processInstanceId(processInstanceId).singleResult();
Map<String, Object> variables = processInstance.getProcessVariables(); Map<String, Object> variables = processInstance.getProcessVariables();
String businessKey = (String) variables.get(VariablesEnum.businessKey.toString()); String businessKey = (String) variables.get(VariablesEnum.businessKey.toString());
String businessCode = (String) variables.get(VariablesEnum.businessCode.toString()); String businessCode = (String) variables.get(VariablesEnum.businessCode.toString());
log.info("业务撤销:{},{}", businessKey, businessCode); log.info("业务撤销:{},{}", businessKey, businessCode);
log.info("成功删除流程实例ID:{}", processInstanceId); log.info("成功删除流程实例ID:{}", processInstanceId);
log.info("业务撤销状态:{}", DataConstant.AuditState.CANCEL.getKey());
runtimeService.deleteProcessInstance(processInstanceId, "用户撤销"); runtimeService.deleteProcessInstance(processInstanceId, "用户撤销");
Map<String, Object> map = new HashMap<>(4);
map.put(VariablesEnum.businessKey.toString(), businessKey);
map.put(VariablesEnum.businessCode.toString(), businessCode);
map.put("flowStatus", DataConstant.AuditState.CANCEL.getKey());
rabbitTemplate.convertAndSend(RabbitMqConstant.TOPIC_EXCHANGE_WORKFLOW, RabbitMqConstant.TOPIC_WORKFLOW_KEY + businessCode, map);
} }
@Override @Override
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
"echarts": "^4.8.0", "echarts": "^4.8.0",
"element-ui": "2.13.2", "element-ui": "2.13.2",
"good-storage": "^1.1.1", "good-storage": "^1.1.1",
"jsplumb": "^2.14.6",
"normalize.css": "7.0.0", "normalize.css": "7.0.0",
"nprogress": "0.2.0", "nprogress": "0.2.0",
"path-to-regexp": "2.4.0", "path-to-regexp": "2.4.0",
......
import request from '@/utils/request'
export function pageCheckJob(data) {
return request({
url: '/data/quality/scheduleJobs/page',
method: 'get',
params: data
})
}
export function pauseCheckJob(id) {
return request({
url: '/data/quality/scheduleJobs/pause/' + id,
method: 'post'
})
}
export function resumeCheckJob(id) {
return request({
url: '/data/quality/scheduleJobs/resume/' + id,
method: 'post'
})
}
import request from '@/utils/request'
export function listRuleType(data) {
return request({
url: '/data/quality/ruleTypes/report/list',
method: 'get',
params: data
})
}
export function pageCheckReport(data) {
return request({
url: '/data/quality/checkReports/page',
method: 'get',
params: data
})
}
import request from '@/utils/request'
export function listRuleType(data) {
return request({
url: '/data/quality/ruleTypes/list',
method: 'get',
params: data
})
}
export function pageCheckRule(data) {
return request({
url: '/data/quality/checkRules/page',
method: 'get',
params: data
})
}
export function getCheckRule(id) {
return request({
url: '/data/quality/checkRules/' + id,
method: 'get'
})
}
export function delCheckRule(id) {
return request({
url: '/data/quality/checkRules/' + id,
method: 'delete'
})
}
export function delCheckRules(ids) {
return request({
url: '/data/quality/checkRules/batch/' + ids,
method: 'delete'
})
}
export function addCheckRule(data) {
return request({
url: '/data/quality/checkRules',
method: 'post',
data: data
})
}
export function updateCheckRule(data) {
return request({
url: '/data/quality/checkRules/' + data.id,
method: 'put',
data: data
})
}
import request from '@/utils/request'
export function getContrastTree(data) {
return request({
url: '/data/standard/contrasts/tree',
method: 'get',
params: data
})
}
export function addContrast(data) {
return request({
url: '/data/standard/contrasts',
method: 'post',
data: data
})
}
export function updateContrast(data) {
return request({
url: '/data/standard/contrasts/' + data.id,
method: 'put',
data: data
})
}
export function delContrast(id) {
return request({
url: '/data/standard/contrasts/' + id,
method: 'delete'
})
}
import request from '@/utils/request'
export function pageContrastDict(data) {
return request({
url: '/data/standard/contrastDicts/page',
method: 'get',
params: data
})
}
export function getContrastDict(id) {
return request({
url: '/data/standard/contrastDicts/' + id,
method: 'get'
})
}
export function delContrastDict(id) {
return request({
url: '/data/standard/contrastDicts/' + id,
method: 'delete'
})
}
export function delContrastDicts(ids) {
return request({
url: '/data/standard/contrastDicts/batch/' + ids,
method: 'delete'
})
}
export function addContrastDict(data) {
return request({
url: '/data/standard/contrastDicts',
method: 'post',
data: data
})
}
export function updateContrastDict(data) {
return request({
url: '/data/standard/contrastDicts/' + data.id,
method: 'put',
data: data
})
}
import request from '@/utils/request'
export function getDictMapping(id) {
return request({
url: '/data/standard/mappings/' + id,
method: 'get'
})
}
export function dictAutoMapping(id) {
return request({
url: '/data/standard/mappings/auto/' + id,
method: 'post'
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment