最近使用 mybatis-plus 的 saveOrUpdateBatch 和 saveBatch 接口执行特别慢,数据量大时往往需要十几分钟,打开日志查看,原来批量操作也是循环单条数据插入的,那有没有真正批量插入/更新的办法呢?
mybatis-plus 提供了一个自定义方法的 SQL 注入器 DefaultSqlInjector,我们可以通过继承 DefaultSqlInjector 来加入自定义的方法,达到批量插入的效果。
import com.baomidou.mybatisplus.core.injector.AbstractMethod; | |
import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector; | |
import org.springframework.stereotype.Component; | |
import java.util.List; | |
/** | |
* @Description: 自定义方法SQL注入器 | |
* @Title: CustomizedSqlInjector | |
* @Package com.highgo.edu.common.batchOperation | |
* @Author: | |
* @Copyright | |
* @CreateTime: 2022/11/3 16:21 | |
*/ | |
public class CustomizedSqlInjector extends DefaultSqlInjector { | |
/** | |
* 如果只需增加方法,保留mybatis plus自带方法, | |
* 可以先获取super.getMethodList(),再添加add | |
*/ | |
public List<AbstractMethod> getMethodList(Class<?> mapperClass) { | |
List<AbstractMethod> methodList = super.getMethodList(mapperClass); | |
methodList.add(new InsertBatchMethod()); | |
// methodList.add(new UpdateBatchMethod()); | |
methodList.add(new MysqlInsertOrUpdateBath()); | |
methodList.add(new PGInsertOrUpdateBath()); | |
return methodList; | |
} | |
} |
同时我们需要继承 BaseMapper<T>,定义一个包含上述自定义批量方法的接口 RootMapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper; | |
import org.apache.ibatis.annotations.Param; | |
import java.util.List; | |
/** | |
* @description:自定义接口覆盖BaseMapper,解决mybatis-plus 批量操作慢的问题 | |
* @author: | |
* @date: 2022/11/3 15:14 | |
* @param: null | |
* @return: | |
**/ | |
public interface RootMapper<T> extends BaseMapper<T> { | |
/** | |
* @description:批量插入 | |
* @author: | |
* @date: 2022/11/3 15:13 | |
* @param: [list] | |
* @return: int | |
**/ | |
int insertBatch(; List<T> list) | |
/** | |
* @description:批量插入更新 | |
* @author: | |
* @date: 2022/11/3 15:14 | |
* @param: [list] | |
* @return: int | |
**/ | |
int mysqlInsertOrUpdateBatch(; List<T> list) | |
int pgInsertOrUpdateBatch(; List<T> list) | |
} |
在需要使用批量插入/更新的 mapper 上继承自定义的 RootMapper,
代码如下:
import com.XX.edu.common.batchOperation.RootMapper; | |
import com.XX.edu.exam.model.TScore; | |
import org.springframework.stereotype.Repository; | |
/** | |
* @Entity com.XX.edu.exam.model.TScore | |
*/ | |
public interface TScoreMapper extends RootMapper<TScore> { | |
} |
下面我们来定义批量插入的方法:
package com.XX.edu.common.batchOperation; | |
import com.baomidou.mybatisplus.annotation.IdType; | |
import com.baomidou.mybatisplus.core.enums.SqlMethod; | |
import com.baomidou.mybatisplus.core.injector.AbstractMethod; | |
import com.baomidou.mybatisplus.core.metadata.TableInfo; | |
import com.baomidou.mybatisplus.core.metadata.TableInfoHelper; | |
import org.apache.commons.lang3.StringUtils; | |
import org.apache.ibatis.executor.keygen.Jdbc3KeyGenerator; | |
import org.apache.ibatis.executor.keygen.KeyGenerator; | |
import org.apache.ibatis.executor.keygen.NoKeyGenerator; | |
import org.apache.ibatis.mapping.MappedStatement; | |
import org.apache.ibatis.mapping.SqlSource; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* @Description: 批量插入的方法 | |
* @Title: InsertBatchMethod | |
* @Package com.XX.edu.common.batchOperation | |
* @Author: | |
* @CreateTime: 2022/11/3 15:16 | |
*/ | |
public class InsertBatchMethod extends AbstractMethod { | |
Logger logger = LoggerFactory.getLogger(getClass()); | |
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) { | |
final String sql = "<script>insert into %s %s values %s</script>"; | |
final String fieldSql = prepareFieldSql(tableInfo); | |
final String valueSql = prepareValuesSql(tableInfo); | |
final String sqlResult = String.format(sql, tableInfo.getTableName(), fieldSql, valueSql); | |
logger.debug("sqlResult----->{}", sqlResult); | |
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sqlResult, modelClass); | |
KeyGenerator keyGenerator = new NoKeyGenerator(); | |
SqlMethod sqlMethod = SqlMethod.INSERT_ONE; | |
String keyProperty = null; | |
String keyColumn = null; | |
// 表包含主键处理逻辑,如果不包含主键当普通字段处理 | |
if (StringUtils.isNotEmpty(tableInfo.getKeyProperty())) { | |
if (tableInfo.getIdType() == IdType.AUTO) { | |
/* 自增主键 */ | |
keyGenerator = new Jdbc3KeyGenerator(); | |
keyProperty = tableInfo.getKeyProperty(); | |
keyColumn = tableInfo.getKeyColumn(); | |
} else { | |
if (null != tableInfo.getKeySequence()) { | |
keyGenerator = TableInfoHelper.genKeyGenerator(sqlMethod.getMethod(),tableInfo, builderAssistant); | |
keyProperty = tableInfo.getKeyProperty(); | |
keyColumn = tableInfo.getKeyColumn(); | |
} | |
} | |
} | |
// 第三个参数必须和RootMapper的自定义方法名一致 | |
return this.addInsertMappedStatement(mapperClass, modelClass, "insertBatch", sqlSource, keyGenerator, keyProperty, keyColumn); | |
} | |
/** | |
* @description: 拼接字段值 | |
* @author: | |
* @date: 2022/11/3 15:20 | |
* @param: [tableInfo] | |
* @return: java.lang.String | |
**/ | |
private String prepareValuesSql(TableInfo tableInfo) { | |
final StringBuilder valueSql = new StringBuilder(); | |
valueSql.append("<foreach collection=\"list\" item=\"item\" index=\"index\" open=\"(\" separator=\"),(\" close=\")\">"); | |
//valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},"); | |
tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},")); | |
valueSql.delete(valueSql.length() - 1, valueSql.length()); | |
valueSql.append("</foreach>"); | |
return valueSql.toString(); | |
} | |
/** | |
* @description:拼接字段 | |
* @author: | |
* @date: 2022/11/3 15:20 | |
* @param: [tableInfo] | |
* @return: java.lang.String | |
**/ | |
private String prepareFieldSql(TableInfo tableInfo) { | |
StringBuilder fieldSql = new StringBuilder(); | |
//fieldSql.append(tableInfo.getKeyColumn()).append(","); | |
tableInfo.getFieldList().forEach(x -> { | |
fieldSql.append(x.getColumn()).append(","); | |
}); | |
fieldSql.delete(fieldSql.length() - 1, fieldSql.length()); | |
fieldSql.insert(0, "("); | |
fieldSql.append(")"); | |
return fieldSql.toString(); | |
} | |
} |
继续定义批量插入更新的抽象方法
package com.XX.edu.common.batchOperation; | |
import com.baomidou.mybatisplus.core.injector.AbstractMethod; | |
import com.baomidou.mybatisplus.core.metadata.TableInfo; | |
import org.apache.ibatis.executor.keygen.NoKeyGenerator; | |
import org.apache.ibatis.mapping.MappedStatement; | |
import org.apache.ibatis.mapping.SqlSource; | |
/** | |
* @Description: 批量插入更新 | |
* @Title: InsertOrUpdateBath | |
* @Package com.XX.edu.common.batchOperation | |
* @Author: | |
* @Copyright | |
* @CreateTime: 2022/11/3 15:23 | |
*/ | |
public abstract class InsertOrUpdateBathAbstract extends AbstractMethod { | |
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) { | |
final SqlSource sqlSource = prepareSqlSource(tableInfo, modelClass); | |
// 第三个参数必须和RootMapper的自定义方法名一致 | |
return this.addInsertMappedStatement(mapperClass, modelClass, prepareInsertOrUpdateBathName(), sqlSource, new NoKeyGenerator(), null, null); | |
} | |
protected abstract SqlSource prepareSqlSource(TableInfo tableInfo, Class<?> modelClass); | |
protected abstract String prepareInsertOrUpdateBathName(); | |
} |
继承上面的抽象类----mysql版本(本版本未测试 根据自己需求修改)
package com.XX.edu.common.batchOperation; | |
import com.baomidou.mybatisplus.core.metadata.TableInfo; | |
import org.apache.ibatis.mapping.SqlSource; | |
import org.springframework.util.StringUtils; | |
/** | |
* @Description: 批量插入更新 | |
* @Title: InsertOrUpdateBath | |
* @Package com.XX.edu.common.batchOperation | |
* @Author: | |
* @Copyright | |
* @CreateTime: 2022/11/3 15:23 | |
*/ | |
public class MysqlInsertOrUpdateBath extends InsertOrUpdateBathAbstract { | |
protected SqlSource prepareSqlSource(TableInfo tableInfo, Class<?> modelClass) { | |
final String sql = "<script>insert into %s %s values %s ON DUPLICATE KEY UPDATE %s</script>"; | |
final String tableName = tableInfo.getTableName(); | |
final String filedSql = prepareFieldSql(tableInfo); | |
final String modelValuesSql = prepareModelValuesSql(tableInfo); | |
final String duplicateKeySql = prepareDuplicateKeySql(tableInfo); | |
final String sqlResult = String.format(sql, tableName, filedSql, modelValuesSql, filedSql, duplicateKeySql); | |
//String.format(sql, tableName, filedSql, modelValuesSql, duplicateKeySql); | |
//System.out.println("savaorupdatesqlsql="+sqlResult); | |
return languageDriver.createSqlSource(configuration, sqlResult, modelClass); | |
} | |
protected String prepareInsertOrUpdateBathName() { | |
return "mysqlInsertOrUpdateBath"; | |
} | |
String prepareDuplicateKeySql(TableInfo tableInfo) { | |
final StringBuilder duplicateKeySql = new StringBuilder(); | |
if (!StringUtils.isEmpty(tableInfo.getKeyColumn())) { | |
duplicateKeySql.append(tableInfo.getKeyColumn()).append("=values(").append(tableInfo.getKeyColumn()).append("),"); | |
} | |
tableInfo.getFieldList().forEach(x -> { | |
duplicateKeySql.append(x.getColumn()) | |
.append("=values(") | |
.append(x.getColumn()) | |
.append("),"); | |
}); | |
duplicateKeySql.delete(duplicateKeySql.length() - 1, duplicateKeySql.length()); | |
return duplicateKeySql.toString(); | |
} | |
String prepareModelValuesSql(TableInfo tableInfo) { | |
final StringBuilder valueSql = new StringBuilder(); | |
valueSql.append("<foreach collection=\"list\" item=\"item\" index=\"index\" open=\"(\" separator=\"),(\" close=\")\">"); | |
if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) { | |
valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},"); | |
} | |
tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},")); | |
valueSql.delete(valueSql.length() - 1, valueSql.length()); | |
valueSql.append("</foreach>"); | |
return valueSql.toString(); | |
} | |
/** | |
* @description:准备属性名 | |
* @author: | |
* @date: 2022/11/3 15:25 | |
* @param: [tableInfo] | |
* @return: java.lang.String | |
**/ | |
String prepareFieldSql(TableInfo tableInfo) { | |
StringBuilder fieldSql = new StringBuilder(); | |
fieldSql.append(tableInfo.getKeyColumn()).append(","); | |
tableInfo.getFieldList().forEach(x -> { | |
fieldSql.append(x.getColumn()).append(","); | |
}); | |
fieldSql.delete(fieldSql.length() - 1, fieldSql.length()); | |
fieldSql.insert(0, "("); | |
fieldSql.append(")"); | |
return fieldSql.toString(); | |
} | |
} |
继承上面的抽象类----postgresql版本(已测试完成,其中id使用序列自增)
package com.XX.edu.common.batchOperation; | |
import com.baomidou.mybatisplus.core.metadata.TableInfo; | |
import org.apache.ibatis.mapping.SqlSource; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.util.StringUtils; | |
/** | |
* @Description: 批量插入更新 | |
* @Title: InsertOrUpdateBath | |
* @Package com.XX.edu.common.batchOperation | |
* @Author: | |
* @Copyright | |
* @CreateTime: 2022/11/3 15:23 | |
*/ | |
public class PGInsertOrUpdateBath extends InsertOrUpdateBathAbstract { | |
Logger logger = LoggerFactory.getLogger(getClass()); | |
protected SqlSource prepareSqlSource(TableInfo tableInfo, Class<?> modelClass) { | |
final String sql = "<script>insert into %s %s values %s on conflict (id) do update set %s </script>"; | |
final String tableName = tableInfo.getTableName(); | |
final String filedSql = prepareFieldSql(tableInfo); | |
final String modelValuesSql = prepareModelValuesSql(tableInfo); | |
final String duplicateKeySql = prepareDuplicateKeySql(tableInfo); | |
final String sqlResult = String.format(sql, tableName, filedSql, modelValuesSql, duplicateKeySql); | |
logger.info("sql=={}",sqlResult); | |
return languageDriver.createSqlSource(configuration, sqlResult, modelClass); | |
} | |
protected String prepareInsertOrUpdateBathName() { | |
return "pgInsertOrUpdateBatch"; | |
} | |
private String prepareDuplicateKeySql(TableInfo tableInfo) { | |
final StringBuilder duplicateKeySql = new StringBuilder(); | |
if (!StringUtils.isEmpty(tableInfo.getKeyColumn())) { | |
duplicateKeySql.append(tableInfo.getKeyColumn()).append("=excluded.").append(tableInfo.getKeyColumn()).append(","); | |
} | |
tableInfo.getFieldList().forEach(x -> { | |
duplicateKeySql.append(x.getColumn()) | |
.append("=excluded.") | |
.append(x.getColumn()) | |
.append(","); | |
}); | |
duplicateKeySql.delete(duplicateKeySql.length() - 1, duplicateKeySql.length()); | |
return duplicateKeySql.toString(); | |
} | |
private String prepareModelValuesSql(TableInfo tableInfo) { | |
final StringBuilder valueSql = new StringBuilder(); | |
valueSql.append("<foreach collection=\"list\" item=\"item\" index=\"index\" open=\"(\" separator=\"),(\" close=\")\">"); | |
if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) { | |
valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},"); | |
} | |
tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},")); | |
valueSql.delete(valueSql.length() - 1, valueSql.length()); | |
valueSql.append("</foreach>"); | |
return valueSql.toString(); | |
} | |
/** | |
* @description:准备属性名 | |
* @author: | |
* @date: 2022/11/3 15:25 | |
* @param: [tableInfo] | |
* @return: java.lang.String | |
**/ | |
private String prepareFieldSql(TableInfo tableInfo) { | |
StringBuilder fieldSql = new StringBuilder(); | |
if (!StringUtils.isEmpty(tableInfo.getKeyProperty())) { | |
fieldSql.append(tableInfo.getKeyColumn()).append(","); | |
} | |
tableInfo.getFieldList().forEach(x -> { | |
fieldSql.append(x.getColumn()).append(","); | |
}); | |
fieldSql.delete(fieldSql.length() - 1, fieldSql.length()); | |
fieldSql.insert(0, "("); | |
fieldSql.append(")"); | |
return fieldSql.toString(); | |
} | |
} |
到此定义结束,下面开始使用
public class TNewExerciseServiceImpl extends ServiceImpl<TNewExerciseMapper, TNewExercise> | |
implements TNewExerciseService { | |
Logger logger = LoggerFactory.getLogger(getClass()); | |
//引入mapper | |
TScoreMapper scoreMapper; | |
//这样就可以批量新增更新操作了 | |
public void test(List<TScore> collect){ | |
scoreMapper.pgInsertOrUpdateBatch(collect); | |
} | |
} |
但是如果collect数据量太大会出现异常
“Tried to send an out-of-range integer as a 2-byte value: 87923”
是因为pg对于sql语句的参数数量是有限制的,最大为32767。
看pg源码
public void sendInteger2(int val) throws IOException { | |
if (val >= -32768 && val <= 32767) { | |
this.int2Buf[0] = (byte)(val >>> 8); | |
this.int2Buf[1] = (byte)val; | |
this.pgOutput.write(this.int2Buf); | |
} else { | |
throw new IOException("Tried to send an out-of-range integer as a 2-byte value: " + val); | |
} | |
} |
从源代码中可以看到pgsql使用2个字节的integer,故其取值范围为[-32768, 32767]。
这意味着sql语句的参数数量,即行数*列数之积必须小于等于32767.
比如,总共有17个字段,因为参数总数最大是32767,所以每批最多允许 32767 / 17 ≈ 1927 行,超过就要分批操作;有能力的童鞋也可以自己修改 pg 的驱动呦
分批插入代码如下:
/** | |
* @description: | |
* @author: | |
* @date: 2022/11/4 14:57 | |
* @param: [list, fieldCount:列数] | |
* @return: void | |
**/ | |
public void detachSaveOrUpdate_score(List<TScore> list, int fieldCount) { | |
int numberBatch = 32767; //每一次插入的最大数 | |
//每一次插入的最大行数 , 向下取整 | |
int v = ((Double) Math.floor(numberBatch / (fieldCount * 1.0))).intValue(); | |
double number = list.size() * 1.0 / v; | |
int n = ((Double) Math.ceil(number)).intValue(); //向上取整 | |
for (int i = 0; i < n; i++) { | |
int end = v * (i + 1); | |
if (end > list.size()) { | |
end = list.size(); //如果end不能超过最大索引值 | |
} | |
scoreMapper.pgInsertOrUpdateBatch(list.subList(v * i, end)); //插入数据库 | |
logger.info("更新一次~~~{}-{}", v * i, end); | |
} | |
} |
完成收工~~~