高并发多线程数据采集程序设计

IT知识
594
0
0
2022-04-19
标签   网络爬虫

工作中我们一定会遇到高并发的数据采集,今天和大家一起编写一个高并发数据采集系统,打算这样引入:

  1. 程序开发需求
  2. 高并发采集系统的功能
  3. 抽象一下设计
  4. 代码尝试编写

希望大家跟着的思路走,不要掉队啊。

需求引入

例如:老板要求你去采集公交车或者手持POS机的GPS信息、又或者室外探测装置感知数据的采集。(总之数据量很大),这个需求描述很简单,老板告诉你的也就这些。

大概的工作思路:问设备厂家要数据接入方式,实现他们的数据解析,保存到数据库就可以。实际上这样做也没有什么问题,但是你会想到以下几点问题:

  • 设备多少(城市级别的设备量)
  • 数据量并发多少(有心跳,有业务数据,并发一定很高)
  • 数据主动推送还是被动获取(我们被动接收,不能堵了)
  • 数据丢失敏感吗(要求可靠,不能丢)
  • 对数据实时性要求高不高(实时要求较高,不能堵了)
  • 数据落到什么位置(数据库)

系统功能

我们看到需求之后,有了大概的思路,同时也对需求做了一些疑问,大家想想还有没有什么问题,我想的也不是全面的。我们根据这些疑问把程序设计功能要求列出来:

  • 首先传统的单线程模式是行不通了,需要多线程同时获取数据。
  • 因为并发较高且数据是被动接收,需要设计一个任务队列,保存并发推送的任务。
  • 因为引入了队列,所以另外一个必要的队列监控功能不可缺少。
  • 数据可靠性要求较高,还需要设计失败重试机制,保证数据不丢。
  • 服务不能有单点故障,设计集群模式。
  • 因为有心跳和业务数据丢要采集,需要设计数据采集(消费)优先级。
  • 数据存储持久化层,存储到mysql、mongoDB等。

抽象设计

为什么需要抽象设计呢?有个思考方向:计算机分层设计、网络7层模型,我们不妨把问题也分层思考下看看。

数据接入层:例如从MQ、TCP、UDP、Websocket等方式的接入,收集是原始数据。这层类似的角色有老板,销售,总之是需求来源。(connector)

高并发多线程数据采集程序设计

从接入层获得了各种数据,根据厂家提供的协议进行解析和包装,类似需求转化的意思。我们把它定义成Rule或者Protocol(规则或协议接口)

高并发多线程数据采集程序设计

任务管理层:当数据比较多,需求比较多,就应该有个需求总管。我们把它叫做任务管理者,负责掌管着所有任务的打包、分发,合并、任务队列等功能。类似公司老板下达需求给产品经理,产品经理负责把任务分类,送往不同产品线。所以任务管理层有两个重要的概念,产品经理需求管控。(暂时命名Task和TaskManager)

高并发多线程数据采集程序设计

大概有这些功能

任务调度层:我们可以这么理解,产品经理已经把需求梳理好了,然后产品经理只需要把需求交给项目经理就可以了。由项目经理分解任务,并交给伟大的程序员,让程序员完成编码,有的程序员是新手,项目经理检查有问题,还需要重新做,相当于重试。这里有两个角色:WorkerManager、Worker。

高并发多线程数据采集程序设计

任务的执行者

上面这些抽象,感觉和公司的组织是相似的,应该比较好理解。

代码编写

先看下数据接入层:

首先我们看下connector包,打通数据通路。有mq、tcp、udp,前面我分享了一些列关于socket编程的文章,大家看下。

高并发多线程数据采集程序设计

connector管理类

高并发多线程数据采集程序设计

具体实现接入的类

从上面包结构上可以看出来,我们需要把接入层作为多线程启动起来,因此有ConnectorStarter这个类,负责启动多个Connector,Connector们(例如上面的TcpConnector)监听着数据,一旦有数据就会触发ConnectionListener接口里面的方法,作用就是传递接收到的数据。至于传递到什么位置,就看谁实现了Listener了,一般情况下,应该是产品经理,也就是TaskManager。

高并发多线程数据采集程序设计

感兴趣的项目经理,监听数据

我们再来看下产品经理和项目经理:Task和Worker以及对应的Manager

高并发多线程数据采集程序设计

看下主要的任务管理和任务执行类结构


task类,就是一项或多项任务,内部有优先级、有重试、有指标等,task自己应该知道自己要到哪里去,所以应该有doTask之类的接口,worker只是从workerManager那里获得task,然后在流水线上根据task的要求,完成任务的执行。因为我现在大概只是一个思路,接口我大概写下,下面贴一下接口:

高并发多线程数据采集程序设计

ITask接口

通过Task接口,我们可以看到task自己有doTask的方法,另外具有任务优先级别,在任务排队的的时候可以尝试降级。

高并发多线程数据采集程序设计

IWorker接口

高并发多线程数据采集程序设计

ITaskManager接口

高并发多线程数据采集程序设计

IWorkerManager接口

最后就是Task具体怎么做业务的事情了,保存到数据库,或者广播等逻辑,这个在每个Task实现类里面做就可以了,worker就乖乖的做task,当然多线程执行了,我们可以初始化多个worker,例如使用Executors.new出线程池,提交worker就可以。我还是举个例子吧,今天废话太多,code少:

高并发多线程数据采集程序设计

Task实现类

高并发多线程数据采集程序设计

worker

高并发多线程数据采集程序设计

TaskManager

维持一个任务队列,创建任务并放入队列中。poll、remove、offer等方法。

高并发多线程数据采集程序设计

线程池执行worker

总结:

我们大概分析了一下高并发采集系统的设计思路,具体代码虽然没有编写到位,但是大概的框架已经搭建出来了。我也不知道有多少读者对此感兴趣,暂时就写到这里吧。有兴趣读者留言,对哪块感兴趣,我单独拿出来分享。

感谢您的阅读,有不足之处,在评论中指出。