多线程事物回滚、多线程造成死锁,造成连接资源不够的解决方案

Java
267
0
0
2023-06-11
标签   Java多线程

一 事物五大类

多线程事物回滚、多线程造成死锁,造成连接资源不够的解决方案

二 事物使用区分

1 自动回滚和手动回滚不能一起使用回报错冲突除非PROPAGATION_REQUIRES_NEW新事物才不会和自动事物冲突

2 手动回滚包含两种

1》 sql Session

  • // 获取数据库连接,获取会话(内部自有事务)
  • SqlSession sqlSession = sqlContext.getSqlSession();
  • Connection connection = sqlSession.getConnection();

2 》 transaction Manager

注意 DataSource TransactionManager.rollback和 TransactionStatus.setRollbackOnly区别

setRollbackOnly

rollback

不同点

控制范围小,资源释放需要配合自动事物

控制范围大,释放不需要自动事物管理

相同点

能回滚

能回滚

三 下面是测试分析

情况一手动事物死锁

情况二手动事物不锁

情况三自动+手动事物不锁表(注意手动需要开启新事物不然冲突)

情况四自动+手动事物锁表

情况五自动+手动事物锁表

情况六自动+手动事物不锁表

 查看 mysql 事物锁
//当前运行的所有事务 首先我们查看被锁表的进程
SELECT * FROM information_schema.INNODB_TRX;
//当前出现的锁
SELECT * FROM information_schema.INNODB_LOCKs;
//锁等待的对应关系
SELECT * FROM information_schema.INNODB_LOCK_waits;
解决:kill事务,清理表数据,转移到历史表,检查定时任务
然后找到进程号,即 trx_mysql_thread_id
然后执行;
kill 进程号;  

四 多线程 事物推荐一下两种

注意 线程 不能用太多,调整mysql最大连接数5.7默认151改成500,根据cpu情况来

推荐最大核心数=cpu*2+1,但是还有其它线程使用sleep和await,所见建议处理大数据量的时候

线程控制在10以内

 @Override
    public  void  testTranslationOfThreads6() throws SQL Exception  {
        //准备数据条
        List<Cdr> list = new ArrayList<>();
        for (int i =; i < 50000; i++) {
            Cdr cdr = new Cdr();
            cdr.setSrc("" + i);
            cdr.setDatetime(DateUtils.formatTime(new Date()));
            list.add(cdr);
        }
        // 获取数据库连接,获取会话(内部自有事务)
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 设置手动提交
            connection.setAutoCommit(false);
            //获取mapper
            CdrMapper employeeMapper = sqlSession.getMapper(CdrMapper.class);
            //先做删除操作
//            employeeMapper.remove()
            //获取执行器
             Executor Service service = ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<Cdr>> lists = averageAssign(list,);
            Atomic boolean  atomic Boolean  = new AtomicBoolean(true);
            for (int i =; i < lists.size(); i++) {
                if (i == lists.size() -) {
                    atomicBoolean.set(false);
                }
                List<Cdr> list = lists.get(i);
                //使用返回结果的callable去执行,
                Callable<Integer> callable = () -> {
                    //让最后一个线程抛出异常
                     if  (!atomicBoolean.get()) {
                        throw new JeecgBootException(, "出现异常");
                    }
                    int insert = employeeMapper.insert(list.get(0));

                    return insert;
                };
                callableList.add(callable);
            }
            //执行子线程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                //如果有一个执行不成功,则全部回滚
                if (future.get() <=) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            connection.rollback();
            log.info("error", e);
            throw new JeecgBootException(, "出现异常");
        } finally {
            connection.close();
        }

    }  
 @Override
    public void testTranslationOfThreads() {
        long startTime = System.currentTimeMillis();
        //准备数据条
        List<Cdr> list = new ArrayList<>();
        for (int i =; i < 50000; i++) {
            Cdr cdr = new Cdr();
            cdr.setSrc("" + i);
            cdr.setDatetime(DateUtils.formatTime(new Date()));
            list.add(cdr);
        }
        // 线程数量
        final Integer threadCount =;
        //每个线程处理的数据量
        final Integer dataPartionLength = (list.size() + threadCount -) / threadCount;
        // 创建多线程处理任务
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i =; i < threadCount; i++) {
                // 每个线程处理的数据
                List<Cdr> threadDatas = list.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        try {
                            this.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        } finally {
                            threadLatchs.countDown();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        isError.set(true);
                    }
                });
            }

            // 倒计时锁设置超时时间s
            boolean await = threadLatchs.await(, TimeUnit.SECONDS);
            // 判断是否超时
            if (!await) {
                isError.set(true);
            }

        } catch (Exception e) {
            e.printStackTrace();
            isError.set(true);
        }
        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }
        long endTime = System.currentTimeMillis();
        log.info("共耗时:{}", (endTime - startTime) / + "秒");
        System.out.println("主线程完成");
    }