Commit 601e6de6 by yuwei

项目初始化

parent a13adf79
...@@ -144,24 +144,17 @@ public class DateHander extends CallableTemplate<SqlConsoleVo> { ...@@ -144,24 +144,17 @@ public class DateHander extends CallableTemplate<SqlConsoleVo> {
} }
} }
} finally { } finally {
if(rs != null){
try { try {
if(rs != null){
rs.close(); rs.close();
} catch (SQLException e) {
} }
} if (stmt != null) {
if(stmt != null){
try {
stmt.close(); stmt.close();
} catch (SQLException e) {
} }
} if (conn != null) {
if(conn != null){
try {
conn.close(); conn.close();
} catch (SQLException e) {
}
} }
} catch (SQLException e) {}
} }
latch.countDown(); latch.countDown();
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
......
...@@ -69,6 +69,16 @@ ...@@ -69,6 +69,16 @@
<artifactId>data-quality-service-api</artifactId> <artifactId>data-quality-service-api</artifactId>
<version>2.0.0</version> <version>2.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-database</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>data-metadata-service-api</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -4,7 +4,7 @@ import org.springframework.boot.SpringApplication; ...@@ -4,7 +4,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.cloud.client.SpringCloudApplication; import org.springframework.cloud.client.SpringCloudApplication;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableFeignClients(basePackages = {"cn.datax.service.system.api.feign"}) @EnableFeignClients(basePackages = {"cn.datax.service.system.api.feign", "cn.datax.service.data.metadata.api.feign"})
@SpringCloudApplication @SpringCloudApplication
public class DataxQualityApplication { public class DataxQualityApplication {
......
...@@ -37,13 +37,13 @@ public class SchedulingRunnable implements Runnable { ...@@ -37,13 +37,13 @@ public class SchedulingRunnable implements Runnable {
public void run() { public void run() {
log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params); log.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Map map = new HashMap(); Map<String, Object> map = new HashMap<>();
String batch; String batch;
try { try {
Object target = SpringContextHolder.getBean(beanName); Object target = SpringContextHolder.getBean(beanName);
Method method = target.getClass().getDeclaredMethod(methodName, Map.class); Method method = target.getClass().getDeclaredMethod(methodName, Map.class);
if (StrUtil.isNotEmpty(params)) { if (StrUtil.isNotEmpty(params)) {
map = new ObjectMapper().readValue(params, Map.class); map.putAll(new ObjectMapper().readValue(params, Map.class));
} }
batch = DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN); batch = DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN);
map.put("batch", batch); map.put("batch", batch);
......
package cn.datax.service.data.quality.schedule.task; package cn.datax.service.data.quality.schedule.task;
import cn.datax.common.core.DataConstant; import cn.datax.common.core.DataConstant;
import cn.datax.common.database.DataSourceFactory;
import cn.datax.common.database.DbQuery;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.service.data.metadata.api.dto.DbSchema;
import cn.datax.service.data.metadata.api.entity.MetadataSourceEntity;
import cn.datax.service.data.metadata.api.feign.MetadataSourceServiceFeign;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity; import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.service.CheckRuleService; import cn.datax.service.data.quality.service.CheckRuleService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
...@@ -9,13 +15,10 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; ...@@ -9,13 +15,10 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.sql.Connection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component("qualityTask") @Component("qualityTask")
...@@ -24,7 +27,13 @@ public class QualityTask { ...@@ -24,7 +27,13 @@ public class QualityTask {
@Autowired @Autowired
private CheckRuleService checkRuleService; private CheckRuleService checkRuleService;
public void task(Map map) { @Autowired
private DataSourceFactory dataSourceFactory;
@Autowired
private MetadataSourceServiceFeign metadataSourceServiceFeign;
public void task(Map<String, Object> map) {
System.out.println("执行批次:" + map); System.out.println("执行批次:" + map);
// 获取可执行的核查规则 // 获取可执行的核查规则
List<CheckRuleEntity> list = checkRuleService.list(Wrappers.<CheckRuleEntity>lambdaQuery().eq(CheckRuleEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey())); List<CheckRuleEntity> list = checkRuleService.list(Wrappers.<CheckRuleEntity>lambdaQuery().eq(CheckRuleEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
...@@ -35,12 +44,7 @@ public class QualityTask { ...@@ -35,12 +44,7 @@ public class QualityTask {
new BasicThreadFactory.Builder().namingPattern("executor-schedule-pool-%d").daemon(true).build()); new BasicThreadFactory.Builder().namingPattern("executor-schedule-pool-%d").daemon(true).build());
// 定义计数器 // 定义计数器
final CountDownLatch latch = new CountDownLatch(poolSize); final CountDownLatch latch = new CountDownLatch(poolSize);
list.stream().forEach(s -> { list.stream().forEach(s -> executorTask(map, threadPoolExecutor, latch, s));
threadPoolExecutor.execute(() -> {
log.info(s.getRuleName() + ":" + LocalDateTime.now());
latch.countDown();
});
});
// 主线程阻塞,等待所有子线程执行完成 // 主线程阻塞,等待所有子线程执行完成
try { try {
latch.await(); latch.await();
...@@ -48,4 +52,27 @@ public class QualityTask { ...@@ -48,4 +52,27 @@ public class QualityTask {
// 关闭线程池 // 关闭线程池
threadPoolExecutor.shutdown(); threadPoolExecutor.shutdown();
} }
private void executorTask(Map<String, Object> map, ThreadPoolExecutor threadPoolExecutor, CountDownLatch latch, CheckRuleEntity checkRuleEntity) {
MetadataSourceEntity dataSource = null;
try {
dataSource = metadataSourceServiceFeign.getMetadataSourceById(checkRuleEntity.getRuleSourceId());
} catch (Exception e) {
e.printStackTrace();
}
if (dataSource != null) {
DbSchema dbSchema = dataSource.getDbSchema();
DbQueryProperty dbQueryProperty = new DbQueryProperty(dataSource.getDbType(), dbSchema.getHost(),
dbSchema.getUsername(), dbSchema.getPassword(), dbSchema.getPort(), dbSchema.getDbName(), dbSchema.getSid());
DbQuery dbQuery = dataSourceFactory.createDbQuery(dbQueryProperty);
Connection conn = dbQuery.getConnection();
Future<Map<String, Object>> future = threadPoolExecutor.submit(new TaskHander(checkRuleEntity.getRuleSql(), latch, conn));
try {
System.out.println("任务执行结果:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
// 核查报告
}
}
} }
package cn.datax.service.data.quality.schedule.task;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
public class TaskHander implements Callable<Map<String, Object>> {
private CountDownLatch latch;
private Connection conn;
private String sql;
public TaskHander() {
super();
}
public TaskHander(String sql, CountDownLatch latch, Connection conn) {
super();
this.sql = sql;
this.latch = latch;
this.conn = conn;
}
@Override
public Map<String, Object> call() throws Exception {
System.out.println(Thread.currentThread().getName() + "线程开始执行");
System.out.println("执行sql:" + sql);
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
rs = stmt.executeQuery(sql);
while (rs.next()) {
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(rs != null){
rs.close();
}
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {}
}
latch.countDown();
Map<String, Object> map = new HashMap<>();
return map;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment