Commit 66919cc4 by yuwei

项目初始化

parent 601e6de6
package cn.datax.service.data.quality.schedule.exception;
import cn.datax.service.data.quality.schedule.exception.util.ExceptionMessageFormat;
import cn.datax.service.data.quality.schedule.exception.util.factory.ExceptionMsgFormatFactory;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 子线程异常,子线程出现异常时抛出
*/
public class ChildThreadException extends Exception {
/**
* serialVersionUID
*/
private static final long serialVersionUID=1L;
/**
* 子线程的异常列表
*/
private List<Exception> exceptionList;
/**
* 异常信息格式化工具
*/
private ExceptionMessageFormat formatter;
/**
* 锁
*/
private Lock lock;
public ChildThreadException() {
super();
initial();
}
public ChildThreadException(String message) {
super(message);
initial();
}
public ChildThreadException(String message, StackTraceElement[] stackTrace) {
this(message);
setStackTrace(stackTrace);
}
private void initial() {
exceptionList = new ArrayList<Exception>();
lock = new ReentrantLock();
formatter = ExceptionMsgFormatFactory.getInstance().getFormatter(ExceptionMsgFormatFactory.STACK_TRACE);
}
/**
* 子线程是否有异常
* @return
*/
public boolean hasException() {
return exceptionList.size() > 0;
}
/**
* 添加子线程的异常
* @param e
*/
public void addException(Exception e) {
try {
lock.lock();
e.setStackTrace(e.getStackTrace());
exceptionList.add(e);
} finally {
lock.unlock();
}
}
/**
* 获取子线程的异常列表
* @return
*/
public List<Exception> getExceptionList() {
return exceptionList;
}
/**
* 清空子线程的异常列表
*/
public void clearExceptionList() {
exceptionList.clear();
}
/**
* 获取所有子线程异常的堆栈跟踪信息
* @return
*/
public String getAllStackTraceMessage() {
StringBuffer sb = new StringBuffer();
for (Exception e : exceptionList) {
sb.append(e.getClass().getName());
sb.append(": ");
sb.append(e.getMessage());
sb.append("\n");
sb.append(formatter.formate(e));
}
return sb.toString();
}
/**
* 打印所有子线程的异常的堆栈跟踪信息
*/
public void printAllStackTrace() {
printAllStackTrace(System.err);
}
/**
* 打印所有子线程的异常的堆栈跟踪信息
* @param s
*/
public void printAllStackTrace(PrintStream s) {
for (Exception e : exceptionList) {
e.printStackTrace(s);
}
}
}
package cn.datax.service.data.quality.schedule.exception.util;
/**
* 默认异常信息格式化工具
*/
public class DefaultExceptionMsgHandler implements ExceptionMessageFormat {
private DefaultExceptionMsgHandler() {
}
private static class SingletonHolder{
private static final DefaultExceptionMsgHandler instance = new DefaultExceptionMsgHandler();
}
public static DefaultExceptionMsgHandler getInstance(){
return SingletonHolder.instance;
}
/**
* 格式化异常信息
*/
@Override
public String formate(Exception e) {
return e.getMessage() + "\n";
}
}
package cn.datax.service.data.quality.schedule.exception.util;
/**
* 异常信息格式化
*/
public interface ExceptionMessageFormat {
public String formate(Exception e);
}
package cn.datax.service.data.quality.schedule.exception.util;
/**
* 堆栈跟踪信息格式化工具
*/
public class StackTraceMsgHandler implements ExceptionMessageFormat {
private StackTraceMsgHandler() {
}
private static class SingletonHolder {
private static final StackTraceMsgHandler instance = new StackTraceMsgHandler();
}
public static StackTraceMsgHandler getInstance() {
return SingletonHolder.instance;
}
/**
* 格式化堆栈跟踪信息
*/
@Override
public String formate(Exception e) {
StackTraceElement[] stackTrace = e.getStackTrace();
StringBuffer sb = new StringBuffer();
for (StackTraceElement stackTraceElement : stackTrace) {
sb.append("\tat " + stackTraceElement + "\n");
}
return sb.toString();
}
}
package cn.datax.service.data.quality.schedule.exception.util.factory;
import cn.datax.service.data.quality.schedule.exception.util.DefaultExceptionMsgHandler;
import cn.datax.service.data.quality.schedule.exception.util.ExceptionMessageFormat;
import cn.datax.service.data.quality.schedule.exception.util.StackTraceMsgHandler;
/**
* 异常信息格式化工厂
*/
public class ExceptionMsgFormatFactory {
public static final String STACK_TRACE = "StackTraceHandler";
private ExceptionMsgFormatFactory() {
}
private static class SingletonHolder {
private static final ExceptionMsgFormatFactory instance = new ExceptionMsgFormatFactory();
}
public static ExceptionMsgFormatFactory getInstance() {
return SingletonHolder.instance;
}
/**
* 获取格式化工具
*
* @param formatterName
* @return
*/
public ExceptionMessageFormat getFormatter(String formatterName) {
switch (formatterName) {
case STACK_TRACE:
return StackTraceMsgHandler.getInstance();
default:
break;
}
return DefaultExceptionMsgHandler.getInstance();
}
}
......@@ -2,12 +2,12 @@ package cn.datax.service.data.quality.schedule.task;
import cn.datax.common.core.DataConstant;
import cn.datax.common.database.DataSourceFactory;
import cn.datax.common.database.DbQuery;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.service.data.metadata.api.dto.DbSchema;
import cn.datax.service.data.metadata.api.entity.MetadataSourceEntity;
import cn.datax.service.data.metadata.api.feign.MetadataSourceServiceFeign;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
import cn.datax.service.data.quality.schedule.thread.MultiThreadHandler;
import cn.datax.service.data.quality.schedule.thread.parallel.ParallelTaskWithThreadPool;
import cn.datax.service.data.quality.service.CheckRuleService;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
......@@ -15,7 +15,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
......@@ -35,44 +35,28 @@ public class QualityTask {
public void task(Map<String, Object> map) {
System.out.println("执行批次:" + map);
// 结果集
List<CheckReportEntity> result = new ArrayList<>();
// 获取可执行的核查规则
List<CheckRuleEntity> list = checkRuleService.list(Wrappers.<CheckRuleEntity>lambdaQuery().eq(CheckRuleEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
int poolSize = list.size();
// 定义固定长度的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(16),
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(50),
new BasicThreadFactory.Builder().namingPattern("executor-schedule-pool-%d").daemon(true).build());
// 定义计数器
final CountDownLatch latch = new CountDownLatch(poolSize);
list.stream().forEach(s -> executorTask(map, threadPoolExecutor, latch, s));
// 主线程阻塞,等待所有子线程执行完成
MultiThreadHandler handler = new ParallelTaskWithThreadPool(threadPoolExecutor);
// 启动子线程作为要处理的并行任务
list.stream().forEach(rule -> {
TaskHander task = new TaskHander(metadataSourceServiceFeign, dataSourceFactory, rule, result);
handler.addTask(task);
});
try {
latch.await();
} catch (InterruptedException e) {}
handler.run();
} catch (ChildThreadException e) {
System.out.println(e.getAllStackTraceMessage());
}
// 关闭线程池
threadPoolExecutor.shutdown();
}
private void executorTask(Map<String, Object> map, ThreadPoolExecutor threadPoolExecutor, CountDownLatch latch, CheckRuleEntity checkRuleEntity) {
MetadataSourceEntity dataSource = null;
try {
dataSource = metadataSourceServiceFeign.getMetadataSourceById(checkRuleEntity.getRuleSourceId());
} catch (Exception e) {
e.printStackTrace();
}
if (dataSource != null) {
DbSchema dbSchema = dataSource.getDbSchema();
DbQueryProperty dbQueryProperty = new DbQueryProperty(dataSource.getDbType(), dbSchema.getHost(),
dbSchema.getUsername(), dbSchema.getPassword(), dbSchema.getPort(), dbSchema.getDbName(), dbSchema.getSid());
DbQuery dbQuery = dataSourceFactory.createDbQuery(dbQueryProperty);
Connection conn = dbQuery.getConnection();
Future<Map<String, Object>> future = threadPoolExecutor.submit(new TaskHander(checkRuleEntity.getRuleSql(), latch, conn));
try {
System.out.println("任务执行结果:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
// 核查报告
}
// 核查报告
System.out.println(result);
}
}
package cn.datax.service.data.quality.schedule.task;
import cn.datax.common.database.DataSourceFactory;
import cn.datax.common.database.DbQuery;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.common.exception.DataException;
import cn.datax.service.data.metadata.api.dto.DbSchema;
import cn.datax.service.data.metadata.api.entity.MetadataSourceEntity;
import cn.datax.service.data.metadata.api.feign.MetadataSourceServiceFeign;
import cn.datax.service.data.quality.api.entity.CheckReportEntity;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
public class TaskHander implements Runnable {
public class TaskHander implements Callable<Map<String, Object>> {
private MetadataSourceServiceFeign metadataSourceServiceFeign;
private CountDownLatch latch;
private DataSourceFactory dataSourceFactory;
private Connection conn;
private CheckRuleEntity checkRuleEntity;
private String sql;
private List<CheckReportEntity> list;
public TaskHander() {
super();
}
public TaskHander(String sql, CountDownLatch latch, Connection conn) {
public TaskHander(MetadataSourceServiceFeign metadataSourceServiceFeign, DataSourceFactory dataSourceFactory, CheckRuleEntity checkRuleEntity, List<CheckReportEntity> list) {
super();
this.sql = sql;
this.latch = latch;
this.conn = conn;
this.metadataSourceServiceFeign = metadataSourceServiceFeign;
this.dataSourceFactory = dataSourceFactory;
this.checkRuleEntity = checkRuleEntity;
this.list = list;
}
@Override
public Map<String, Object> call() throws Exception {
System.out.println(Thread.currentThread().getName() + "线程开始执行");
System.out.println("执行sql:" + sql);
public void run() {
System.out.println(Thread.currentThread().getName() + "线程执行开始");
CheckReportEntity checkReportEntity = new CheckReportEntity();
checkReportEntity.setCheckRuleId(checkRuleEntity.getId());
checkReportEntity.setCheckDate(LocalDateTime.now());
MetadataSourceEntity dataSource = null;
Optional<MetadataSourceEntity> dataSourceOptional = Optional.ofNullable(metadataSourceServiceFeign.getMetadataSourceById(checkRuleEntity.getRuleSourceId()));
if (dataSourceOptional.isPresent()) {
dataSource = dataSourceOptional.get();
} else {
checkReportEntity.setCheckResult("获取数据源接口出错");
list.add(checkReportEntity);
dataSourceOptional.orElseThrow(DataException::new);
}
DbSchema dbSchema = dataSource.getDbSchema();
DbQueryProperty dbQueryProperty = new DbQueryProperty(dataSource.getDbType(), dbSchema.getHost(),
dbSchema.getUsername(), dbSchema.getPassword(), dbSchema.getPort(), dbSchema.getDbName(), dbSchema.getSid());
DbQuery dbQuery = null;
Optional<DbQuery> dbQueryOptional = Optional.ofNullable(dataSourceFactory.createDbQuery(dbQueryProperty));
if (dbQueryOptional.isPresent()) {
dbQuery = dbQueryOptional.get();
} else {
checkReportEntity.setCheckResult("创建数据查询接口出错");
list.add(checkReportEntity);
dbQueryOptional.orElseThrow(DataException::new);
}
Connection conn = dbQuery.getConnection();
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
rs = stmt.executeQuery(sql);
rs = stmt.executeQuery(checkRuleEntity.getRuleSql());
while (rs.next()) {
}
list.add(checkReportEntity);
} catch (Exception e) {
e.printStackTrace();
checkReportEntity.setCheckResult(e.getMessage());
list.add(checkReportEntity);
throw new DataException(e);
} finally {
try {
if(rs != null){
......@@ -52,10 +91,12 @@ public class TaskHander implements Callable<Map<String, Object>> {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {}
} catch (SQLException e) {
checkReportEntity.setCheckResult("释放数据库连接出错");
list.add(checkReportEntity);
throw new DataException("释放数据库连接出错");
}
}
latch.countDown();
Map<String, Object> map = new HashMap<>();
return map;
System.out.println(Thread.currentThread().getName() + "线程执行结束");
}
}
package cn.datax.service.data.quality.schedule.thread;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
/**
* 多任务处理
*/
public interface MultiThreadHandler {
/**
* 添加任务
* @param tasks
*/
void addTask(Runnable... tasks);
/**
* 执行任务
* @throws ChildThreadException
*/
void run() throws ChildThreadException;
}
package cn.datax.service.data.quality.schedule.thread.parallel;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
import cn.datax.service.data.quality.schedule.thread.MultiThreadHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 并行线程处理
*/
public abstract class AbstractMultiParallelThreadHandler implements MultiThreadHandler {
/**
* 子线程倒计数锁
*/
protected CountDownLatch childLatch;
/**
* 任务列表
*/
protected List<Runnable> taskList;
/**
* 子线程异常
*/
protected ChildThreadException childThreadException;
public AbstractMultiParallelThreadHandler() {
taskList = new ArrayList<Runnable>();
childThreadException = new ChildThreadException();
}
public void setCountDownLatch(CountDownLatch latch) {
this.childLatch = latch;
}
/**
* {@inheritDoc}
*/
@Override
public void addTask(Runnable... tasks) {
if (null == tasks) {
taskList = new ArrayList<Runnable>();
}
for (Runnable task : tasks) {
taskList.add(task);
}
}
/**
* {@inheritDoc}
*/
@Override
public abstract void run() throws ChildThreadException;
}
package cn.datax.service.data.quality.schedule.thread.parallel;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
import java.util.concurrent.CountDownLatch;
/**
* 并行任务参数
*/
public class MultiParallelContext {
/**
* 运行的任务
*/
private Runnable task;
/**
* 子线程倒计数锁
*/
private CountDownLatch childLatch;
/**
* 子线程异常
*/
private ChildThreadException childException;
public MultiParallelContext() {
}
public MultiParallelContext(Runnable task, CountDownLatch childLatch, ChildThreadException childException) {
this.task = task;
this.childLatch = childLatch;
this.childException = childException;
}
public Runnable getTask() {
return task;
}
public void setTask(Runnable task) {
this.task = task;
}
public CountDownLatch getChildLatch() {
return childLatch;
}
public void setChildLatch(CountDownLatch childLatch) {
this.childLatch = childLatch;
}
public ChildThreadException getChildException() {
return childException;
}
public void setChildException(ChildThreadException childException) {
this.childException = childException;
}
}
package cn.datax.service.data.quality.schedule.thread.parallel;
/**
* 并行线程对象
*/
public class MultiParallelRunnable implements Runnable {
/**
* 并行任务参数
*/
private MultiParallelContext context;
/**
* 构造函数
* @param context
*/
public MultiParallelRunnable(MultiParallelContext context) {
this.context = context;
}
/**
* 运行任务
*/
@Override
public void run() {
try {
context.getTask().run();
} catch (Exception e) {
e.printStackTrace();
context.getChildException().addException(e);
} finally {
context.getChildLatch().countDown();
}
}
}
package cn.datax.service.data.quality.schedule.thread.parallel;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
import java.util.concurrent.CountDownLatch;
/**
* 并行任务处理工具
*/
public class MultiParallelThreadHandler extends AbstractMultiParallelThreadHandler {
/**
* 无参构造器
*/
public MultiParallelThreadHandler() {
super();
}
/**
* 根据任务数量运行任务
*/
@Override
public void run() throws ChildThreadException {
if (null == taskList || taskList.size() == 0) {
return;
} else if (taskList.size() == 1) {
runWithoutNewThread();
} else if (taskList.size() > 1) {
runInNewThread();
}
}
/**
* 新建线程运行任务
*
* @throws ChildThreadException
*/
private void runInNewThread() throws ChildThreadException {
childLatch = new CountDownLatch(taskList.size());
childThreadException.clearExceptionList();
for (Runnable task : taskList) {
invoke(new MultiParallelRunnable(new MultiParallelContext(task, childLatch, childThreadException)));
}
taskList.clear();
try {
childLatch.await();
} catch (InterruptedException e) {
childThreadException.addException(e);
}
throwChildExceptionIfRequired();
}
/**
* 默认线程执行方法
*
* @param command
*/
protected void invoke(Runnable command) {
if(command.getClass().isAssignableFrom(Thread.class)){
Thread.class.cast(command).start();
}else{
new Thread(command).start();
}
}
/**
* 在当前线程中直接运行
*
* @throws ChildThreadException
*/
private void runWithoutNewThread() throws ChildThreadException {
try {
taskList.get(0).run();
} catch (Exception e) {
childThreadException.addException(e);
}
throwChildExceptionIfRequired();
}
/**
* 根据需要抛出子线程异常
*
* @throws ChildThreadException
*/
private void throwChildExceptionIfRequired() throws ChildThreadException {
if (childThreadException.hasException()) {
childExceptionHandler(childThreadException);
}
}
/**
* 默认抛出子线程异常
* @param e
* @throws ChildThreadException
*/
protected void childExceptionHandler(ChildThreadException e) throws ChildThreadException {
throw e;
}
}
package cn.datax.service.data.quality.schedule.thread.parallel;
import java.util.concurrent.ExecutorService;
/**
* 使用线程池运行并行任务
*/
public class ParallelTaskWithThreadPool extends MultiParallelThreadHandler {
private ExecutorService service;
public ParallelTaskWithThreadPool() {
}
public ParallelTaskWithThreadPool(ExecutorService service) {
this.service = service;
}
public ExecutorService getService() {
return service;
}
public void setService(ExecutorService service) {
this.service = service;
}
/**
* 使用线程池运行
*/
@Override
protected void invoke(Runnable command) {
if(null != service){
service.execute(command);
}else{
super.invoke(command);
}
}
}
package cn.datax.service.data.quality.schedule.thread.test;
import cn.datax.service.data.quality.schedule.exception.ChildThreadException;
import cn.datax.service.data.quality.schedule.thread.MultiThreadHandler;
import cn.datax.service.data.quality.schedule.thread.parallel.MultiParallelThreadHandler;
import cn.datax.service.data.quality.schedule.thread.parallel.ParallelTaskWithThreadPool;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCase implements Runnable {
private String name;
private Map<String, Object> result;
public TestCase(String name, Map<String, Object> result) {
this.name = name;
this.result = result;
}
@Override
public void run() {
// 模拟线程执行1000ms
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟线程1和线程3抛出异常
if(name.equals("1") || name.equals("3")) {
throw new RuntimeException(name + ": throw exception");
}
result.put(name, "complete part " + name + "!");
}
// public static void main(String[] args) {
// System.out.println("main begin \t=================");
// Map<String, Object> resultMap = new HashMap<>(8, 1);
//// MultiThreadHandler handler = new MultiParallelThreadHandler();
// ExecutorService service = Executors.newFixedThreadPool(3);
// MultiThreadHandler handler = new ParallelTaskWithThreadPool(service);
// TestCase task = null;
// // 启动5个子线程作为要处理的并行任务,共同完成结果集resultMap
// for(int i=1; i<=5 ; i++){
// task = new TestCase("" + i, resultMap);
// handler.addTask(task);
// }
// try {
// handler.run();
// } catch (ChildThreadException e) {
// System.out.println(e.getAllStackTraceMessage());
// }
// System.out.println(resultMap);
// service.shutdown();
// System.out.println("main end \t=================");
// }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment