目录
- 1. 注入多数据源
- 2. 动态数据源
- (1) 创建并注入动态数据源
- (2) Mybatis配置类
- (3) 使用注解简化数据源切换
- 3. 结语
现在有一个Mysql数据源和一个Postgresql数据源,使用Mybatis对两个数据源进行操作:
1. 注入多数据源
可以对两个数据源分别实现其Service层和Mapper层,以及Mybatis的配置类:
// 这里需要配置扫描包路径,以及sqlSessionTemplateRef | |
public class MysqlMybatisConfigurer { | |
/** | |
* 注入Mysql数据源 | |
*/ | |
public DataSource mysqlDatasource() { | |
return new DruidDataSource(); | |
} | |
/** | |
* 注入mysqlSqlSessionFactory | |
*/ | |
public SqlSessionFactory mysqlSqlSessionFactory(DataSource mysqlDatasource) throws Exception { | |
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); | |
factoryBean.setDataSource(mysqlDatasource); | |
// 设置对应的mapper文件 | |
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:" + | |
"/mappers/MysqlMapper.xml")); | |
return factoryBean.getObject(); | |
} | |
/** | |
* 注入mysqlSqlSessionTemplate | |
*/ | |
public SqlSessionTemplate mysqlSqlSessionTemplate(SqlSessionFactory mysqlSqlSessionFactory) { | |
return new SqlSessionTemplate(mysqlSqlSessionFactory); | |
} | |
/** | |
* 注入mysqlTransactionalManager | |
*/ | |
public DataSourceTransactionManager mysqlTransactionalManager(DataSource mysqlDatasource) { | |
return new DataSourceTransactionManager(mysqlDatasource); | |
} | |
} | |
// 这里需要配置扫描包路径,以及sqlSessionTemplateRef | |
public class PostgresqlMybatisConfigurer { | |
/** | |
* 注入Postgresql数据源 | |
*/ | |
public DataSource postgresqlDatasource() { | |
return new DruidDataSource(); | |
} | |
/** | |
* 注入postgresqlSqlSessionFactory | |
*/ | |
public SqlSessionFactory postgresqlSqlSessionFactory(DataSource postgresqlDatasource) throws Exception { | |
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); | |
factoryBean.setDataSource(postgresqlDatasource); | |
// 设置对应的mapper文件 | |
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:" + | |
"/mappers/PostgresqlMapper.xml")); | |
return factoryBean.getObject(); | |
} | |
/** | |
* 注入postgresqlSqlSessionTemplate | |
*/ | |
public SqlSessionTemplate postgresqlSqlSessionTemplate(SqlSessionFactory postgresqlSqlSessionFactory) { | |
return new SqlSessionTemplate(postgresqlSqlSessionFactory); | |
} | |
/** | |
* 注入postgresqlTransactionalManager | |
*/ | |
public DataSourceTransactionManager postgresqlTransactionalManager(DataSource postgresqlDatasource) { | |
return new DataSourceTransactionManager(postgresqlDatasource); | |
} | |
} |
在配置类中,分别注入了一个事务管理器TransactionManager,这个和事务管理是相关的。在使用@Transactional注解时,需要配置其value属性指定对应的事务管理器。
2. 动态数据源
Spring中提供了AbstractRoutingDataSource抽象类,可以用于动态地选择数据源。
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean { | |
private Map<Object, Object> targetDataSources; | |
private Object defaultTargetDataSource; | |
private boolean lenientFallback = true; | |
private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup(); | |
private Map<Object, DataSource> resolvedDataSources; | |
private DataSource resolvedDefaultDataSource; | |
// 略 | |
} |
通过源码可以看到,该抽象类实现了InitializingBean接口,并在其afterPropertiesSet方法中将数据源以<lookupkey, dataSource>的形式放入一个Map中。
public void afterPropertiesSet() { | |
if (this.targetDataSources == null) { | |
throw new IllegalArgumentException("Property 'targetDataSources' is required"); | |
} else { | |
this.resolvedDataSources = CollectionUtils.newHashMap(this.targetDataSources.size()); | |
this.targetDataSources.forEach((key, value) -> { | |
Object lookupKey = this.resolveSpecifiedLookupKey(key); | |
DataSource dataSource = this.resolveSpecifiedDataSource(value); | |
// 将数据源以<lookupkey, dataSource>的形式放入Map中 | |
this.resolvedDataSources.put(lookupKey, dataSource); | |
}); | |
if (this.defaultTargetDataSource != null) { | |
this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource); | |
} | |
} | |
} |
该类中还有一个determineTargetDataSource方法,是根据lookupkey从Map中获取对应的数据源,如果没有获取到,则使用默认的数据源。
protected DataSource determineTargetDataSource() { | |
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); | |
Object lookupKey = this.determineCurrentLookupKey(); | |
// 根据lookupkey从Map中获取对应的数据源 | |
DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey); | |
if (dataSource == null && (this.lenientFallback || lookupKey == null)) { | |
dataSource = this.resolvedDefaultDataSource; | |
} | |
if (dataSource == null) { | |
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); | |
} else { | |
return dataSource; | |
} | |
} |
lookupkey是通过determineTargetDataSource方法获取到的,而它是一个抽象方法,我们要做的就是通过实现这个方法,来控制获取到的数据源。
@Nullable | |
protected abstract Object determineCurrentLookupKey(); |
(1) 创建并注入动态数据源
创建AbstractRoutingDataSource的子类,实现determineCurrentLookupKey方法
public class RoutingDataSource extends AbstractRoutingDataSource { | |
protected Object determineCurrentLookupKey() { | |
return DataSourceContextHolder.get(); | |
} | |
} |
这里的DataSourceContextHolder是一个操作ThreadLocal对象的工具类
public class DataSourceContextHolder { | |
/** | |
* 数据源上下文 | |
*/ | |
private static final ThreadLocal<DataSourceType> contextHolder = new ThreadLocal<>(); | |
/** | |
* 设置数据源类型 | |
*/ | |
public static void set(DataSourceType type) { | |
contextHolder.set(type); | |
} | |
/** | |
* 获取数据源类型 | |
* | |
* @return DataSourceType | |
*/ | |
public static DataSourceType get() { | |
return contextHolder.get(); | |
} | |
/** | |
* 使用MYSQL数据源 | |
*/ | |
public static void mysql() { | |
set(DataSourceType.MYSQL); | |
} | |
/** | |
* 使用Postgresql数据源 | |
*/ | |
public static void postgresql() { | |
set(DataSourceType.POSTGRESQL); | |
} | |
public static void remove() { | |
contextHolder.remove(); | |
} | |
} |
通过调用DataSourceContextHolder.mysql()或者DataSourceContextHolder.postgresql()就能修改contextHolder的值,从而在动态数据源的determineTargetDataSource方法中就能获取到对应的数据源。
在数据源配置类中,将mysql和postgresql的数据源设置到动态数据源的Map中,并注入容器。
public class DataSourceConfigurer { | |
"spring.datasource.mysql") | (prefix =|
public DataSource mysqlDatasource() { | |
return new DruidDataSource(); | |
} | |
"spring.datasource.postgresql") | (prefix =|
public DataSource postgresqlDatasource() { | |
return new DruidDataSource(); | |
} | |
public RoutingDataSource routingDataSource(DataSource mysqlDatasource, DataSource postgresqlDatasource) { | |
Map<Object, Object> dataSources = new HashMap<>(); | |
dataSources.put(DataSourceType.MYSQL, mysqlDatasource); | |
dataSources.put(DataSourceType.POSTGRESQL, postgresqlDatasource); | |
RoutingDataSource routingDataSource = new RoutingDataSource(); | |
routingDataSource.setDefaultTargetDataSource(mysqlDatasource); | |
// 设置数据源 | |
routingDataSource.setTargetDataSources(dataSources); | |
return routingDataSource; | |
} | |
} |
(2) Mybatis配置类
由于使用了动态数据源,所以只需要编写一个配置类即可。
public class MybatisConfigurer { | |
// 注入动态数据源 | |
private RoutingDataSource routingDataSource; | |
public SqlSessionFactory sqlSessionFactory() throws Exception { | |
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); | |
sqlSessionFactoryBean.setDataSource(routingDataSource); | |
// 这里可以直接设置所有的mapper.xml文件 | |
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath" + | |
":mappers/*.xml")); | |
return sqlSessionFactoryBean.getObject(); | |
} | |
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { | |
return new SqlSessionTemplate(sqlSessionFactory); | |
} | |
public DataSourceTransactionManager transactionalManager(DataSource mysqlDatasource) { | |
return new DataSourceTransactionManager(mysqlDatasource); | |
} | |
} |
(3) 使用注解简化数据源切换
我们虽然可以使用DataSourceContextHolder类中的方法进行动态数据源切换,但是这种方式有些繁琐,不够优雅。可以考虑使用注解的形式简化数据源切换。
我们先定义两个注解,表示使用Mysql数据源或Postgresql数据源:
@Target({ElementType.TYPE, ElementType.METHOD}) | |
@Retention(RetentionPolicy.RUNTIME) | |
public @interface Mysql { | |
} | |
@Target({ElementType.TYPE, ElementType.METHOD}) | |
@Retention(RetentionPolicy.RUNTIME) | |
public @interface Postgresql { | |
} |
再定义一个切面,当使用了注解时,会先调用切换数据源的方法,再执行后续逻辑。
@Component | |
@Aspect | |
public class DataSourceAspect { | |
@Pointcut("@within(com.example.mybatisdemo.aop.Mysql) || @annotation(com.example.mybatisdemo.aop.Mysql)") | |
public void mysqlPointcut() { | |
} | |
@Pointcut("@within(com.example.mybatisdemo.aop.Postgresql) || @annotation(com.example.mybatisdemo.aop.Postgresql)") | |
public void postgresqlPointcut() { | |
} | |
@Before("mysqlPointcut()") | |
public void mysql() { | |
DataSourceContextHolder.mysql(); | |
} | |
@Before("postgresqlPointcut()") | |
public void postgresql() { | |
DataSourceContextHolder.postgresql(); | |
} | |
} |
在使用动态数据源的事务操作时有两个需要注意的问题:
问题一 同一个事务操作两个数据源
Mybatis使用Executor执行SQL时需要获取连接,BaseExecutor类中的getConnection方法调用了SpringManagedTransaction中的getConnection方法,这里优先从connection字段获取连接,如果connection为空,才会调用openConnection方法,并把连接赋给connection字段。
也就是说,如果你使用的是同一个事务来操作两个数据源,那拿到的都是同一个连接,会导致数据源切换失败。
protected Connection getConnection(Log statementLog) throws SQLException { | |
Connection connection = this.transaction.getConnection(); | |
return statementLog.isDebugEnabled() ? ConnectionLogger.newInstance(connection, statementLog, this.queryStack) : connection; | |
} | |
public Connection getConnection() throws SQLException { | |
if (this.connection == null) { | |
this.openConnection(); | |
} | |
return this.connection; | |
} | |
private void openConnection() throws SQLException { | |
this.connection = DataSourceUtils.getConnection(this.dataSource); | |
this.autoCommit = this.connection.getAutoCommit(); | |
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); | |
LOGGER.debug(() -> { | |
return "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"; | |
}); | |
} |
问题二 两个独立事务分别操作两个数据源
(1) 在开启事务的时候,DataSourceTransactionManager中的doBegin方法会先获取Connection,并保存到ConnectionHolder中,将数据源和ConnectionHolder的对应关系绑定到TransactionSynchronizationManager中。
protected void doBegin(Object transaction, TransactionDefinition definition) { | |
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction; | |
Connection con = null; | |
try { | |
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { | |
// 获取连接 | |
Connection newCon = this.obtainDataSource().getConnection(); | |
if (this.logger.isDebugEnabled()) { | |
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); | |
} | |
// 保存到ConnectionHolder中 | |
txObject.setConnectionHolder(new ConnectionHolder(newCon), true); | |
} | |
txObject.getConnectionHolder().setSynchronizedWithTransaction(true); | |
// 从ConnectionHolder获取连接 | |
con = txObject.getConnectionHolder().getConnection(); | |
// 略 | |
// 将数据源和ConnectionHolder的关系绑定到TransactionSynchronizationManager中 | |
if (txObject.isNewConnectionHolder()) { | |
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder()); | |
} | |
// 略 | |
} |
(2) TransactionSynchronizationManager的bindResource方法将数据源和ConnectionHolder的对应关系存入线程变量resources中。
public abstract class TransactionSynchronizationManager { | |
// 线程变量 | |
private static final ThreadLocal<Map<Object, Object>> resources = | |
new NamedThreadLocal<>("Transactional resources"); | |
// 略 | |
// 绑定数据源和ConnectionHolder的对应关系 | |
public static void bindResource(Object key, Object value) throws IllegalStateException { | |
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); | |
Assert.notNull(value, "Value must not be null"); | |
Map<Object, Object> map = resources.get(); | |
// set ThreadLocal Map if none found | |
if (map == null) { | |
map = new HashMap<>(); | |
resources.set(map); | |
} | |
Object oldValue = map.put(actualKey, value); | |
// Transparently suppress a ResourceHolder that was marked as void... | |
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { | |
oldValue = null; | |
} | |
if (oldValue != null) { | |
throw new IllegalStateException( | |
"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread"); | |
} | |
} | |
// 略 | |
} |
(3) 上边提到的openConnection方法,其实最终也是从TransactionSynchronizationManager的resources中获取连接的
public static Connection doGetConnection(DataSource dataSource) throws SQLException { | |
Assert.notNull(dataSource, "No DataSource specified"); | |
// 获取ConnectionHolder | |
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource); | |
if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) { | |
logger.debug("Fetching JDBC Connection from DataSource"); | |
Connection con = fetchConnection(dataSource); | |
if (TransactionSynchronizationManager.isSynchronizationActive()) { | |
try { | |
ConnectionHolder holderToUse = conHolder; | |
if (conHolder == null) { | |
holderToUse = new ConnectionHolder(con); | |
} else { | |
conHolder.setConnection(con); | |
} | |
holderToUse.requested(); | |
TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource)); | |
holderToUse.setSynchronizedWithTransaction(true); | |
if (holderToUse != conHolder) { | |
TransactionSynchronizationManager.bindResource(dataSource, holderToUse); | |
} | |
} catch (RuntimeException var4) { | |
releaseConnection(con, dataSource); | |
throw var4; | |
} | |
} | |
return con; | |
} else { | |
conHolder.requested(); | |
if (!conHolder.hasConnection()) { | |
logger.debug("Fetching resumed JDBC Connection from DataSource"); | |
conHolder.setConnection(fetchConnection(dataSource)); | |
} | |
// 从ConnectionHolder中获取连接 | |
return conHolder.getConnection(); | |
} | |
} |
也就是说,如果修改了数据源,那么resources中就找不到对应的连接,就可以重新获取连接,从而达到切换数据源的目的。然而我们数据源的只有一个,就是动态数据源,因此即使使用两个独立事务,也不能成功切换数据源。
3. 结语
如果想要使用动态数据源的事务处理,可能需要考虑使用多线程分布式的事务处理机制;
如果使用直接注入多个数据源的方式实现事务处理,实现简单,但是各数据源事务是独立的;
应该根据具体情况进行选择。