Commit 26195db8 by yuwei

2.0.0项目初始化

parent 1dfee283
......@@ -3,8 +3,10 @@ package cn.datax.common.base;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public abstract class DataScopeBaseEntity extends BaseEntity {
private static final long serialVersionUID = 1L;
......
......@@ -5,4 +5,6 @@ public class ValidationGroups {
public interface Insert{};
public interface Update{};
public interface Other{};
}
......@@ -4,6 +4,7 @@ import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import cn.datax.common.database.core.PageResult;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
......@@ -17,9 +18,14 @@ import java.util.Map;
public interface DbQuery {
/**
* 获取数据库连接
*/
Connection getConnection();
/**
* 检测连通性
*/
boolean valid() throws SQLException;
boolean valid();
/**
* 关闭数据源
......
......@@ -27,13 +27,32 @@ public class DbQueryTemplate implements DbQuery {
protected DbDialect dbDialect;
@Override
public boolean valid() throws SQLException {
Connection conn = dataSource.getConnection();
boolean valid = conn.isValid(0);
if (conn != null) {
conn.close();
public Connection getConnection() {
try {
return dataSource.getConnection();
} catch (SQLException e) {
throw new DataQueryException("获取数据库连接出错");
}
return valid;
}
@Override
public boolean valid() {
Connection conn = null;
try {
conn = dataSource.getConnection();
return conn.isValid(0);
} catch (SQLException e) {
throw new DataQueryException("检测连通性出错");
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new DataQueryException("关闭数据库连接出错");
}
}
}
}
@Override
......
......@@ -64,6 +64,15 @@ hystrix:
isolation:
thread:
timeoutInMilliseconds: 60000 #断路器超时时间,默认1000ms
dataSqlConsoleHystrix: #sql工作台方法的超时时间 60s
fallback:
enabled: true
execution:
timeout:
enabled: true
isolation:
thread:
timeoutInMilliseconds: 60000 #断路器超时时间,默认1000ms
shareSecurityContext: true
#请求处理的超时时间
......
......@@ -95,4 +95,16 @@ spring:
- name: Hystrix
args:
name: dataApiCallHystrix
fallbackUri: forward:/fallback
# 数据SQL工作台
- id: datax-service-data-sql-console
uri: lb://datax-service-data-sql-console
predicates:
- Path=/data/console/**
filters:
- SwaggerHeaderFilter
- StripPrefix=2
- name: Hystrix
args:
name: dataSqlConsoleHystrix
fallbackUri: forward:/fallback
\ No newline at end of file
# 数据源配置
spring:
redis:
database: 1
host: 127.0.0.1
port: 6379
password: # 密码(默认为空)
timeout: 6000ms # 连接超时时长(毫秒)
lettuce:
pool:
max-active: 1000 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 # 连接池中的最大空闲连接
min-idle: 5 # 连接池中的最小空闲连接
datasource:
dynamic:
type: com.zaxxer.hikari.HikariDataSource
hikari:
auto-commit: false
connection-timeout: 30000
idle-timeout: 25000
login-timeout: 5
max-lifetime: 30000
read-only: false
validation-timeout: 3000
maximum-pool-size: 15
minimum-idle: 5
pool-name: DataxHikariCP
connection-test-query: SELECT 1 FROM DUAL
data-source-properties:
cachePrepStmts: true
prepStmtCacheSize: 250
prepStmtCacheSqlLimit: 2048
useServerPrepStmts: true
useLocalSessionState: true
rewriteBatchedStatements: true
cacheResultSetMetadata: true
cacheServerConfiguration: true
elideSetAutoCommits: true
maintainTimeStats: false
primary: mysql
datasource:
mysql:
driver-class-name: com.p6spy.engine.spy.P6SpyDriver
url: jdbc:p6spy:mysql://127.0.0.1:3306/data_cloud?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: 1234@abcd
mybatis-plus:
mapper-locations: classpath*:mapper/*Mapper.xml
type-aliases-package: cn.datax.service.data.factory.api.entity
global-config:
db-config:
id-type: ASSIGN_ID
banner: false
configuration:
map-underscore-to-camel-case: true
cache-enabled: false
call-setters-on-nulls: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# spring security 配置
security:
oauth2:
client:
access-token-uri: http://localhost:8612/auth/oauth/token
user-authorization-uri: http://localhost:8612/auth/oauth/authorize
client-id: datax
client-secret: 123456
scope: all
resource:
loadBalanced: true
token-info-uri: http://localhost:8612/auth/oauth/check_token
\ No newline at end of file
package cn.datax.service.data.factory.api.dto;
import cn.datax.common.validate.ValidationGroups;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import java.io.Serializable;
@Data
public class SqlConsoleDto implements Serializable {
private static final long serialVersionUID=1L;
@ApiModelProperty(value = "当前时间戳")
@NotBlank(message = "时间戳不能为空", groups = {ValidationGroups.Other.class})
private String sqlKey;
@ApiModelProperty(value = "数据源")
@NotBlank(message = "数据源不能为空")
private String sourceId;
@ApiModelProperty(value = "SQL文本")
@NotBlank(message = "SQL不能为空")
private String sqlText;
}
package cn.datax.service.data.factory.api.vo;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@Data
public class SqlConsoleVo implements Serializable {
private static final long serialVersionUID=1L;
private String sql;
private Long time;
private Boolean success;
private Integer count;
private List<Map<String, Object>> data;
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>data-factory-service-parent</artifactId>
<groupId>cn.datax</groupId>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>2.0.0</version>
<description>SQL工作台</description>
<artifactId>data-factory-service-sql-console</artifactId>
<dependencies>
<!--web 模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-tomcat</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<!--配置中心客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-mybatis</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-redis</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-security</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-database</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>datax-common-log</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>system-service-api</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>cn.datax</groupId>
<artifactId>data-factory-service-api</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package cn.datax.service.data.factory.sql.console;
import cn.datax.common.log.annotation.EnableDataLog;
import cn.datax.common.mybatis.annotation.EnableDataMybatis;
import cn.datax.common.redis.annotation.EnableDataRedis;
import cn.datax.common.security.annotation.EnableDataServerProtect;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.client.SpringCloudApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableDataServerProtect
@EnableDataMybatis
@EnableDataRedis
@EnableDataLog
@EnableFeignClients(basePackages = {"cn.datax.service.system.api.feign", "cn.datax.service.data.factory.api.feign"})
@SpringCloudApplication
public class DataSqlConsoleApplication {
public static void main(String[] args) {
SpringApplication.run(DataSqlConsoleApplication.class);
}
}
package cn.datax.service.data.factory.sql.console.concurrent;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.concurrent.Callable;
/**
* 多线程模板类
*/
@Slf4j
public abstract class CallableTemplate<V> implements Callable<V> {
/**
* 前置处理,子类可以Override该方法
*/
public void beforeProcess() {
log.info("before process....");
}
/**
* 处理业务逻辑的方法,需要子类去Override
* @return
*/
public abstract V process();
/**
* 后置处理,子类可以Override该方法
*/
public void afterProcess() {
log.info("after process....");
}
@Override
public V call() {
beforeProcess();
V result = process();
afterProcess();
return result;
}
}
package cn.datax.service.data.factory.sql.console.concurrent;
import cn.datax.service.data.factory.api.vo.SqlConsoleVo;
import lombok.extern.slf4j.Slf4j;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class DateHander extends CallableTemplate<SqlConsoleVo> {
private CountDownLatch latch;
private Connection conn;
private String sql;
public DateHander(CountDownLatch latch, Connection conn, String sql) {
this.latch = latch;
this.conn = conn;
this.sql = sql;
}
@Override
public SqlConsoleVo process() {
log.info("执行sql:" + sql);
long start = System.currentTimeMillis();
Statement stmt = null;
ResultSet rs = null;
// 将查询数据存储到数据中
List<Map<String, Object>> list = new ArrayList<>();
// 新增、修改、删除受影响行数
Integer updateCount = null;
SqlConsoleVo sqlConsoleVo = new SqlConsoleVo();
sqlConsoleVo.setSuccess(true);
try {
// 为了设置fetchSize,必须设置为false
conn.setAutoCommit(false);
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(100);
// 是否查询操作
boolean execute = stmt.execute(sql);
if (execute) {
rs = stmt.getResultSet();
// 获取结果集的元数据信息
ResultSetMetaData rsmd = rs.getMetaData();
// 获取列字段的个数
int colunmCount = rsmd.getColumnCount();
// 存储列名的数组
String[] columnNames = new String[colunmCount];
for (int i = 0; i < colunmCount; i++) {
// 获取所有的字段名称
columnNames[i] = rsmd.getColumnLabel(i + 1);
}
while(rs.next()){
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < colunmCount; i++) {
// 获取列名
String columnName = columnNames[i];
// 获取该列对应的值
Object value = rs.getObject(columnName);
map.put(columnName, value);
}
list.add(map);
}
} else {
// 执行新增、修改、删除受影响行数
updateCount = stmt.getUpdateCount();
}
conn.commit();
} catch (SQLException e) {
sqlConsoleVo.setSuccess(false);
if(conn != null){
try {
conn.rollback();
} catch (SQLException e1) {
}
}
} finally {
if(rs != null){
try {
rs.close();
} catch (SQLException e) {
}
}
if(stmt != null){
try {
stmt.close();
} catch (SQLException e) {
}
}
if(conn != null){
try {
conn.close();
} catch (SQLException e) {
}
}
}
latch.countDown();
long end = System.currentTimeMillis();
log.info("线程查询数据用时:" + (end - start) + "ms");
sqlConsoleVo.setSql(sql);
sqlConsoleVo.setCount(updateCount);
sqlConsoleVo.setData(list);
sqlConsoleVo.setTime(end - start);
return sqlConsoleVo;
}
}
package cn.datax.service.data.factory.sql.console.config;
import cn.datax.common.security.handler.DataAccessDeniedHandler;
import cn.datax.common.security.handler.DataAuthExceptionEntryPoint;
import cn.datax.common.security.utils.DataRedisTokenServices;
import cn.datax.common.security.utils.RedisTokenStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer;
import org.springframework.security.oauth2.config.annotation.web.configuration.ResourceServerConfigurerAdapter;
import org.springframework.security.oauth2.config.annotation.web.configurers.ResourceServerSecurityConfigurer;
import org.springframework.security.oauth2.provider.token.TokenStore;
@Configuration
@EnableResourceServer
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class DataResourceServerConfig extends ResourceServerConfigurerAdapter {
@Autowired
private DataAccessDeniedHandler accessDeniedHandler;
@Autowired
private DataAuthExceptionEntryPoint exceptionEntryPoint;
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public TokenStore redisTokenStore() {
return new RedisTokenStore(redisConnectionFactory);
}
@Override
public void configure(ResourceServerSecurityConfigurer resources) {
DataRedisTokenServices dataTokenServices = new DataRedisTokenServices();
dataTokenServices.setTokenStore(redisTokenStore());
resources
.tokenStore(redisTokenStore())
.tokenServices(dataTokenServices)
.authenticationEntryPoint(exceptionEntryPoint)
.accessDeniedHandler(accessDeniedHandler);
}
@Override
public void configure(HttpSecurity http) throws Exception {
//允许使用iframe 嵌套,避免swagger-ui 不被加载的问题
http.headers().frameOptions().disable();
http.authorizeRequests()
.antMatchers(
"/actuator/**",
"/v2/api-docs/**",
"/swagger-ui.html",
"/doc.html",
"/swagger-resources/**",
"/webjars/**",
// feign 内部调用不用授权
"/inner/**"
).permitAll()
.anyRequest().authenticated()
.and().csrf().disable();
}
}
package cn.datax.service.data.factory.sql.console.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestMethod;
import springfox.documentation.builders.*;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.ApiKey;
import springfox.documentation.service.Parameter;
import springfox.documentation.service.ResponseMessage;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList;
import java.util.List;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
/**
* 创建API应用
* apiInfo() 增加API相关信息
* 通过select()函数返回一个ApiSelectorBuilder实例,用来控制哪些接口暴露给Swagger来展现,
* 本例采用指定扫描的包路径来定义指定要建立API的目录。
*
* @return
*/
@Bean
public Docket createRestApi(){
//版本类型是swagger2
return new Docket(DocumentationType.SWAGGER_2)
//通过调用自定义方法apiInfo,获得文档的主要信息
.apiInfo(apiInfo())
//设置全局参数
.globalOperationParameters(globalParamBuilder())
//设置全局响应参数
.globalResponseMessage(RequestMethod.GET,responseBuilder())
.globalResponseMessage(RequestMethod.POST,responseBuilder())
.globalResponseMessage(RequestMethod.PUT,responseBuilder())
.globalResponseMessage(RequestMethod.DELETE,responseBuilder())
.select()
.apis(RequestHandlerSelectors.basePackage("cn.datax.service.data.factory.sql.console.controller"))//扫描该包下面的API注解
.paths(PathSelectors.any())
.build()
//设置安全认证
.securitySchemes(security());
}
/**
* 创建该API的基本信息(这些基本信息会展现在文档页面中)
* 访问地址:http://项目实际地址/swagger-ui.html
* @return
*/
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("数据工厂管理中心") //接口管理文档首页显示
.description("数据工厂管理中心接口文档") //API的描述
.version("1.0")
.build();
}
/**
* 安全认证参数
* @return
*/
private List<ApiKey> security() {
List<ApiKey> apiKeys = new ArrayList<>();
apiKeys.add(new ApiKey("Authorization", "Authorization", "header"));
return apiKeys;
}
/**
* 构建全局参数列表
* @return
*/
private List<Parameter> globalParamBuilder(){
List<Parameter> pars = new ArrayList<>();
pars.add(parameterBuilder("Authorization","令牌","string","header",false).build());
return pars;
}
/**
* 创建参数
* @return
*/
private ParameterBuilder parameterBuilder(String name, String desc, String type, String parameterType, boolean required) {
ParameterBuilder tokenPar = new ParameterBuilder();
tokenPar.name(name).description(desc).modelRef(new ModelRef(type)).parameterType(parameterType).required(required).build();
return tokenPar;
}
/**
* 创建全局响应值
* @return
*/
private List<ResponseMessage> responseBuilder() {
List<ResponseMessage> responseMessageList = new ArrayList<>();
responseMessageList.add(new ResponseMessageBuilder().code(200).message("响应成功").build());
responseMessageList.add(new ResponseMessageBuilder().code(500).message("服务器内部错误").build());
return responseMessageList;
}
}
package cn.datax.service.data.factory.sql.console.controller;
import cn.datax.common.base.BaseController;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/inner")
public class InnerController extends BaseController {
}
package cn.datax.service.data.factory.sql.console.controller;
import cn.datax.common.base.BaseController;
import cn.datax.common.core.R;
import cn.datax.common.validate.ValidationGroups;
import cn.datax.service.data.factory.api.dto.SqlConsoleDto;
import cn.datax.service.data.factory.api.vo.SqlConsoleVo;
import cn.datax.service.data.factory.sql.console.service.SqlConsoleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/v1")
public class SqlConsoleController extends BaseController {
@Autowired
private SqlConsoleService sqlConsoleService;
@PostMapping("/run")
public R sqlRun(@RequestBody @Validated SqlConsoleDto sqlConsoleDto){
List<SqlConsoleVo> list = sqlConsoleService.sqlRun(sqlConsoleDto);
return R.ok().setData(list);
}
@PostMapping("/stop")
public R sqlStop(@RequestBody @Validated({ValidationGroups.Other.class}) SqlConsoleDto sqlConsoleDto){
sqlConsoleService.sqlStop(sqlConsoleDto);
return R.ok();
}
}
package cn.datax.service.data.factory.sql.console.service;
import cn.datax.service.data.factory.api.dto.SqlConsoleDto;
import cn.datax.service.data.factory.api.vo.SqlConsoleVo;
import java.util.List;
public interface SqlConsoleService {
List<SqlConsoleVo> sqlRun(SqlConsoleDto sqlConsoleDto);
void sqlStop(SqlConsoleDto sqlConsoleDto);
}
package cn.datax.service.data.factory.sql.console.service.impl;
import cn.datax.common.core.R;
import cn.datax.common.database.DataSourceFactory;
import cn.datax.common.database.DbQuery;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.common.exception.DataException;
import cn.datax.common.utils.ThrowableUtil;
import cn.datax.service.data.factory.api.dto.DbSchema;
import cn.datax.service.data.factory.api.dto.SqlConsoleDto;
import cn.datax.service.data.factory.api.entity.DataSourceEntity;
import cn.datax.service.data.factory.api.feign.DataSourceServiceFeign;
import cn.datax.service.data.factory.api.vo.SqlConsoleVo;
import cn.datax.service.data.factory.sql.console.concurrent.CallableTemplate;
import cn.datax.service.data.factory.sql.console.concurrent.DateHander;
import cn.datax.service.data.factory.sql.console.service.SqlConsoleService;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.Statements;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@Slf4j
@Service
public class SqlConsoleServiceImpl implements SqlConsoleService {
@Autowired
private DataSourceFactory dataSourceFactory;
@Autowired
private DataSourceServiceFeign dataSourceServiceFeign;
private static Map<String, List<Connection>> connectionMap = new ConcurrentHashMap<>();
@Override
public List<SqlConsoleVo> sqlRun(SqlConsoleDto sqlConsoleDto) {
String sqlKey = sqlConsoleDto.getSqlKey();
Statements stmts;
try {
stmts = CCJSqlParserUtil.parseStatements(sqlConsoleDto.getSqlText());
} catch (JSQLParserException e) {
log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e));
throw new DataException("SQL语法有问题,解析出错");
}
List<Statement> sqls = stmts.getStatements();
if (CollUtil.isEmpty(sqls)) {
throw new DataException("未解析到SQL语句");
}
R sourceResult = dataSourceServiceFeign.getDataSourceById(sqlConsoleDto.getSourceId());
if(sourceResult == null || !sourceResult.isSuccess() || ObjectUtil.isEmpty(sourceResult.getData())){
throw new DataException("SQL工作台查询数据源出错");
}
DataSourceEntity dataSource = JSON.parseObject(JSON.toJSONString(sourceResult.getData()), DataSourceEntity.class);
DbSchema dbSchema = dataSource.getDbSchema();
DbQueryProperty dbQueryProperty = new DbQueryProperty(dataSource.getDbType(), dbSchema.getHost(),
dbSchema.getUsername(), dbSchema.getPassword(), dbSchema.getPort(), dbSchema.getDbName());
DbQuery dbQuery = dataSourceFactory.createDbQuery(dbQueryProperty);
// 定义计数器
final CountDownLatch latch = new CountDownLatch(sqls.size());
// 定义固定长度的线程池
ExecutorService executorService = Executors.newFixedThreadPool(sqls.size());
// Callable用于产生结果
List<CallableTemplate<SqlConsoleVo>> tasks = new ArrayList<>();
List<Connection> conns = new ArrayList<>();
for (int i = 0; i < sqls.size(); i++) {
Connection conn = dbQuery.getConnection();
conns.add(conn);
DateHander dateHander = new DateHander(latch, conn, sqls.get(i).toString());
tasks.add(dateHander);
}
connectionMap.put(sqlKey, conns);
// Future用于获取结果
List<SqlConsoleVo> result = new ArrayList<>();
List<Future<SqlConsoleVo>> futures;
try {
futures = executorService.invokeAll(tasks);
// 主线程阻塞,等待所有子线程执行完成
latch.await();
// 处理线程返回结果
for (Future<SqlConsoleVo> future : futures) {
result.add(future.get());
}
} catch (Exception e) {
log.error("全局异常信息ex={}, StackTrace={}", e.getMessage(), ThrowableUtil.getStackTrace(e));
}
// 关闭线程池
executorService.shutdown();
// 执行完清除
connectionMap.remove(sqlKey);
return result;
}
@Override
public void sqlStop(SqlConsoleDto sqlConsoleDto) {
String sqlKey = sqlConsoleDto.getSqlKey();
List<Connection> conns = connectionMap.get(sqlKey);
if (CollUtil.isNotEmpty(conns)) {
for (int i = 0; i < conns.size(); i++) {
Connection conn = conns.get(i);
try {
if (null != conn && !conn.isClosed()) {
conn.close();
}
} catch (SQLException e) {
throw new DataException("SQL工作台停止出错");
}
}
}
}
}
server:
port: 8816
spring:
application:
name: datax-service-data-sql-console
profiles:
active: dev
cloud:
config:
fail-fast: true
name: ${spring.application.name}
profile: ${spring.profiles.active}
discovery:
enabled: true
service-id: datax-config
# 注册中心配置
eureka:
instance:
lease-renewal-interval-in-seconds: 20
client:
register-with-eureka: true
fetch-registry: true
instance-info-replication-interval-seconds: 30
registry-fetch-interval-seconds: 3
service-url:
defaultZone: http://localhost:8610/eureka
\ No newline at end of file
module.log=com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6OutageFactory
# 自定义日志打印
logMessageFormat=com.baomidou.mybatisplus.extension.p6spy.P6SpyLogger
#日志输出到控制台
appender=com.baomidou.mybatisplus.extension.p6spy.StdoutLogger
# 使用日志系统记录 sql
#appender=com.p6spy.engine.spy.appender.Slf4JLogger
# 设置 p6spy driver 代理
deregisterdrivers=true
# 取消JDBC URL前缀
useprefix=true
# 配置记录 Log 例外,可去掉的结果集有error,info,batch,debug,statement,commit,rollback,result,resultset.
excludecategories=info,debug,result,batch,resultset
# 日期格式
dateformat=yyyy-MM-dd HH:mm:ss
# 实际驱动可多个
#driverlist=org.h2.Driver
# 是否开启慢SQL记录
outagedetection=true
# 慢SQL记录标准 2 秒
outagedetectioninterval=2
# 开启过滤
filter=true
# 配置不打印的内容
exclude=select 1
\ No newline at end of file
......@@ -25,7 +25,6 @@ import org.springframework.web.bind.annotation.*;
import cn.datax.common.base.BaseController;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
......@@ -145,13 +144,7 @@ public class DataSourceController extends BaseController {
@PostMapping("/checkConnection")
public R checkConnection(@RequestBody @Validated({ValidationGroups.Insert.class}) DataSourceDto dataSource) {
DbQuery dbQuery = dataSourceService.checkConnection(dataSource);
boolean valid = false;
try {
valid = dbQuery.valid();
} catch (SQLException e) {
e.printStackTrace();
return R.error(e.getMessage());
}
Boolean valid = dbQuery.valid();
return valid ? R.ok() : R.error("数据库连接有误,请检查数据库配置是否正确");
}
......
......@@ -16,5 +16,6 @@
<modules>
<module>data-factory-service</module>
<module>data-factory-service-api</module>
<module>data-factory-service-sql-console</module>
</modules>
</project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment