Commit e72e349f by yuwei

项目初始化

parent 6de3441c
......@@ -5,6 +5,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
......@@ -50,8 +51,9 @@ public class CheckRuleDto implements Serializable {
private String ruleColumn;
@ApiModelProperty(value = "核查字段名称")
private String ruleColumnComment;
@ApiModelProperty(value = "核查脚本")
private String ruleSql;
@ApiModelProperty(value = "核查配置")
@Valid
private RuleConfig ruleConfig;
@ApiModelProperty(value = "状态")
@NotNull(message = "状态不能为空", groups = {ValidationGroups.Insert.class, ValidationGroups.Update.class})
private String status;
......
package cn.datax.service.data.quality.api.dto;
import lombok.Data;
import java.io.Serializable;
@Data
public class RuleConfig implements Serializable {
private static final long serialVersionUID=1L;
private String ruleItemCode;
}
package cn.datax.service.data.quality.api.entity;
import cn.datax.common.base.DataScopeBaseEntity;
import cn.datax.service.data.quality.api.dto.RuleConfig;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
......@@ -93,6 +95,12 @@ public class CheckRuleEntity extends DataScopeBaseEntity {
private String ruleColumnComment;
/**
* 核查配置
*/
@TableField(value = "config_json", typeHandler = JacksonTypeHandler.class)
private RuleConfig ruleConfig;
/**
* 核查脚本
*/
private String ruleSql;
......
package cn.datax.service.data.quality.api.enums;
public enum RuleItem {
Unique("unique_key", "验证用户指定的字段是否具有唯一性"),
AccuracyLlength("accuracy_key_length", "验证长度是否符合规定"),
Integrity("integrity_key", "验证表中必须出现的字段非空"),
Relevance("relevance_key", "验证关联性"),
Timeliness("timeliness_key", "验证及时性"),
Consistent("consistent_key", "验证用户指定的字段枚举值是否合乎要求");
private final String code;
private final String desc;
RuleItem(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
}
package cn.datax.service.data.quality.api.vo;
import cn.datax.service.data.quality.api.dto.RuleConfig;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
......@@ -38,5 +39,5 @@ public class CheckRuleVo implements Serializable {
private String ruleColumnId;
private String ruleColumn;
private String ruleColumnComment;
private String ruleSql;
private RuleConfig ruleConfig;
}
......@@ -8,6 +8,9 @@ import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.io.Serializable;
import java.util.List;
/**
* <p>
* 核查规则信息表 Mapper 接口
......@@ -20,5 +23,11 @@ import org.apache.ibatis.annotations.Param;
public interface CheckRuleDao extends BaseDao<CheckRuleEntity> {
@Override
CheckRuleEntity selectById(Serializable id);
@Override
List<CheckRuleEntity> selectList(@Param(Constants.WRAPPER) Wrapper<CheckRuleEntity> queryWrapper);
@Override
<E extends IPage<CheckRuleEntity>> E selectPage(E page, @Param(Constants.WRAPPER) Wrapper<CheckRuleEntity> queryWrapper);
}
package cn.datax.service.data.quality.schedule;
import cn.datax.common.exception.DataException;
import cn.datax.service.data.quality.schedule.rules.RuleItem;
import cn.datax.service.data.quality.schedule.rules.RuleItemRegistry;
import java.util.Optional;
public class CheckRuleFactory {
private static final RuleItemRegistry RULE_ITEM_REGISTRY = new RuleItemRegistry();
public CheckRuleFactory() {
}
public static RuleItem getRuleItem(String code) {
return Optional.ofNullable(RULE_ITEM_REGISTRY.getRuleItem(code)).orElseThrow(() -> new DataException(String.format("%s not supported.", code)));
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 准确性核查
* 核查项:长度
* select sum(case when length(column) > 15 then 1 else 0 end), count(*) from table;
*/
public class AccuracyLlengthRule implements RuleItem {
@Override
public String parse(String table, String column) {
final StringBuilder builder = new StringBuilder();
builder.append("SELECT SUM(CASE WHEN LENGTH(").append(column).append(") > 15 THEN 1 ELSE 0 END), COUNT(*) FROM ").append(table);
return builder.toString();
}
@Override
public String code() {
return "accuracy_key_length";
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 一致性核查
* 核查项:字典
* select sum(case when column not in ('0', '1') then 1 else 0 end), count(*) from table;
*/
public class ConsistentRule implements RuleItem {
@Override
public String parse(String table, String column) {
final StringBuilder builder = new StringBuilder();
builder.append("SELECT SUM(CASE WHEN ").append(column).append(" NOT IN ('0', '1') THEN 1 ELSE 0 END), COUNT(*) FROM ").append(table);
return builder.toString();
}
@Override
public String code() {
return "consistent_key";
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 完整性核查
* 核查项:非空
* select sum(case when column is not null and trim(column) != '' then 1 else 0 end), count(*) from table;
*/
public class IntegrityRule implements RuleItem {
@Override
public String parse(String table, String column) {
final StringBuilder builder = new StringBuilder();
builder.append("SELECT SUM(CASE WHEN ").append(column).append(" IS NOT NULL AND TRIM(").append(column).append(") != '' THEN 1 ELSE 0 END), COUNT(*) FROM ").append(table);
return builder.toString();
}
@Override
public String code() {
return "integrity_key";
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 关联性核查
*/
public class RelevanceRule implements RuleItem {
@Override
public String parse(String table, String column) {
return null;
}
@Override
public String code() {
return "relevance_key";
}
}
package cn.datax.service.data.quality.schedule.rules;
public interface RuleItem {
String parse(String table, String column);
String code();
}
package cn.datax.service.data.quality.schedule.rules;
import java.util.HashMap;
import java.util.Map;
public class RuleItemRegistry {
private final Map<String, RuleItem> rule_item_map = new HashMap<>();
public RuleItemRegistry() {
this.rule_item_map.put("unique_key", new UniqueRule());
this.rule_item_map.put("consistent_key", new ConsistentRule());
this.rule_item_map.put("integrity_key", new IntegrityRule());
this.rule_item_map.put("relevance_key", new RelevanceRule());
this.rule_item_map.put("timeliness_key", new TimelinessRule());
this.rule_item_map.put("accuracy_key_length", new AccuracyLlengthRule());
}
public RuleItem getRuleItem(String code) {
return this.rule_item_map.get(code);
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 及时性核查
*/
public class TimelinessRule implements RuleItem {
@Override
public String parse(String table, String column) {
return null;
}
@Override
public String code() {
return "timeliness_key";
}
}
package cn.datax.service.data.quality.schedule.rules;
/**
* 唯一性核查
* 核查项:主键
* select count(distinct id), count(*) from table;
*/
public class UniqueRule implements RuleItem {
@Override
public String parse(String table, String column) {
final StringBuilder builder = new StringBuilder();
builder.append("SELECT totalCount-count AS errorCount, totalCount FROM (");
builder.append("SELECT COUNT(DISTINCT ").append(column).append(") AS count, COUNT(*) AS totalCount FROM ").append(table);
builder.append(") AS TEMP");
return builder.toString();
}
@Override
public String code() {
return "unique_key";
}
}
......@@ -63,7 +63,8 @@ public class QualityTask {
MultiThreadHandler handler = new ParallelTaskWithThreadPool(threadPoolExecutor);
// 启动子线程作为要处理的并行任务
list.stream().forEach(rule -> {
TaskHander task = new TaskHander(metadataSourceServiceFeign, dataSourceFactory, rule, result);
RuleItemEntity ruleItem = ruleItemList.stream().filter(item -> item.getId().equals(rule.getRuleItemId())).findFirst().get();
TaskHander task = new TaskHander(metadataSourceServiceFeign, dataSourceFactory, ruleItem, rule, result);
handler.addTask(task);
});
try {
......@@ -79,12 +80,12 @@ public class QualityTask {
String status = StrUtil.isBlank(s.getCheckResult()) ? DataConstant.TrueOrFalse.TRUE.getKey() : DataConstant.TrueOrFalse.FALSE.getKey();
if (StrUtil.isBlank(s.getCheckResult())) {
s.setCheckBatch((String) map.get("batch"));
checkReportService.save(s);
// checkReportService.save(s);
// 更新最近核查批次号
LambdaUpdateWrapper<CheckRuleEntity> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.set(CheckRuleEntity::getLastCheckBatch, (String) map.get("batch"));
updateWrapper.eq(CheckRuleEntity::getId, s.getCheckRuleId());
checkRuleService.update(updateWrapper);
// checkRuleService.update(updateWrapper);
}
// 定时任务日志
ScheduleLogEntity scheduleLogEntity = new ScheduleLogEntity();
......@@ -94,7 +95,7 @@ public class QualityTask {
scheduleLogEntity.setExecuteRuleId(s.getCheckRuleId());
scheduleLogEntity.setExecuteResult(s.getCheckResult());
scheduleLogEntity.setStatus(status);
scheduleLogService.save(scheduleLogEntity);
// scheduleLogService.save(scheduleLogEntity);
});
}
}
......@@ -9,6 +9,8 @@ import cn.datax.service.data.metadata.api.entity.MetadataSourceEntity;
import cn.datax.service.data.metadata.api.feign.MetadataSourceServiceFeign;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.entity.RuleItemEntity;
import cn.datax.service.data.quality.schedule.CheckRuleFactory;
import java.sql.Connection;
import java.sql.ResultSet;
......@@ -24,14 +26,17 @@ public class TaskHander implements Runnable {
private DataSourceFactory dataSourceFactory;
private RuleItemEntity ruleItemEntity;
private CheckRuleEntity checkRuleEntity;
private List<CheckReportEntity> list;
public TaskHander(MetadataSourceServiceFeign metadataSourceServiceFeign, DataSourceFactory dataSourceFactory, CheckRuleEntity checkRuleEntity, List<CheckReportEntity> list) {
public TaskHander(MetadataSourceServiceFeign metadataSourceServiceFeign, DataSourceFactory dataSourceFactory, RuleItemEntity ruleItemEntity, CheckRuleEntity checkRuleEntity, List<CheckReportEntity> list) {
super();
this.metadataSourceServiceFeign = metadataSourceServiceFeign;
this.dataSourceFactory = dataSourceFactory;
this.ruleItemEntity = ruleItemEntity;
this.checkRuleEntity = checkRuleEntity;
this.list = list;
}
......@@ -41,27 +46,35 @@ public class TaskHander implements Runnable {
CheckReportEntity checkReportEntity = new CheckReportEntity();
checkReportEntity.setCheckRuleId(checkRuleEntity.getId());
checkReportEntity.setCheckDate(LocalDateTime.now());
MetadataSourceEntity dataSource = null;
Optional<MetadataSourceEntity> dataSourceOptional = Optional.ofNullable(metadataSourceServiceFeign.getMetadataSourceById(checkRuleEntity.getRuleSourceId()));
if (dataSourceOptional.isPresent()) {
dataSource = dataSourceOptional.get();
} else {
MetadataSourceEntity dataSource = Optional.ofNullable(metadataSourceServiceFeign.getMetadataSourceById(checkRuleEntity.getRuleSourceId())).orElseThrow(() -> {
checkReportEntity.setCheckResult("获取数据源接口出错");
list.add(checkReportEntity);
dataSourceOptional.orElseThrow(DataException::new);
}
return new DataException("获取数据源接口出错");
});
// if (dataSourceOptional.isPresent()) {
// dataSource = dataSourceOptional.get();
// } else {
// checkReportEntity.setCheckResult("获取数据源接口出错");
// list.add(checkReportEntity);
// dataSourceOptional.orElseThrow(DataException::new);
// }
DbSchema dbSchema = dataSource.getDbSchema();
DbQueryProperty dbQueryProperty = new DbQueryProperty(dataSource.getDbType(), dbSchema.getHost(),
dbSchema.getUsername(), dbSchema.getPassword(), dbSchema.getPort(), dbSchema.getDbName(), dbSchema.getSid());
DbQuery dbQuery = null;
Optional<DbQuery> dbQueryOptional = Optional.ofNullable(dataSourceFactory.createDbQuery(dbQueryProperty));
if (dbQueryOptional.isPresent()) {
dbQuery = dbQueryOptional.get();
} else {
DbQuery dbQuery = Optional.ofNullable(dataSourceFactory.createDbQuery(dbQueryProperty)).orElseThrow(() -> {
checkReportEntity.setCheckResult("创建数据查询接口出错");
list.add(checkReportEntity);
dbQueryOptional.orElseThrow(DataException::new);
}
return new DataException("创建数据查询接口出错");
});
// if (dbQueryOptional.isPresent()) {
// dbQuery = dbQueryOptional.get();
// } else {
// checkReportEntity.setCheckResult("创建数据查询接口出错");
// list.add(checkReportEntity);
// dbQueryOptional.orElseThrow(DataException::new);
// }
String sql = CheckRuleFactory.getRuleItem(ruleItemEntity.getItemCode()).parse(checkRuleEntity.getRuleTable(), checkRuleEntity.getRuleColumn());
System.out.println(sql);
Connection conn = dbQuery.getConnection();
Statement stmt = null;
ResultSet rs = null;
......
......@@ -24,13 +24,14 @@
<result column="rule_column_id" property="ruleColumnId" />
<result column="rule_column" property="ruleColumn" />
<result column="rule_column_comment" property="ruleColumnComment" />
<result column="rule_sql" property="ruleSql" />
<result column="last_check_batch" property="lastCheckBatch" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.quality.api.entity.CheckRuleEntity" extends="BaseResultMap">
<result column="rule_type" property="ruleType" />
<result column="rule_level" property="ruleLevel" />
<result column="config_json" property="ruleConfig" typeHandler="com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler"/>
<result column="rule_sql" property="ruleSql" />
</resultMap>
<!-- 通用查询结果列 -->
......@@ -44,7 +45,7 @@
update_time,
remark,
rule_name, rule_type_id, rule_item_id, rule_level_id, rule_source_id, rule_source, rule_table_id, rule_table, rule_table_comment
rule_column_id, rule_column, rule_column_comment, rule_sql, last_check_batch
rule_column_id, rule_column, rule_column_comment, last_check_batch
</sql>
<sql id="Rule_Column_List">
......@@ -57,7 +58,7 @@
${alias}.update_time,
${alias}.remark,
${alias}.rule_name, ${alias}.rule_type_id, ${alias}.rule_item_id, ${alias}.rule_level_id, ${alias}.rule_source_id, ${alias}.rule_source,
${alias}.rule_table_id, ${alias}.rule_table, ${alias}.rule_table_comment, ${alias}.rule_column_id, ${alias}.rule_column, ${alias}.rule_column_comment, ${alias}.rule_sql, ${alias}.last_check_batch
${alias}.rule_table_id, ${alias}.rule_table, ${alias}.rule_table_comment, ${alias}.rule_column_id, ${alias}.rule_column, ${alias}.rule_column_comment, ${alias}.last_check_batch
</sql>
<select id="selectPage" resultMap="ExtendResultMap">
......@@ -69,4 +70,18 @@
${ew.customSqlSegment}
</select>
<select id="selectById" resultMap="ExtendResultMap">
SELECT config_json,
<include refid="Rule_Column_List"></include>
FROM quality_check_rule
WHERE 1=1 AND id = #{id}
</select>
<select id="selectList" resultMap="ExtendResultMap">
SELECT rule_sql,
<include refid="Base_Column_List"></include>
FROM quality_check_rule
${ew.customSqlSegment}
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment