一 事物五大类
二 事物使用区分
1 自动回滚和手动回滚不能一起使用回报错冲突除非PROPAGATION_REQUIRES_NEW新事物才不会和自动事物冲突
2 手动回滚包含两种
1》 sql Session
- // 获取数据库连接,获取会话(内部自有事务)
- SqlSession sqlSession = sqlContext.getSqlSession();
- Connection connection = sqlSession.getConnection();
2 》 transaction Manager
注意 DataSource TransactionManager.rollback和 TransactionStatus.setRollbackOnly区别
| setRollbackOnly | rollback |
不同点 | 控制范围小,资源释放需要配合自动事物 | 控制范围大,释放不需要自动事物管理 |
相同点 | 能回滚 | 能回滚 |
三 下面是测试分析
情况一手动事物死锁
情况二手动事物不锁
情况三自动+手动事物不锁表(注意手动需要开启新事物不然冲突)
情况四自动+手动事物锁表
情况五自动+手动事物锁表
情况六自动+手动事物不锁表
查看 mysql 事物锁 | |
//当前运行的所有事务 首先我们查看被锁表的进程 | |
SELECT * FROM information_schema.INNODB_TRX; | |
//当前出现的锁 | |
SELECT * FROM information_schema.INNODB_LOCKs; | |
//锁等待的对应关系 | |
SELECT * FROM information_schema.INNODB_LOCK_waits; | |
解决:kill事务,清理表数据,转移到历史表,检查定时任务 | |
然后找到进程号,即 trx_mysql_thread_id | |
然后执行; | |
kill 进程号; |
四 多线程 事物推荐一下两种
注意 线程 不能用太多,调整mysql最大连接数5.7默认151改成500,根据cpu情况来
推荐最大核心数=cpu*2+1,但是还有其它线程使用sleep和await,所见建议处理大数据量的时候
线程控制在10以内
public void testTranslationOfThreads6() throws SQL Exception { | |
//准备数据条 | |
List<Cdr> list = new ArrayList<>(); | |
for (int i =; i < 50000; i++) { | |
Cdr cdr = new Cdr(); | |
cdr.setSrc("" + i); | |
cdr.setDatetime(DateUtils.formatTime(new Date())); | |
list.add(cdr); | |
} | |
// 获取数据库连接,获取会话(内部自有事务) | |
SqlSession sqlSession = sqlContext.getSqlSession(); | |
Connection connection = sqlSession.getConnection(); | |
try { | |
// 设置手动提交 | |
connection.setAutoCommit(false); | |
//获取mapper | |
CdrMapper employeeMapper = sqlSession.getMapper(CdrMapper.class); | |
//先做删除操作 | |
// employeeMapper.remove() | |
//获取执行器 | |
Executor Service service = ExecutorConfig.getThreadPool(); | |
List<Callable<Integer>> callableList = new ArrayList<>(); | |
//拆分list | |
List<List<Cdr>> lists = averageAssign(list,); | |
Atomic boolean atomic Boolean = new AtomicBoolean(true); | |
for (int i =; i < lists.size(); i++) { | |
if (i == lists.size() -) { | |
atomicBoolean.set(false); | |
} | |
List<Cdr> list = lists.get(i); | |
//使用返回结果的callable去执行, | |
Callable<Integer> callable = () -> { | |
//让最后一个线程抛出异常 | |
if (!atomicBoolean.get()) { | |
throw new JeecgBootException(, "出现异常"); | |
} | |
int insert = employeeMapper.insert(list.get(0)); | |
return insert; | |
}; | |
callableList.add(callable); | |
} | |
//执行子线程 | |
List<Future<Integer>> futures = service.invokeAll(callableList); | |
for (Future<Integer> future : futures) { | |
//如果有一个执行不成功,则全部回滚 | |
if (future.get() <=) { | |
connection.rollback(); | |
return; | |
} | |
} | |
connection.commit(); | |
System.out.println("添加完毕"); | |
} catch (Exception e) { | |
connection.rollback(); | |
log.info("error", e); | |
throw new JeecgBootException(, "出现异常"); | |
} finally { | |
connection.close(); | |
} | |
} | |
public void testTranslationOfThreads() { | |
long startTime = System.currentTimeMillis(); | |
//准备数据条 | |
List<Cdr> list = new ArrayList<>(); | |
for (int i =; i < 50000; i++) { | |
Cdr cdr = new Cdr(); | |
cdr.setSrc("" + i); | |
cdr.setDatetime(DateUtils.formatTime(new Date())); | |
list.add(cdr); | |
} | |
// 线程数量 | |
final Integer threadCount =; | |
//每个线程处理的数据量 | |
final Integer dataPartionLength = (list.size() + threadCount -) / threadCount; | |
// 创建多线程处理任务 | |
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount); | |
CountDownLatch threadLatchs = new CountDownLatch(threadCount); | |
AtomicBoolean isError = new AtomicBoolean(false); | |
try { | |
for (int i =; i < threadCount; i++) { | |
// 每个线程处理的数据 | |
List<Cdr> threadDatas = list.stream() | |
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList()); | |
studentThreadPool.execute(() -> { | |
try { | |
try { | |
this.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas); | |
} catch (Throwable e) { | |
e.printStackTrace(); | |
isError.set(true); | |
} finally { | |
threadLatchs.countDown(); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
isError.set(true); | |
} | |
}); | |
} | |
// 倒计时锁设置超时时间s | |
boolean await = threadLatchs.await(, TimeUnit.SECONDS); | |
// 判断是否超时 | |
if (!await) { | |
isError.set(true); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
isError.set(true); | |
} | |
if (!transactionStatuses.isEmpty()) { | |
if (isError.get()) { | |
transactionStatuses.forEach(s -> transactionManager.rollback(s)); | |
} else { | |
transactionStatuses.forEach(s -> transactionManager.commit(s)); | |
} | |
} | |
long endTime = System.currentTimeMillis(); | |
log.info("共耗时:{}", (endTime - startTime) / + "秒"); | |
System.out.println("主线程完成"); | |
} |