业务场景
我们每天都要对最近三个月内的活跃用户进行批量营销、账单逾期计算等操作,用户数据大概是 800w 。我们的方案是发送一个 CUSTOMER_DAILY 消息,然后订阅这个消息再去分别发送批量营销、账单逾期等业务消息。目前发送完 CUSTOMER_DAILY 消息大约需要五个小时。
勿纠结当下
大家不必纠结当下为什么效率这么低……因为系统都是慢慢优化出来的嘛,以前的代码肯定多少有些问题。或许再过一段时间我们自己的代码也有很多问题,这都很正常。下面简单地贴一些我适当改造过的当前实现逻辑的代码来分析当前方案存在的问题。
目前的方案
目前是采用定时任务触发, 线程池 提交任务的方式,代码如下(行尾注释是我写的 example 值):
// 最大活跃id
long maxId = customerService.findMaxActiveId(beginTime); //
// 最小活跃id
long minId = customerService.findMinActiveId(beginTime); //
// 查询的id最大可能总条数
long listSize = maxId - minId; //
// 开启的 线程 数
int runSize =;
// 平均每次查询数目
long count = listSize / runSize; //
// 创建一个线程池,核心线程数量和开启线程的数量一样
Executor Service executor = CreateThreadUtil.createThread(runSize);
for (int i =; i <= runSize; i++) {
// 计算 SQL 语句中每个分页查询的起始和结束数据下标
long min = minId + i * count; // , 5001 , 10001 , 15001
long max = min + count; //, 10001, 15001 , 20001
executor.execute(() -> {
List<Customer> customers = customerService.findByXxx(beginTime, min, max);
customers.forEach(c -> {
Message message = Message.build();//省略构造消息体
messageService.save(message);
applicat IO nEventPublisher.publishEvent(new MessageSendEvent(message));
});
});
}
大家不用在意什么事务细节,因为这是我为了减少代码简化的,看大致实现逻辑即可。后面就是监听这个事件然后修改数据库消息发送的状态,发送消息
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void listen(MessageSendEvent event) {
//修改数据库消息发送状态为:发送中
//执行发送消息 SDK 的 API
//修改数据库消息发送状态为:成功
}
存在的问题
线程池资源可能未充分利用
仔细观察第一段代码的逻辑,通过查询最大最小的活跃用户id来计算活跃用户总数,这并不是准确数值。它可能存在这样一种情况
maxId =,minId = 1 ~5000 都是非活跃用户,5001~20000 是活跃用户
这样一来 for 循环 中负责 1-5001 的那个线程其实只有一个用户任务需要处理,也就是说总共 4 个线程, 1 个线程执行任务是 0.5 秒 ,其余三个线程可能要十几分钟。这样第一个线程的资源就被浪费着了。看过我前面文章 学习 CompletableFuture 进阶之前先掌握两种线程池 的都知道 ForkJoinPool 有任务窃取机制,可以解决这个问题。
循环访问数据库
由于我们发送消息需要入库记录,发送过程中又要修改两次状态(第一次是发送中、第二次是发送成功),也就是说一条消息会有三次数据库的 IO 操作,这样在大量循环下是一个最大的性能瓶颈。我们可以估算一个 阈值 ,批量地对该阈值的一组消息用一个 batchInsert 只访问一次数据库。
循环调用发送消息 SDK 的 API
其实这个和循环访问数据库是一个道理,正常的消息队列都有批量发送消息的功能,而不是只能一条消息调用一个 SDK 的 API 。
只有一台机器执行该任务
目前使用的定时任务是 xxl-job , 路由策略 是第一个,也就是说在 N 个服务实例上,只有一台实例会执行。这就相当于有 N-1 台实例在这个工作上是处于闲置状态的。我们可以让 N 台机器一起来做这个事情, xxl-job 的分片广播可以满足。
总结下来可以优化的点
- xxl-job 分片广播
- ForkJoinPool
- 批量访问数据库
- 批量发送消息
动手优化
xxl-job 分片广播
分片广播的含义是触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;简单来说就是每一台机器都会触发任务,同时每台机器会接到不同的参数,我们可以根据这不同的参数去分配不同的应用实例处理不同的数据。
具体操作很简单,在 xxl-job 管理页面编辑任务路由策略为分片广播即可。
int shardIndex = XxlJobHelper.getShardIndex();// 当前分片/1/2/3
int shardTotal = XxlJobHelper.getShardTotal();// 总分片
List<Long> customerIds = customerService.findIdsByShard( request );//根据分片查询该机器要处理的数据
findIdsByShard 的实现其实是一个非常简单的 SQL
SELECT ID FROM t_customer WHERE MOD ( ID, ${request.shardTotal} ) = ${request.shardIndex}
我们将 ID 对总分片数 4 进行求余,一个数对 4 求余只有四个结果, 0,1,2,3 。这样一条 SQL 在所有机器上执行的数据结果就能瓜分要执行的总数据。
ForkJoinPool
假设上面每个应用实例拿到的 customerIds 数量是 200w ,那么我们现在使用 ForkJoinPool 对这 200w 数据进行分治。首先定义任务类
public class CustomerDailyTask extends RecursiveTask<Integer> {
private final List<Long> customerIds; //客户 id 集合
private final CustomerService customerService;
public static final int THRESHOLD = 1000; //拆分阈值(这里需要自己多次实验效率最高的阈值,目前我试了十几个值,1000 是最合适的)
//省略 构造方法
@Override
protected Integer compute() {
if (customerIds.size() <= THRESHOLD) {
return customerService.sendDailyMessage(customerIds);
}
int groupSize = (int) Math.ceil(customerIds.size() *.0 / 2); //对半拆分
List<List<Long>> partition = Lists.partition(customerIds, groupSize);
CustomerDailyTask task = new CustomerDailyTask(partition.get(0), customerService);
CustomerDailyTask task = new CustomerDailyTask(partition.get(1), customerService);
invokeAll(task, task2);
return task.join() + task2.join();
}
}
初始化 ForkJoinPool 执行
CustomerDailyTask task = new CustomerDailyTask(customerIds, customerService);
int core = Runtime.getRuntime().availableProcessors();
Fork JoinPool pool = new ForkJoinPool(core - 1); //留一个线程
pool.invoke(task);
批量访问数据库
这里其实就简单了,下面这行代码中
return customerService.sendDailyMessage(customerIds);
根据传入的 customerIds 构造一个 List<Message> 使用 batch Insert 方法插入数据库,然后发送一个 Spring 本地事件 Spring 事件发布 ,之后在事件监听器中 batchUpdate 去更新状态,这里省略。
批量发送消息
这个很简单直接调用批量发送的 SDK 即可,由于我们用的 AWS SNS-SQS ,SDK 版本比较低,我还升级了个SDK版本……这都是小问题,蛋疼的是 SDK 最多支持一次发送 10 条消息,我直接无语……
我表示很疑惑,没办法那就拆吧
// SNS 限制一次最多发 条消息
List<List<Message>> split = Lists.partition(list,);
List<CompletableFuture<PublishBatchResponse>> future = split.stream().map(item -> senderFactory.batchSend(item)).collect(Collectors.toList());
List<PublishBatchResponse> response = CompletableFutureUtil.allOfCompleted(future);
//...更新数据库消息状态为 SUCCESS
这里我们使用 CompletableFuture 批量异步发送消息,其实它内部用的线程池默认也是 ForkJoinPool , allOfCompleted() 实现如下
public static <T> List<T> allOfCompleted(List<CompletableFuture<T>> list) {
CompletableFuture<Void> future = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));
List<T> result = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
future.thenApply(v -> list); //阻塞主线程,执行完所有异步任务
return result;
}
外面是 ForkJoinPool 拆分数据,每一个拆分的子单元里面又是一个 ForkJoinPool 来发消息……我觉得机器配置如果很高的话这个设计方案就只有两个字 NB!
作者请教
大家看到了,目前我公司使用消息队列的时候是要持久化消息发送到数据库的,先在当前事务中插入一条消息发送记录,状态记为 CREATED ,使用 Spring 事件机制实现当前业务方法事务提交后执行消息发送的 API ,状态记为 PENDING ,发送成功后状态记为 SUCCESS ,反之 FAILED 。
这样做的好处是能 100% 保证消息丢失有迹可寻,每一条消息的发送都有记录。并且方便后续消息重试,或者重发,因为我这里记录了消息体。
但是我觉得这样 emmm 访问数据库的 IO 操作感觉有点浪费,在面对高并发业务时比如秒杀系统,感觉这样的实现是不可用的,因为数据库面临巨大性能瓶颈。问了很多朋友他们公司是怎么做的,一半是入库记录,一半是不入库记录……在此请教广大网友,贵司是咋做的,请留言指教,谢谢!
结语
本篇文章分析了一个大批量任务优化的方案,从 集群实例分担工作、合理使用线程资源、任务的分治、减少数据库IO、减少API的调用 一步步优化出一个目前较为合适的处理方案。