Commit 38dfa77b by hy

Integrate the common-database module

parent ed7a1e9a
......@@ -71,6 +71,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
......
package cn.datax.common.database;
import cn.datax.common.database.constants.DbQueryProperty;
public interface DataSourceFactory {
/**
* Create a DbQuery instance backed by a newly created data source.
*
* @param property database connection properties
* @return a DbQuery bound to the created data source
*/
DbQuery createDbQuery(DbQueryProperty property);
}
package cn.datax.common.database;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import org.springframework.jdbc.core.RowMapper;
/**
* Database dialect interface: per-database SQL generation and result mapping.
*
* @author yuwei
* @since 2020-03-14
*/
public interface DbDialect {
RowMapper<DbTable> tableMapper();
RowMapper<DbColumn> columnMapper();
/**
* Get the SQL that lists all columns of the given table.
*
* @param dbName database (schema) name
* @param tableName table name
* @return the column-listing SQL
*/
String columns(String dbName, String tableName);
/**
* Get the SQL that lists all tables in the given database.
*
* @param dbName database (schema) name
* @return the table-listing SQL
*/
String tables(String dbName);
/**
* Build a pagination SQL statement.
*
* @param sql the original SQL
* @param offset row offset
* @param count number of rows to fetch
* @return the paginated SQL
*/
String buildPaginationSql(String sql, long offset, long count);
/**
* Wrap the SQL in a COUNT query.
*
* @param sql the original SQL
* @return the COUNT SQL
*/
String count(String sql);
}
package cn.datax.common.database;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import cn.datax.common.database.core.PageResult;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
* Table data query interface.
*
* @author yuwei
* @since 2020-03-14
*/
public interface DbQuery {
/**
* Get a database connection.
*/
Connection getConnection();
/**
* Check connectivity.
*/
boolean valid();
/**
* Close the underlying data source.
*/
void close();
/**
* Get all columns of the given table.
*
* @param dbName database name
* @param tableName table name
* @return column metadata list
*/
List<DbColumn> getTableColumns(String dbName, String tableName);
/**
* Get all tables in the given database.
*
* @param dbName database name
* @return table metadata list
*/
List<DbTable> getTables(String dbName);
/**
* Get the total row count.
*
* @param sql the query SQL
* @return total row count
*/
int count(String sql);
/**
* Get the total row count with positional query arguments.
*
* @param sql the query SQL
* @param args positional query arguments
* @return total row count
*/
int count(String sql, Object[] args);
/**
* Get the total row count with named parameters (NamedParameterJdbcTemplate).
*
* @param sql the query SQL
* @param params named query parameters
* @return total row count
*/
int count(String sql, Map<String, Object> params);
/**
* Query a result list.
*
* @param sql the query SQL
* @return result rows
*/
List<Map<String, Object>> queryList(String sql);
/**
* Query a result list with positional query arguments.
*
* @param sql the query SQL
* @param args positional query arguments
* @return result rows
*/
List<Map<String, Object>> queryList(String sql, Object[] args);
/**
* Query a page of results.
*
* @param sql the query SQL
* @param offset row offset
* @param size page size
* @return paged result
*/
PageResult<Map<String, Object>> queryByPage(String sql, long offset, long size);
/**
* Query a page of results with positional query arguments.
*
* @param sql the query SQL
* @param args positional query arguments
* @param offset row offset
* @param size page size
* @return paged result
*/
PageResult<Map<String, Object>> queryByPage(String sql, Object[] args, long offset, long size);
/**
* Query a page of results with named parameters (NamedParameterJdbcTemplate).
*
* @param sql the query SQL
* @param params named query parameters
* @param offset row offset
* @param size page size
* @return paged result
*/
PageResult<Map<String, Object>> queryByPage(String sql, Map<String, Object> params, long offset, long size);
}
package cn.datax.common.database;
import cn.datax.common.database.constants.DbType;
import cn.datax.common.database.dialect.DialectRegistry;
/**
* Dialect factory.
*
* @author yuwei
* @since 2020-03-14
*/
public class DialectFactory {
private static final DialectRegistry DIALECT_REGISTRY = new DialectRegistry();
public static DbDialect getDialect(DbType dbType) {
return DIALECT_REGISTRY.getDialect(dbType);
}
}
package cn.datax.common.database.cache;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DefaultSqlCache extends LinkedHashMap<String, DefaultSqlCache.ExpireNode<Object>> implements SqlCache {
private int capacity;
private long expire;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public DefaultSqlCache(int capacity, long expire) {
super((int) Math.ceil(capacity / 0.75) + 1, 0.75f, true);
// maximum number of entries
this.capacity = capacity;
// default time-to-live in milliseconds
this.expire = expire;
}
@Override
public void put(String key, Object value, long ttl) {
long expireTime = Long.MAX_VALUE;
if (ttl >= 0) {
expireTime = System.currentTimeMillis() + (ttl == 0 ? this.expire : ttl);
}
lock.writeLock().lock();
try {
// wrap the value in an expiration-time node
put(key, new ExpireNode<>(expireTime, value));
} finally {
lock.writeLock().unlock();
}
}
@Override
public Object get(String key) {
lock.readLock().lock();
ExpireNode<Object> expireNode;
try {
expireNode = super.get(key);
} finally {
lock.readLock().unlock();
}
if (expireNode == null) {
return null;
}
// lazily evict the entry if it has expired
if (this.expire > -1L && expireNode.expire < System.currentTimeMillis()) {
lock.writeLock().lock();
try {
super.remove(key);
} finally {
lock.writeLock().unlock();
}
return null;
}
return expireNode.value;
}
@Override
public void delete(String key) {
lock.writeLock().lock();
try {
// remove the cached entry for this key
super.remove(key);
} finally {
lock.writeLock().unlock();
}
}
@Override
protected boolean removeEldestEntry(Map.Entry<String, ExpireNode<Object>> eldest) {
if (this.expire > -1L && size() > capacity) {
clean();
}
// LRU eviction
return size() > this.capacity;
}
/**
* Evict entries that have expired.
*/
private void clean() {
lock.writeLock().lock();
try {
Iterator<Map.Entry<String, ExpireNode<Object>>> iterator = super.entrySet().iterator();
long now = System.currentTimeMillis();
while (iterator.hasNext()) {
Map.Entry<String, ExpireNode<Object>> next = iterator.next();
// evict if expired
if (next.getValue().expire < now) {
iterator.remove();
}
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Cache node that carries an absolute expiration timestamp.
*/
static class ExpireNode<V> {
long expire;
V value;
public ExpireNode(long expire, V value) {
this.expire = expire;
this.value = value;
}
}
}
package cn.datax.common.database.cache;
import cn.datax.common.database.utils.MD5Util;
import java.util.Arrays;
/**
* SQL cache interface.
*/
public interface SqlCache {
/**
* Build a cache key from the SQL and its arguments.
*/
default String buildSqlCacheKey(String sql, Object[] args) {
return MD5Util.encrypt(sql + ":" + Arrays.toString(args));
}
/**
* Put a value into the cache.
* @param key cache key
* @param value cached value
* @param ttl time-to-live in milliseconds; 0 means use the default expiry
*/
void put(String key, Object value, long ttl);
/**
* Get a cached value.
* @param key cache key
* @return the cached value, or null if absent or expired
*/
<T> T get(String key);
/**
* Remove a cached value.
* @param key cache key
*/
void delete(String key);
}
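A minimal usage sketch of the cache contract above (illustrative only, not part of the commit; the class name SqlCacheUsageSketch, the query string and the cached value are placeholders):
package cn.datax.common.database.cache;
public class SqlCacheUsageSketch {
public static void main(String[] args) {
// LRU cache capped at 100 entries; entries expire after 60 seconds by default
SqlCache cache = new DefaultSqlCache(100, 60_000L);
String key = cache.buildSqlCacheKey("select * from t_user where id = ?", new Object[]{1});
cache.put(key, "cached result", 0); // ttl = 0 falls back to the default expiry
Object hit = cache.get(key); // non-null until the entry expires or is evicted
System.out.println(hit);
cache.delete(key); // explicit invalidation
}
}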
package cn.datax.common.database.constants;
import cn.datax.common.database.exception.DataQueryException;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.util.StringUtils;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class DbQueryProperty implements Serializable {
private static final long serialVersionUID = 1L;
private String dbType;
private String host;
private String username;
private String password;
private Integer port;
private String dbName;
private String sid;
/**
* Validate that the connection properties are complete.
*/
public void viald() {
if (StringUtils.isEmpty(dbType) || StringUtils.isEmpty(host) ||
StringUtils.isEmpty(username) || StringUtils.isEmpty(password) ||
port == null) {
throw new DataQueryException("Incomplete connection parameters");
}
if (DbType.OTHER.getDb().equals(dbType)) {
throw new DataQueryException("Unsupported database type");
}
}
}
package cn.datax.common.database.constants;
/**
* Supported database types and their JDBC URL templates.
*
* @author yuwei
* @since 2020-03-14
*/
public enum DbType {
/**
* MYSQL
*/
MYSQL("1", "MySQL database", "jdbc:mysql://${host}:${port}/${dbName}?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"),
/**
* MARIADB
*/
MARIADB("2", "MariaDB database", "jdbc:mariadb://${host}:${port}/${dbName}"),
/**
* ORACLE
*/
ORACLE("3", "Oracle 11g and below", "jdbc:oracle:thin:@${host}:${port}:${sid}"),
/**
* ORACLE 12c+ (new pagination syntax)
*/
ORACLE_12C("4", "Oracle 12c and above", "jdbc:oracle:thin:@${host}:${port}:${sid}"),
/**
* POSTGRESQL
*/
POSTGRE_SQL("5", "PostgreSQL database", "jdbc:postgresql://${host}:${port}/${dbName}"),
/**
* SQLSERVER2008
*/
SQL_SERVER2008("6", "SQL Server 2008 and below", "jdbc:sqlserver://${host}:${port};DatabaseName=${dbName}"),
/**
* SQLSERVER
*/
SQL_SERVER("7", "SQL Server 2012 and above", "jdbc:sqlserver://${host}:${port};DatabaseName=${dbName}"),
/**
* UNKNOWN DB
*/
OTHER("8", "Other database", "");
/**
* Database type code
*/
private final String db;
/**
* Description
*/
private final String desc;
/**
* JDBC URL template
*/
private final String url;
public String getDb() {
return this.db;
}
public String getDesc() {
return this.desc;
}
public String getUrl() {
return this.url;
}
DbType(String db, String desc, String url) {
this.db = db;
this.desc = desc;
this.url = url;
}
/**
* Resolve the database type from its code.
*
* @param dbType database type code
* @return the matching DbType, or OTHER if none matches
*/
public static DbType getDbType(String dbType) {
for (DbType type : DbType.values()) {
if (type.db.equals(dbType)) {
return type;
}
}
return OTHER;
}
}
package cn.datax.common.database.core;
import lombok.Data;
@Data
public class DbColumn {
/**
* Column name
*/
private String colName;
/**
* Data type
*/
private String dataType;
/**
* Data length
*/
private String dataLength;
/**
* Data precision
*/
private String dataPrecision;
/**
* Data scale (decimal digits)
*/
private String dataScale;
/**
* Whether the column is a primary key
*/
private Boolean colKey;
/**
* Whether the column is nullable
*/
private Boolean nullable;
/**
* Ordinal position of the column
*/
private Integer colPosition;
/**
* Default value
*/
private String dataDefault;
/**
* Column comment
*/
private String colComment;
}
package cn.datax.common.database.core;
import lombok.Data;
@Data
public class DbTable {
/**
* Table name
*/
private String tableName;
/**
* Table comment
*/
private String tableComment;
}
package cn.datax.common.database.core;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.List;
@Data
@Accessors(chain = true)
public class PageResult<T> implements Serializable {
private static final long serialVersionUID = 1L;
private Integer pageNum;
private Integer pageSize;
private Integer total;
private List<T> data;
public PageResult(Integer total, List<T> data) {
this.total = total;
this.data = data;
}
}
package cn.datax.common.database.datasource;
import cn.datax.common.database.*;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.common.database.constants.DbType;
import cn.datax.common.database.exception.DataQueryException;
import cn.datax.common.database.query.AbstractDbQueryFactory;
import cn.datax.common.database.query.CacheDbQueryFactoryBean;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
public abstract class AbstractDataSourceFactory implements DataSourceFactory {
@Override
public DbQuery createDbQuery(DbQueryProperty property) {
property.viald();
DbType dbType = DbType.getDbType(property.getDbType());
DataSource dataSource = createDataSource(property);
DbQuery dbQuery = createDbQuery(dataSource, dbType);
return dbQuery;
}
public DbQuery createDbQuery(DataSource dataSource, DbType dbType) {
DbDialect dbDialect = DialectFactory.getDialect(dbType);
if (dbDialect == null) {
throw new DataQueryException("Support for this database type is still under development");
}
AbstractDbQueryFactory dbQuery = new CacheDbQueryFactoryBean();
dbQuery.setDataSource(dataSource);
dbQuery.setJdbcTemplate(new JdbcTemplate(dataSource));
dbQuery.setDbDialect(dbDialect);
return dbQuery;
}
public DataSource createDataSource(DbQueryProperty property) {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(trainToJdbcUrl(property));
dataSource.setUsername(property.getUsername());
dataSource.setPassword(property.getPassword());
return dataSource;
}
protected String trainToJdbcUrl(DbQueryProperty property) {
String url = DbType.getDbType(property.getDbType()).getUrl();
if (StringUtils.isEmpty(url)) {
throw new DataQueryException("Invalid database type!");
}
url = url.replace("${host}", property.getHost());
url = url.replace("${port}", String.valueOf(property.getPort()));
if (DbType.ORACLE.getDb().equals(property.getDbType()) || DbType.ORACLE_12C.getDb().equals(property.getDbType())) {
url = url.replace("${sid}", property.getSid());
} else {
url = url.replace("${dbName}", property.getDbName());
}
return url;
}
}
package cn.datax.common.database.datasource;
import cn.datax.common.database.constants.DbQueryProperty;
import javax.sql.DataSource;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class CacheDataSourceFactoryBean extends AbstractDataSourceFactory {
/**
* Data source cache, keyed by a hash of host, port, username and database name
*/
private static final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();
@Override
public DataSource createDataSource(DbQueryProperty property) {
String key = property.getHost() + ":" + property.getPort() + ":" + property.getUsername() + ":" + property.getDbName();
String cacheKey = compress(key);
DataSource dataSource = dataSourceMap.get(cacheKey);
if (null == dataSource) {
synchronized (CacheDataSourceFactoryBean.class) {
// re-check inside the lock so the data source is only created once
dataSource = dataSourceMap.get(cacheKey);
if (null == dataSource) {
dataSource = super.createDataSource(property);
dataSourceMap.put(cacheKey, dataSource);
}
}
}
return dataSource;
}
/**
* Hash the key with MD5 and return the 16-character form (characters 8-24 of the hex digest), upper case.
*/
public static String compress(String str) {
if (str == null || str.length() == 0) {
return str;
}
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest = md.digest(str.getBytes());
StringBuilder buf = new StringBuilder();
for (byte b : digest) {
int i = b & 0xff;
if (i < 16) {
buf.append("0");
}
buf.append(Integer.toHexString(i));
}
return buf.toString().substring(8, 24).toUpperCase();
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("MD5 algorithm not available", e);
}
}
}
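An end-to-end usage sketch of the factory and query API introduced by this commit (illustrative only; DbQueryUsageSketch, the connection details, demo_db and t_user are placeholders, and a reachable MySQL instance is assumed):
package cn.datax.common.database;
import cn.datax.common.database.constants.DbQueryProperty;
import cn.datax.common.database.core.PageResult;
import cn.datax.common.database.datasource.DefaultDataSourceFactoryBean;
import java.util.Map;
public class DbQueryUsageSketch {
public static void main(String[] args) {
// dbType "1" = MySQL; constructor order: dbType, host, username, password, port, dbName, sid
DbQueryProperty property = new DbQueryProperty("1", "127.0.0.1", "root", "secret", 3306, "demo_db", null);
// CacheDataSourceFactoryBean can be used instead to reuse pooled data sources across calls
DataSourceFactory factory = new DefaultDataSourceFactoryBean();
DbQuery dbQuery = factory.createDbQuery(property);
// fetch rows 0..9 of a hypothetical t_user table together with the total count
PageResult<Map<String, Object>> page = dbQuery.queryByPage("select * from t_user", 0, 10);
System.out.println("total=" + page.getTotal() + ", rows=" + page.getData().size());
dbQuery.close();
}
}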
package cn.datax.common.database.datasource;
public class DefaultDataSourceFactoryBean extends AbstractDataSourceFactory {
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.DbDialect;
/**
* Abstract dialect base class; the default implementations target MySQL-compatible databases.
*
* @author yuwei
* @since 2020-03-14
*/
public abstract class AbstractDbDialect implements DbDialect {
@Override
public String columns(String dbName, String tableName) {
return "select column_name AS COLNAME, ordinal_position AS COLPOSITION, column_default AS DATADEFAULT, is_nullable AS NULLABLE, data_type AS DATATYPE, " +
"character_maximum_length AS DATALENGTH, numeric_precision AS DATAPRECISION, numeric_scale AS DATASCALE, column_key AS COLKEY, column_comment AS COLCOMMENT " +
"from information_schema.columns where table_schema = '" + dbName + "' and table_name = '" + tableName + "' order by ordinal_position ";
}
@Override
public String tables(String dbName) {
return "SELECT table_name AS TABLENAME, table_comment AS TABLECOMMENT FROM information_schema.tables where table_schema = '" + dbName + "' ";
}
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
// MySQL-style pagination: LIMIT offset, count
StringBuilder sqlBuilder = new StringBuilder(originalSql);
sqlBuilder.append(" LIMIT ").append(offset).append(" , ").append(count);
return sqlBuilder.toString();
}
@Override
public String count(String sql) {
return "SELECT COUNT(*) FROM ( " + sql + " ) TEMP";
}
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.DbDialect;
import cn.datax.common.database.constants.DbType;
import java.util.EnumMap;
import java.util.Map;
public class DialectRegistry {
private final Map<DbType, DbDialect> dialectMap = new EnumMap<>(DbType.class);
public DialectRegistry() {
dialectMap.put(DbType.MARIADB, new MariaDBDialect());
dialectMap.put(DbType.MYSQL, new MySqlDialect());
dialectMap.put(DbType.ORACLE_12C, new Oracle12cDialect());
dialectMap.put(DbType.ORACLE, new OracleDialect());
dialectMap.put(DbType.POSTGRE_SQL, new PostgreDialect());
dialectMap.put(DbType.SQL_SERVER2008, new SQLServer2008Dialect());
dialectMap.put(DbType.SQL_SERVER, new SQLServerDialect());
dialectMap.put(DbType.OTHER, new UnknownDialect());
}
public DbDialect getDialect(DbType dbType) {
return dialectMap.get(dbType);
}
}
package cn.datax.common.database.dialect;
/**
* MariaDB dialect.
*
* @author yuwei
* @since 2020-03-14
*/
public class MariaDBDialect extends MySqlDialect {
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
/**
* MySQL dialect.
*
* @author yuwei
* @since 2020-03-14
*/
public class MySqlDialect extends AbstractDbDialect {
@Override
public RowMapper<DbColumn> columnMapper() {
return (ResultSet rs, int rowNum) -> {
DbColumn entity = new DbColumn();
entity.setColName(rs.getString("COLNAME"));
entity.setDataType(rs.getString("DATATYPE"));
entity.setDataLength(rs.getString("DATALENGTH"));
entity.setDataPrecision(rs.getString("DATAPRECISION"));
entity.setDataScale(rs.getString("DATASCALE"));
entity.setColKey("PRI".equals(rs.getString("COLKEY")) ? true : false);
entity.setNullable("YES".equals(rs.getString("NULLABLE")) ? true : false);
entity.setColPosition(rs.getInt("COLPOSITION"));
entity.setDataDefault(rs.getString("DATADEFAULT"));
entity.setColComment(rs.getString("COLCOMMENT"));
return entity;
};
}
@Override
public RowMapper<DbTable> tableMapper() {
return (ResultSet rs, int rowNum) -> {
DbTable entity = new DbTable();
entity.setTableName(rs.getString("TABLENAME"));
entity.setTableComment(rs.getString("TABLECOMMENT"));
return entity;
};
}
}
package cn.datax.common.database.dialect;
/**
* Oracle 12c and above dialect (OFFSET ... FETCH pagination).
*
* @author yuwei
* @since 2020-03-14
*/
public class Oracle12cDialect extends OracleDialect {
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
StringBuilder sqlBuilder = new StringBuilder(originalSql);
sqlBuilder.append(" OFFSET ").append(offset).append(" ROWS FETCH NEXT ").append(count).append(" ROWS ONLY ");
return sqlBuilder.toString();
}
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
/**
* Oracle 11g and below dialect (ROWNUM pagination).
*
* @author yuwei
* @since 2020-03-14
*/
public class OracleDialect extends AbstractDbDialect {
@Override
public String columns(String dbName, String tableName) {
return "select columns.column_name AS colName, columns.data_type AS DATATYPE, columns.data_length AS DATALENGTH, columns.data_precision AS DATAPRECISION, " +
"columns.data_scale AS DATASCALE, columns.nullable AS NULLABLE, columns.column_id AS COLPOSITION, columns.data_default AS DATADEFAULT, comments.comments AS COLCOMMENT," +
"case when t.column_name is null then 0 else 1 end as COLKEY " +
"from sys.user_tab_columns columns LEFT JOIN sys.user_col_comments comments ON columns.table_name = comments.table_name AND columns.column_name = comments.column_name " +
"left join ( " +
"select col.column_name as column_name, con.table_name as table_name from user_constraints con, user_cons_columns col " +
"where con.constraint_name = col.constraint_name and con.constraint_type = 'P' " +
") t on t.table_name = columns.table_name and columns.column_name = t.column_name " +
"where columns.table_name = UPPER('" + tableName + "') order by columns.column_id ";
}
@Override
public String tables(String dbName) {
return "select tables.table_name AS TABLENAME, comments.comments AS TABLECOMMENT from sys.user_tables tables " +
"LEFT JOIN sys.user_tab_comments comments ON tables.table_name = comments.table_name ";
}
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT * FROM ( SELECT TMP.*, ROWNUM ROW_ID FROM ( ");
sqlBuilder.append(originalSql).append(" ) TMP WHERE ROWNUM <=").append((offset >= 1) ? (offset + count) : count);
sqlBuilder.append(") WHERE ROW_ID > ").append(offset);
return sqlBuilder.toString();
}
@Override
public RowMapper<DbColumn> columnMapper() {
return (ResultSet rs, int rowNum) -> {
DbColumn entity = new DbColumn();
entity.setColName(rs.getString("COLNAME"));
entity.setDataType(rs.getString("DATATYPE"));
entity.setDataLength(rs.getString("DATALENGTH"));
entity.setDataPrecision(rs.getString("DATAPRECISION"));
entity.setDataScale(rs.getString("DATASCALE"));
entity.setColKey("1".equals(rs.getString("COLKEY")) ? true : false);
entity.setNullable("Y".equals(rs.getString("NULLABLE")) ? true : false);
entity.setColPosition(rs.getInt("COLPOSITION"));
entity.setDataDefault(rs.getString("DATADEFAULT"));
entity.setColComment(rs.getString("COLCOMMENT"));
return entity;
};
}
@Override
public RowMapper<DbTable> tableMapper() {
return (ResultSet rs, int rowNum) -> {
DbTable entity = new DbTable();
entity.setTableName(rs.getString("TABLENAME"));
entity.setTableComment(rs.getString("TABLECOMMENT"));
return entity;
};
}
}
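For comparison, a short sketch (not part of the commit; PaginationSqlSketch and t_user are placeholders) showing the pagination SQL produced by the MySQL-style default and by the Oracle ROWNUM variant for the same query and page window:
package cn.datax.common.database.dialect;
import cn.datax.common.database.DbDialect;
import cn.datax.common.database.DialectFactory;
import cn.datax.common.database.constants.DbType;
public class PaginationSqlSketch {
public static void main(String[] args) {
DbDialect mysql = DialectFactory.getDialect(DbType.MYSQL);
DbDialect oracle = DialectFactory.getDialect(DbType.ORACLE);
// MySQL default: select * from t_user LIMIT 10 , 20
System.out.println(mysql.buildPaginationSql("select * from t_user", 10, 20));
// Oracle: SELECT * FROM ( SELECT TMP.*, ROWNUM ROW_ID FROM ( select * from t_user ) TMP WHERE ROWNUM <=30) WHERE ROW_ID > 10
System.out.println(oracle.buildPaginationSql("select * from t_user", 10, 20));
}
}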
package cn.datax.common.database.dialect;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
/**
* PostgreSQL dialect.
*
* @author yuwei
* @since 2020-03-14
*/
public class PostgreDialect extends AbstractDbDialect {
@Override
public String columns(String dbName, String tableName) {
return "select col.column_name AS COLNAME, col.ordinal_position AS COLPOSITION, col.column_default AS DATADEFAULT, col.is_nullable AS NULLABLE, col.udt_name AS DATATYPE, " +
"col.character_maximum_length AS DATALENGTH, col.numeric_precision AS DATAPRECISION, col.numeric_scale AS DATASCALE, des.description AS COLCOMMENT, " +
"case when t.colname is null then 0 else 1 end as COLKEY " +
"from information_schema.columns col left join pg_description des on col.table_name::regclass = des.objoid and col.ordinal_position = des.objsubid " +
"left join ( " +
"select pg_attribute.attname as colname from pg_constraint inner join pg_class on pg_constraint.conrelid = pg_class.oid " +
"inner join pg_attribute on pg_attribute.attrelid = pg_class.oid and pg_attribute.attnum = any(pg_constraint.conkey) " +
"where pg_class.relname = '" + tableName + "' and pg_constraint.contype = 'p' " +
") t on t.colname = col.column_name " +
"where col.table_catalog = '" + dbName + "' and col.table_schema = 'public' and col.table_name = '" + tableName + "' order by col.ordinal_position ";
}
@Override
public String tables(String dbName) {
return "select relname AS TABLENAME, cast(obj_description(relfilenode, 'pg_class') as varchar) AS TABLECOMMENT from pg_class " +
"where relname in (select tablename from pg_tables where schemaname = 'public' and tableowner = '" + dbName + "' and position('_2' in tablename) = 0) ";
}
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
StringBuilder sqlBuilder = new StringBuilder(originalSql);
sqlBuilder.append(" LIMIT ").append(count).append(" offset ").append(offset);
return sqlBuilder.toString();
}
@Override
public RowMapper<DbColumn> columnMapper() {
return (ResultSet rs, int rowNum) -> {
DbColumn entity = new DbColumn();
entity.setColName(rs.getString("COLNAME"));
entity.setDataType(rs.getString("DATATYPE"));
entity.setDataLength(rs.getString("DATALENGTH"));
entity.setDataPrecision(rs.getString("DATAPRECISION"));
entity.setDataScale(rs.getString("DATASCALE"));
entity.setColKey("1".equals(rs.getString("COLKEY")) ? true : false);
entity.setNullable("YES".equals(rs.getString("NULLABLE")) ? true : false);
entity.setColPosition(rs.getInt("COLPOSITION"));
entity.setDataDefault(rs.getString("DATADEFAULT"));
entity.setColComment(rs.getString("COLCOMMENT"));
return entity;
};
}
@Override
public RowMapper<DbTable> tableMapper() {
return (ResultSet rs, int rowNum) -> {
DbTable entity = new DbTable();
entity.setTableName(rs.getString("TABLENAME"));
entity.setTableComment(rs.getString("TABLECOMMENT"));
return entity;
};
}
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.StringUtils;
import java.sql.ResultSet;
/**
* SQL Server 2008 and below dialect (ROW_NUMBER pagination).
*
* @author yuwei
* @since 2020-03-14
*/
public class SQLServer2008Dialect extends AbstractDbDialect {
@Override
public String columns(String dbName, String tableName) {
return "select columns.name AS colName, columns.column_id AS COLPOSITION, columns.max_length AS DATALENGTH, columns.precision AS DATAPRECISION, columns.scale AS DATASCALE, " +
"columns.is_nullable AS NULLABLE, types.name AS DATATYPE, CAST(ep.value AS NVARCHAR(128)) AS COLCOMMENT, e.text AS DATADEFAULT, " +
"(select top 1 ind.is_primary_key from sys.index_columns ic left join sys.indexes ind on ic.object_id = ind.object_id and ic.index_id = ind.index_id and ind.name like 'PK_%' where ic.object_id=columns.object_id and ic.column_id=columns.column_id) AS COLKEY " +
"from sys.columns columns LEFT JOIN sys.types types ON columns.system_type_id = types.system_type_id " +
"LEFT JOIN syscomments e ON columns.default_object_id= e.id " +
"LEFT JOIN sys.extended_properties ep ON ep.major_id = columns.object_id AND ep.minor_id = columns.column_id AND ep.name = 'MS_Description' " +
"where columns.object_id = object_id('" + tableName + "') order by columns.column_id ";
}
@Override
public String tables(String dbName) {
return "select tables.name AS TABLENAME, CAST(ep.value AS NVARCHAR(128)) AS TABLECOMMENT " +
"from sys.tables tables LEFT JOIN sys.extended_properties ep ON ep.major_id = tables.object_id AND ep.minor_id = 0";
}
private static String getOrderByPart(String sql) {
String loweredString = sql.toLowerCase();
int orderByIndex = loweredString.indexOf("order by");
if (orderByIndex != -1) {
return sql.substring(orderByIndex);
} else {
return "";
}
}
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
StringBuilder pagingBuilder = new StringBuilder();
String orderby = getOrderByPart(originalSql);
String distinctStr = "";
String loweredString = originalSql.toLowerCase();
String sqlPartString = originalSql;
if (loweredString.trim().startsWith("select")) {
int index = 6;
if (loweredString.startsWith("select distinct")) {
distinctStr = "DISTINCT ";
index = 15;
}
sqlPartString = sqlPartString.substring(index);
}
pagingBuilder.append(sqlPartString);
// if no ORDER BY is specified use fake ORDER BY field to avoid errors
if (StringUtils.isEmpty(orderby)) {
orderby = "ORDER BY CURRENT_TIMESTAMP";
}
StringBuilder sql = new StringBuilder();
sql.append("WITH selectTemp AS (SELECT ").append(distinctStr).append("TOP 100 PERCENT ")
.append(" ROW_NUMBER() OVER (").append(orderby).append(") as __row_number__, ").append(pagingBuilder)
.append(") SELECT * FROM selectTemp WHERE __row_number__ BETWEEN ")
// FIX#299: MySQL's LIMIT offset,size excludes the offset row, while BETWEEN is inclusive on both ends, so start from offset + 1
.append(offset + 1)
.append(" AND ")
.append(offset + count).append(" ORDER BY __row_number__");
return sql.toString();
}
@Override
public RowMapper<DbColumn> columnMapper() {
return (ResultSet rs, int rowNum) -> {
DbColumn entity = new DbColumn();
entity.setColName(rs.getString("COLNAME"));
entity.setDataType(rs.getString("DATATYPE"));
entity.setDataLength(rs.getString("DATALENGTH"));
entity.setDataPrecision(rs.getString("DATAPRECISION"));
entity.setDataScale(rs.getString("DATASCALE"));
entity.setColKey("1".equals(rs.getString("COLKEY")) ? true : false);
entity.setNullable("1".equals(rs.getString("NULLABLE")) ? true : false);
entity.setColPosition(rs.getInt("COLPOSITION"));
entity.setDataDefault(rs.getString("DATADEFAULT"));
entity.setColComment(rs.getString("COLCOMMENT"));
return entity;
};
}
@Override
public RowMapper<DbTable> tableMapper() {
return (ResultSet rs, int rowNum) -> {
DbTable entity = new DbTable();
entity.setTableName(rs.getString("TABLENAME"));
entity.setTableComment(rs.getString("TABLECOMMENT"));
return entity;
};
}
}
package cn.datax.common.database.dialect;
/**
* SQL Server 2012 and above dialect (OFFSET ... FETCH pagination).
*
* @author yuwei
* @since 2020-03-14
*/
public class SQLServerDialect extends SQLServer2008Dialect {
@Override
public String buildPaginationSql(String originalSql, long offset, long count) {
StringBuilder sqlBuilder = new StringBuilder(originalSql);
sqlBuilder.append(" OFFSET ").append(offset).append(" ROWS FETCH NEXT ").append(count).append(" ROWS ONLY ");
return sqlBuilder.toString();
}
}
package cn.datax.common.database.dialect;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import cn.datax.common.database.exception.DataQueryException;
import org.springframework.jdbc.core.RowMapper;
/**
* Fallback dialect for unsupported database types.
*
* @author yuwei
* @since 2020-03-14
*/
public class UnknownDialect extends AbstractDbDialect {
@Override
public String columns(String dbName, String tableName) {
throw new DataQueryException("Unsupported database type");
}
@Override
public String tables(String dbName) {
throw new DataQueryException("Unsupported database type");
}
@Override
public String buildPaginationSql(String sql, long offset, long count) {
throw new DataQueryException("Unsupported database type");
}
@Override
public String count(String sql) {
throw new DataQueryException("Unsupported database type");
}
@Override
public RowMapper<DbColumn> columnMapper() {
throw new DataQueryException("Unsupported database type");
}
@Override
public RowMapper<DbTable> tableMapper() {
throw new DataQueryException("Unsupported database type");
}
}
package cn.datax.common.database.exception;
public class DataQueryException extends RuntimeException {
public DataQueryException(String message) {
super(message);
}
}
package cn.datax.common.database.query;
import cn.datax.common.database.DbDialect;
import cn.datax.common.database.DbQuery;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import cn.datax.common.database.core.PageResult;
import cn.datax.common.database.exception.DataQueryException;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Setter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@Setter
public abstract class AbstractDbQueryFactory implements DbQuery {
protected DataSource dataSource;
protected JdbcTemplate jdbcTemplate;
protected DbDialect dbDialect;
@Override
public Connection getConnection() {
try {
return dataSource.getConnection();
} catch (SQLException e) {
throw new DataQueryException("获取数据库连接出错");
}
}
@Override
public boolean valid() {
Connection conn = null;
try {
conn = dataSource.getConnection();
return conn.isValid(0);
} catch (SQLException e) {
throw new DataQueryException("检测连通性出错");
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
throw new DataQueryException("关闭数据库连接出错");
}
}
}
}
@Override
public void close() {
if (dataSource instanceof HikariDataSource) {
((HikariDataSource) dataSource).close();
} else {
throw new DataQueryException("不合法数据源类型");
}
}
@Override
public List<DbColumn> getTableColumns(String dbName, String tableName) {
String sql = dbDialect.columns(dbName, tableName);
return jdbcTemplate.query(sql, dbDialect.columnMapper());
}
@Override
public List<DbTable> getTables(String dbName) {
String sql = dbDialect.tables(dbName);
return jdbcTemplate.query(sql, dbDialect.tableMapper());
}
@Override
public int count(String sql) {
return jdbcTemplate.queryForObject(dbDialect.count(sql), Integer.class);
}
@Override
public int count(String sql, Object[] args) {
return jdbcTemplate.queryForObject(dbDialect.count(sql), args, Integer.class);
}
@Override
public int count(String sql, Map<String, Object> params) {
NamedParameterJdbcTemplate namedJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
return namedJdbcTemplate.queryForObject(dbDialect.count(sql), params, Integer.class);
}
@Override
public List<Map<String, Object>> queryList(String sql) {
return jdbcTemplate.queryForList(sql);
}
@Override
public List<Map<String, Object>> queryList(String sql, Object[] args) {
return jdbcTemplate.queryForList(sql, args);
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, long offset, long size) {
int total = count(sql);
String pageSql = dbDialect.buildPaginationSql(sql, offset, size);
List<Map<String, Object>> records = jdbcTemplate.queryForList(pageSql);
return new PageResult<>(total, records);
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, Object[] args, long offset, long size) {
int total = count(sql, args);
String pageSql = dbDialect.buildPaginationSql(sql, offset, size);
List<Map<String, Object>> records = jdbcTemplate.queryForList(pageSql, args);
return new PageResult<>(total, records);
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, Map<String, Object> params, long offset, long size) {
int total = count(sql, params);
String pageSql = dbDialect.buildPaginationSql(sql, offset, size);
NamedParameterJdbcTemplate namedJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
List<Map<String, Object>> records = namedJdbcTemplate.queryForList(pageSql, params);
return new PageResult<>(total, records);
}
}
package cn.datax.common.database.query;
import cn.datax.common.database.cache.DefaultSqlCache;
import cn.datax.common.database.core.DbColumn;
import cn.datax.common.database.core.DbTable;
import cn.datax.common.database.core.PageResult;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class CacheDbQueryFactoryBean extends AbstractDbQueryFactory {
/**
* Cache query results for 5 minutes by default.
*/
private static final long DEFAULT_EXPIRE = 5 * 60 * 1000;
private static final DefaultSqlCache sqlCache = new DefaultSqlCache(100, DEFAULT_EXPIRE);
private <T> T putCacheValue(String key, T value, long ttl) {
sqlCache.put(key, value, ttl);
return value;
}
@Override
public List<DbColumn> getTableColumns(String dbName, String tableName) {
Object[] args = new Object[]{dbName, tableName};
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":getTableColumns", args);
return (List<DbColumn>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.getTableColumns(dbName, tableName), DEFAULT_EXPIRE));
}
@Override
public List<DbTable> getTables(String dbName) {
Object[] args = new Object[]{dbName};
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":getTables", args);
return (List<DbTable>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.getTables(dbName), DEFAULT_EXPIRE));
}
@Override
public int count(String sql) {
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, null);
return (int) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.count(sql), DEFAULT_EXPIRE));
}
@Override
public int count(String sql, Object[] args) {
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, args);
return (int) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.count(sql, args), DEFAULT_EXPIRE));
}
@Override
public int count(String sql, Map<String, Object> params) {
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, params.values().toArray());
return (int) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.count(sql, params), DEFAULT_EXPIRE));
}
@Override
public List<Map<String, Object>> queryList(String sql) {
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, null);
return (List<Map<String, Object>>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.queryList(sql), DEFAULT_EXPIRE));
}
@Override
public List<Map<String, Object>> queryList(String sql, Object[] args) {
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, args);
return (List<Map<String, Object>>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.queryList(sql, args), DEFAULT_EXPIRE));
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, long offset, long size) {
Object[] args = new Object[]{offset, size};
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, args);
return (PageResult<Map<String, Object>>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.queryByPage(sql, offset, size), DEFAULT_EXPIRE));
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, Object[] args, long offset, long size) {
// append offset and size to the cache key arguments
Object[] objects = Arrays.copyOf(args, args.length + 2);
objects[args.length] = offset;
objects[args.length + 1] = size;
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, objects);
return (PageResult<Map<String, Object>>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.queryByPage(sql, args, offset, size), DEFAULT_EXPIRE));
}
@Override
public PageResult<Map<String, Object>> queryByPage(String sql, Map<String, Object> params, long offset, long size) {
// append offset and size to the cache key arguments
Object[] args = params.values().toArray();
Object[] objects = Arrays.copyOf(args, args.length + 2);
objects[args.length] = offset;
objects[args.length + 1] = size;
String cacheKey = sqlCache.buildSqlCacheKey(super.dataSource.toString() + ":" + sql, objects);
return (PageResult<Map<String, Object>>) Optional.ofNullable(sqlCache.get(cacheKey))
.orElseGet(() -> putCacheValue(cacheKey, super.queryByPage(sql, params, offset, size), DEFAULT_EXPIRE));
}
}
package cn.datax.common.database.query;
public class DefaultDbQueryFactoryBean extends AbstractDbQueryFactory {
}
package cn.datax.common.database.utils;
import java.security.MessageDigest;
public class MD5Util {
private static final char[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
/**
* MD5 hash of a string, returned as 32 lower-case hex characters.
*/
public static String encrypt(String value){
return encrypt(value.getBytes());
}
/**
* MD5 hash of a byte array, returned as 32 lower-case hex characters.
*/
public static String encrypt(byte[] value){
try {
byte[] bytes = MessageDigest.getInstance("MD5").digest(value);
char[] chars = new char[32];
for (int i = 0; i < chars.length; i = i + 2) {
byte b = bytes[i / 2];
chars[i] = HEX_CHARS[(b >>> 0x4) & 0xf];
chars[i + 1] = HEX_CHARS[b & 0xf];
}
return new String(chars);
} catch (Exception e) {
throw new RuntimeException("md5 encrypt error", e);
}
}
}
......@@ -2,4 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.datax.common.config.JacksonConfig,\
cn.datax.common.config.RestTemplateConfig,\
cn.datax.common.exception.GlobalExceptionHandler,\
cn.datax.common.utils.SpringContextHolder
cn.datax.common.utils.SpringContextHolder,\
cn.datax.common.database.datasource.CacheDataSourceFactoryBean