说在前面
本文转自“天河聊技术”微信公众号
sql路由这里的内容比较多,包含单表路由或者绑定表路由、多库多表路由、笛卡尔积路由,分三部分来介绍,今天先介绍单表或绑定表路由。
sql路由源码解析
com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine、com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine两个sql路由引擎类,预编译的用的比较多,我们以预编译的Statement的引擎类来跟踪下sharding-jdbc是对sql怎么进行路由的。
上层sql执行器接收到逻辑sql后再进行sql路由的时候会创建预编译statement对象的路由器,因此会调用其构造器
/** * 预解析的SQL路由器. * * @author zhangliang */public final class PreparedStatementRoutingEngine {// 逻辑sql private final String logicSQL;//sql路由器 private final SQLRouter sqlRouter;// sql语句对象 private SQLStatement sqlStatement; public PreparedStatementRoutingEngine(final String logicSQL, final ShardingContext shardingContext) { this.logicSQL = logicSQL; sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext); }
/** * 路由引擎工厂. * * @author zhangiang */@NoArgsConstructor(access = AccessLevel.PRIVATE)public final class SQLRouterFactory { /** * 创建SQL路由器. * * @param shardingContext 数据源运行期上下文 * @return SQL路由器 */// 这里是静态工厂方法实现 public static SQLRouter createSQLRouter(final ShardingContext shardingContext) { return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext); }}
接下来会创建ParsingSQLRouter对象
** * 需要解析的SQL路由器. * * @author zhangiang */public final class ParsingSQLRouter implements SQLRouter {// 分库分表配置对象 private final ShardingRule shardingRule;// 支持的数据库类型 private final DatabaseType databaseType;// 是否要展示sql private final boolean showSQL; private final ListgeneratedKeys;// 上面这些属性值都是存储在分片上下文中 public ParsingSQLRouter(final ShardingContext shardingContext) { shardingRule = shardingContext.getShardingRule(); databaseType = shardingContext.getDatabaseType(); showSQL = shardingContext.isShowSQL(); generatedKeys = new LinkedList<>(); }
这个方法是sql路由的入口方法
/** * SQL路由. * 当第一次路由时进行SQL解析,之后的路由复用第一次的解析结果. * * @param parameters SQL中的参数 * @return 路由结果 */public SQLRouteResult route(final List
进入到这个parse方法
sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
@Override public SQLStatement parse(final String logicSQL, final int parametersSize) {// 创建sql解析引擎 SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);// 开启度量上下文 Context context = MetricsContext.start("Parse SQL"); // sql解析器解析获得sql语句对象 SQLStatement result = parsingEngine.parse(); if (result instanceof InsertStatement) { ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize); } MetricsContext.stop(context); return result; }
进入下面的sql路由方法,返回路由结果
return sqlRouter.route(logicSQL, parameters, sqlStatement);
private RoutingResult route(final List
创建简单路由引擎routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//单表路由
/** * 简单路由引擎. * * @author zhangliang */@RequiredArgsConstructorpublic final class SimpleRoutingEngine implements RoutingEngine {// 分库分表配置对象 private final ShardingRule shardingRule;// sql参数 private final List
不是单表路由,就走多库多表路由引擎,创建多库多表路由对象
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
/** * 混合多库表路由引擎. * * @author gaohongtao * @author zhangliang */@RequiredArgsConstructor@Slf4jpublic final class ComplexRoutingEngine implements RoutingEngine { private final ShardingRule shardingRule; private final List
return routingEngine.route();//tianhe TODO 笛卡尔积
这里是路由逻辑,这里有三种实现,一种是单表或者绑定表路由,一种是多库多表路由,一种是笛卡尔积路由
单表或者绑定表路由
@Override public RoutingResult route() {// 根据逻辑表名获得表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);// 根据表规则配置对象获得数据源集合 CollectionroutedDataSources = routeDataSources(tableRule); Map > routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根据逻辑表名获得表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);
/** * 根据逻辑表名称查找分片规则. * * @param logicTableName 逻辑表名称 * @return 该逻辑表的分片规则 */ public TableRule getTableRule(final String logicTableName) {// 根据逻辑表返回表规则配置对象 OptionaltableRule = tryFindTableRule(logicTableName); if (tableRule.isPresent()) { return tableRule.get(); }// 如果默认数据源不为空就根据默认数据源创建表配置规则对象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); } throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName); }
// 如果默认数据源不为空就根据默认数据源创建表配置规则对象 if (dataSourceRule.getDefaultDataSource().isPresent()) { return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule); }
// 根据默认数据源创建部分库数据分片策略,数据表不分表分片策略对象,并创建表配置规则对象进行装载 private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) { MapdefaultDataSourceMap = new HashMap<>(1); defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get()); return TableRule.builder(logicTableName) .dataSourceRule(new DataSourceRule(defaultDataSourceMap)) .databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm())) .tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build(); }
返回到这里
@Override public RoutingResult route() {// 根据逻辑表名获得表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);// 根据表规则配置对象获得数据源集合 CollectionroutedDataSources = routeDataSources(tableRule); Map > routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 根据表规则配置对象获得数据源集合 CollectionroutedDataSources = routeDataSources(tableRule);
根据分片列获取分片值
getShardingValues(strategy.getShardingColumns());
// 根据真实的数据源名称和分片值计算静态分片 Collectionresult = strategy.doStaticSharding(tableRule.getActualDatasourceNames(), shardingValues);
/** * 计算静态分片. * * @param availableTargetNames 所有的可用分片资源集合 * @param shardingValues 分片值集合 * @return 分库后指向的数据源名称集合 */ public CollectiondoStaticSharding(final Collection availableTargetNames, final Collection > shardingValues) { Collection result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);// 如果没有解析到传入的数据源分片值,要走全库路由 if (shardingValues.isEmpty()) { result.addAll(availableTargetNames); } else {// 如果传入分片值,根据分片值去获取具体的数据源 result.addAll(doSharding(shardingValues, availableTargetNames)); } return result; }
注意上面的数据库路由的默认实现,如果不传入数据库分片值会走全库路由的,数据量大的话是会影响性能的,所以建议必须要传入分片值,阿里的TDDL这里的实现是直接报错的。
// 如果传入分片值,根据分片值去获取具体的数据源 result.addAll(doSharding(shardingValues, availableTargetNames));
private CollectiondoSharding(final Collection > shardingValues, final Collection availableTargetNames) {// 如果没分片 if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) { return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next())); }// 如果按一个分片值分片 if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) { SingleKeyShardingAlgorithm singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm ) shardingAlgorithm; ShardingValue shardingValue = shardingValues.iterator().next(); switch (shardingValue.getType()) { case SINGLE:// = 元算符分片 return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue)); case LIST:// in运算符分片 return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue); case RANGE:// between运算符分片 return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue); default:// 现在只支持这三种运算符分片 throw new UnsupportedOperationException(shardingValue.getType().getClass().getName()); } }// 如果是多个分片值 if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) { return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues); }// 其他方式的分片不支持 throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName()); }
返回到这里
@Override public RoutingResult route() {// 根据逻辑表名获得表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);// 根据表规则配置对象获得数据源集合 CollectionroutedDataSources = routeDataSources(tableRule); Map > routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
根据数据源和表配置规则组装路由map
routedMap.put(each, routeTables(tableRule, each));
下面这个方法是获取路由的表的集合
private CollectionrouteTables(final TableRule tableRule, final String routedDataSource) {// 获取表分片策略 TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);// 获取分片值 List > shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns()) : getShardingValues(strategy.getShardingColumns());//doDynamicSharding// 如果是动态分片走动态分片,如果是静态分片走静态分片 Collection result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(tableRule.getActualTableNames(routedDataSource), shardingValues); Preconditions.checkState(!result.isEmpty(), "no table route info"); return result; }
/** * 计算动态分片. * * @param shardingValues 分片值集合 * @return 分库后指向的分片资源集合 */public CollectiondoDynamicSharding(final Collection > shardingValues) {//doDynamicSharding Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); Collection availableTargetNames = Collections.emptyList(); Collection result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); result.addAll(doSharding(shardingValues, availableTargetNames)); return result;}
返回到这里
@Override public RoutingResult route() {// 根据逻辑表名获得表规则配置对象 TableRule tableRule = shardingRule.getTableRule(logicTableName);// 根据表规则配置对象获得数据源集合 CollectionroutedDataSources = routeDataSources(tableRule); Map > routedMap = new LinkedHashMap<>(routedDataSources.size()); for (String each : routedDataSources) { routedMap.put(each, routeTables(tableRule, each)); } return generateRoutingResult(tableRule, routedMap); }
// 生成路由结果 return generateRoutingResult(tableRule, routedMap);
private RoutingResult generateRoutingResult(final TableRule tableRule, final Map> routedMap) { RoutingResult result = new RoutingResult();// 遍历roadMap,roadMap里面key值存储的是数据源名称,value值是物理数据表集合 for (Entry > entry : routedMap.entrySet()) {// 获取最下数据单元,每个数据单元是一个DataNode Collection dataNodes = tableRule.getActualDataNodes(entry.getKey(), entry.getValue()); for (DataNode each : dataNodes) {// 组装数据表单元装载到路由结果中 result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName())); } } return result; }
数据模型
/** * SQL路由结果. * * @author gaohongtao * @author zhangliang */@RequiredArgsConstructor@Getterpublic final class SQLRouteResult { // sql语句对象 private final SQLStatement sqlStatement; // 最小sql执行单元集合 private final SetexecutionUnits = new LinkedHashSet<>(); private final List generatedKeys = new LinkedList<>();}
/** * SQL最小执行单元. * * @author gaohongtao */@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class SQLExecutionUnit { // 具体的数据源 private final String dataSource; // 具体要执行的物理sql语句 private final String sql;}
/** * 路由表单元. * * @author zhangliang */@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class TableUnit { // 数据源名 private final String dataSourceName; // 逻辑表名 private final String logicTableName; // 物理表名 private final String actualTableName;}
/** * 路由表单元集合. * * @author zhangliang */@Getter@ToStringpublic final class TableUnits { // 路由表单元集合 private final ListtableUnits = new LinkedList<>();
/** * 路由结果. * * @author zhangliang */@Getterpublic class RoutingResult { // 表路由单元集合 private final TableUnits tableUnits = new TableUnits();
/** * 路由表单元. * * @author zhangliang */@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic final class TableUnit {// 数据源名 private final String dataSourceName;// 逻辑表名 private final String logicTableName;// 物理表名 private final String actualTableName;}
/** * 分库分表数据单元. * * @author zhangliang */@RequiredArgsConstructor@Getter@EqualsAndHashCode@ToStringpublic class DataNode { private static final String DELIMITER = "."; private final String dataSourceName;//数据库名 private final String tableName;//表名
说到最后
以上介绍,仅供参考。