一 事物五大类
二 事物使用区分
1 自动回滚和手动回滚不能一起使用回报错冲突除非PROPAGATION_REQUIRES_NEW新事物才不会和自动事物冲突
2 手动回滚包含两种
1》 sql Session
- // 获取数据库连接,获取会话(内部自有事务)
- SqlSession sqlSession = sqlContext.getSqlSession();
- Connection connection = sqlSession.getConnection();
2 》 transaction Manager
注意 DataSource TransactionManager.rollback和 TransactionStatus.setRollbackOnly区别
| setRollbackOnly | rollback |
不同点 | 控制范围小,资源释放需要配合自动事物 | 控制范围大,释放不需要自动事物管理 |
相同点 | 能回滚 | 能回滚 |
三 下面是测试分析
情况一手动事物死锁
情况二手动事物不锁
情况三自动+手动事物不锁表(注意手动需要开启新事物不然冲突)
情况四自动+手动事物锁表
情况五自动+手动事物锁表
情况六自动+手动事物不锁表
查看 mysql 事物锁
//当前运行的所有事务 首先我们查看被锁表的进程
SELECT * FROM information_schema.INNODB_TRX;
//当前出现的锁
SELECT * FROM information_schema.INNODB_LOCKs;
//锁等待的对应关系
SELECT * FROM information_schema.INNODB_LOCK_waits;
解决:kill事务,清理表数据,转移到历史表,检查定时任务
然后找到进程号,即 trx_mysql_thread_id
然后执行;
kill 进程号;
四 多线程 事物推荐一下两种
注意 线程 不能用太多,调整mysql最大连接数5.7默认151改成500,根据cpu情况来
推荐最大核心数=cpu*2+1,但是还有其它线程使用sleep和await,所见建议处理大数据量的时候
线程控制在10以内
@Override
public void testTranslationOfThreads6() throws SQL Exception {
//准备数据条
List<Cdr> list = new ArrayList<>();
for (int i =; i < 50000; i++) {
Cdr cdr = new Cdr();
cdr.setSrc("" + i);
cdr.setDatetime(DateUtils.formatTime(new Date()));
list.add(cdr);
}
// 获取数据库连接,获取会话(内部自有事务)
SqlSession sqlSession = sqlContext.getSqlSession();
Connection connection = sqlSession.getConnection();
try {
// 设置手动提交
connection.setAutoCommit(false);
//获取mapper
CdrMapper employeeMapper = sqlSession.getMapper(CdrMapper.class);
//先做删除操作
// employeeMapper.remove()
//获取执行器
Executor Service service = ExecutorConfig.getThreadPool();
List<Callable<Integer>> callableList = new ArrayList<>();
//拆分list
List<List<Cdr>> lists = averageAssign(list,);
Atomic boolean atomic Boolean = new AtomicBoolean(true);
for (int i =; i < lists.size(); i++) {
if (i == lists.size() -) {
atomicBoolean.set(false);
}
List<Cdr> list = lists.get(i);
//使用返回结果的callable去执行,
Callable<Integer> callable = () -> {
//让最后一个线程抛出异常
if (!atomicBoolean.get()) {
throw new JeecgBootException(, "出现异常");
}
int insert = employeeMapper.insert(list.get(0));
return insert;
};
callableList.add(callable);
}
//执行子线程
List<Future<Integer>> futures = service.invokeAll(callableList);
for (Future<Integer> future : futures) {
//如果有一个执行不成功,则全部回滚
if (future.get() <=) {
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完毕");
} catch (Exception e) {
connection.rollback();
log.info("error", e);
throw new JeecgBootException(, "出现异常");
} finally {
connection.close();
}
}
@Override
public void testTranslationOfThreads() {
long startTime = System.currentTimeMillis();
//准备数据条
List<Cdr> list = new ArrayList<>();
for (int i =; i < 50000; i++) {
Cdr cdr = new Cdr();
cdr.setSrc("" + i);
cdr.setDatetime(DateUtils.formatTime(new Date()));
list.add(cdr);
}
// 线程数量
final Integer threadCount =;
//每个线程处理的数据量
final Integer dataPartionLength = (list.size() + threadCount -) / threadCount;
// 创建多线程处理任务
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
AtomicBoolean isError = new AtomicBoolean(false);
try {
for (int i =; i < threadCount; i++) {
// 每个线程处理的数据
List<Cdr> threadDatas = list.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
try {
try {
this.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
} finally {
threadLatchs.countDown();
}
} catch (Exception e) {
e.printStackTrace();
isError.set(true);
}
});
}
// 倒计时锁设置超时时间s
boolean await = threadLatchs.await(, TimeUnit.SECONDS);
// 判断是否超时
if (!await) {
isError.set(true);
}
} catch (Exception e) {
e.printStackTrace();
isError.set(true);
}
if (!transactionStatuses.isEmpty()) {
if (isError.get()) {
transactionStatuses.forEach(s -> transactionManager.rollback(s));
} else {
transactionStatuses.forEach(s -> transactionManager.commit(s));
}
}
long endTime = System.currentTimeMillis();
log.info("共耗时:{}", (endTime - startTime) / + "秒");
System.out.println("主线程完成");
}