文章来自微信公众号:Go语言圈
参考链接:www.jb51.net/article/231909.htm
前置知识:
- go基本语法
- 消息队列概念,也就三个:生产者、消费者、队列
目的
- 没想着实现多复杂,因为时间有限,就mini就好,mini到什么程度呢
- 使用双向链表数据结构作为队列
- 有多个topic可供生产者生成消息和消费者消费消息
- 支持生产者并发写
- 支持消费者读,且ok后,从队列删除
- 消息不丢失(持久化)
- 高性能(先这样想)
设计
整体架构
协议
通讯协议底层使用tcp,mq是基于tcp自定义了一个协议,协议如下
type Msg struct {
Id int64
TopicLen int64
Topic string
// 1-consumer 2-producer 3-comsumer-ack 4-error
MsgType int64 // 消息类型
Len int64 // 消息长度
Payload []byte // 消息
}
Payload
使用字节数组,是因为不管数据是什么,只当做字节数组来处理即可。Msg
承载着生产者生产的消息,消费者消费的消息,ACK
、和错误消息,前两者会有负载,而后两者负载和长度都为空。
协议的编解码处理,就是对字节的处理,接下来有从字节转为Msg
,和从Msg
转为字节两个函数
func BytesToMsg(reader io.Reader) Msg {
m := Msg{}
var buf [128]byte
n, err := reader.Read(buf[:])
if err != nil {
fmt.Println("read failed, err:", err)
}
fmt.Println("read bytes:", n)
// id
buff := bytes.NewBuffer(buf[0:8])
binary.Read(buff, binary.LittleEndian, &m.Id)
// topiclen
buff = bytes.NewBuffer(buf[8:16])
binary.Read(buff, binary.LittleEndian, &m.TopicLen)
// topic
msgLastIndex := 16 + m.TopicLen
m.Topic = string(buf[16: msgLastIndex])
// msgtype
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8])
binary.Read(buff, binary.LittleEndian, &m.MsgType)
buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16])
binary.Read(buff, binary.LittleEndian, &m.Len)
if m.Len <= 0 {
return m
}
m.Payload = buf[msgLastIndex + 16:]
return m
}
func MsgToBytes(msg Msg) []byte {
msg.TopicLen = int64(len([]byte(msg.Topic)))
msg.Len = int64(len([]byte(msg.Payload)))
var data []byte
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Id)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.TopicLen)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Topic)...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.MsgType)
data = append(data, buf.Bytes()...)
buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.LittleEndian, msg.Len)
data = append(data, buf.Bytes()...)
data = append(data, []byte(msg.Payload)...)
return data
}
队列
使用container/list
,实现先入先出,生产者在队尾写,消费者在队头读取
package broker
import (
"container/list"
"sync"
)
type Queue struct {
len int
data list.List
}
var lock sync.Mutex
func (queue *Queue) offer(msg Msg) {
queue.data.PushBack(msg)
queue.len = queue.data.Len()
}
func (queue *Queue) poll() Msg{
if queue.len == 0 {
return Msg{}
}
msg := queue.data.Front()
return msg.Value.(Msg)
}
func (queue *Queue) delete(id int64) {
lock.Lock()
for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
if msg.Value.(Msg).Id == id {
queue.data.Remove(msg)
queue.len = queue.data.Len()
break
}
}
lock.Unlock()
}
方法offer
往队列里插入数据,poll
从队列头读取数据素,delete
根据消息ID从队列删除数据。这里使用Queue
结构体对List
进行封装,其实是有必要的,List
作为底层的数据结构,我们希望隐藏更多的底层操作,只给客户提供基本的操作。
delete
操作是在消费者消费成功且发送ACK
后,对消息从队列里移除的,因为消费者可以多个同时消费,所以这里进入临界区时加锁(em,加锁是否就一定会影响对性能有较大的影响呢)。
broker
broker作为服务器角色,负责接收连接,接收和响应请求。
package broker
import (
"bufio"
"net"
"os"
"sync"
"time"
)
var topics = sync.Map{}
func handleErr(conn net.Conn) {
defer func() {
if err := recover(); err != nil {
println(err.(string))
conn.Write(MsgToBytes(Msg{MsgType: 4}))
}
}()
}
func Process(conn net.Conn) {
handleErr(conn)
reader := bufio.NewReader(conn)
msg := BytesToMsg(reader)
queue, ok := topics.Load(msg.Topic)
var res Msg
if msg.MsgType == 1 {
// comsumer
if queue == nil || queue.(*Queue).len == 0{
return
}
msg = queue.(*Queue).poll()
msg.MsgType = 1
res = msg
} else if msg.MsgType == 2 {
// producer
if ! ok {
queue = &Queue{}
queue.(*Queue).data.Init()
topics.Store(msg.Topic, queue)
}
queue.(*Queue).offer(msg)
res = Msg{Id: msg.Id, MsgType: 2}
} else if msg.MsgType == 3 {
// consumer ack
if queue == nil {
return
}
queue.(*Queue).delete(msg.Id)
}
conn.Write(MsgToBytes(res))
}
MsgType
等于1时,直接消费消息;MsgType
等于2时是生产者生产消息,如果队列为空,那么还需创建一个新的队列,放在对应的topic
下;MsgType
等于3时,代表消费者成功消费,可以
删除消息
我们说消息不丢失,这里实现不完全,我就实现了持久化(持久化也没全部实现)。思路就是该topic
对应的队列里的消息,按协议格式进行序列化,当broker
启动时,从文件恢复。
持久化需要考虑的是增量还是全量,需要保存多久,这些都会影响实现的难度和性能(想想Kafka
和Redis
的持久化),这里表示简单实现就好:定时器定时保存
func Save() {
ticker := time.NewTicker(60)
for {
select {
case <-ticker.C:
topics.Range(func(key, value interface{}) bool {
if value == nil {
return false
}
file, _ := os.Open(key.(string))
if file == nil {
file, _ = os.Create(key.(string))
}
for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
file.Write(MsgToBytes(msg.Value.(Msg)))
}
_ := file.Close()
return false
})
default:
time.Sleep(1)
}
}
}
有一个问题是,当上面的delete
操作时,这里的file
文件需不需要跟着delete
掉对应的消息?答案是需要删除的,如果不删除,只能等下一次的全量持久化来覆盖了,中间就有脏数据问题.
下面是启动逻辑
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("listen failed, err:", err)
return
}
go broker.Save()
for {
conn, err := listen.Accept()
if err != nil {
fmt.Print("accept failed, err:", err)
continue
}
go broker.Process(conn)
}
}
生产者
package main
import (
"awesomeProject/broker"
"fmt"
"net"
)
func produce() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Print("write failed, err:", err)
}
fmt.Print(n)
}
消费者
package main
import (
"awesomeProject/broker"
"bytes"
"fmt"
"net"
)
func comsume() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
fmt.Print("connect failed, err:", err)
}
defer conn.Close()
msg := broker.Msg{Topic: "topic-test", MsgType: 1}
n, err := conn.Write(broker.MsgToBytes(msg))
if err != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("n", n)
var res [128]byte
conn.Read(res[:])
buf := bytes.NewBuffer(res[:])
receMsg := broker.BytesToMsg(buf)
fmt.Print(receMsg)
// ack
conn, _ = net.Dial("tcp", "127.0.0.1:12345")
l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
if e != nil {
fmt.Println("write failed, err:", err)
}
fmt.Println("l:", l)
}
消费者这里ack
时重新创建了连接,如果不创建连接的话,那服务端那里就需要一直从conn
读取数据,直到结束。思考一下,像RabbitMQ
的ack
就有自动和手工的ack
,如果是手工的ack
,必然需要一个新的连接,因为不知道客户端什么时候发送ack
,自动的话,当然可以使用同一个连接,but
这里就简单创建一条新连接吧
启动
先启动broker
,再启动producer
,然后启动comsumer
能实现发送消息到队列.