目录
- 前言
- 不可靠的 UDP
- 基于 UDP 的简单可靠传输协议
- 乱序问题
- 丢包问题
- 代码
前言
UDP 协议是我们平时较少接触到的知识,不同于 TCP,它是“不可靠”的,今天我们就来实战一下看下它到底怎么个不可靠法?
不可靠的 UDP
实验前,我们先介绍一下需要用到的工具(Mac 环境,其他环境请自行搜索相关工具):
- Network Link Conditioner:模拟丢包场景,可以去苹果开发者网站上下载
- Wireshark:抓包分析工具
- 云主机:因为实现发现 Network Link Conditioner 对本地回环地址不起作用,如果有更好的方法求大佬指出
然后我们准备两段代码,一段作为 UDP Server,一段作为 UDP Client,Client 会向 Server 发送 26 个英文大写字母,Server 会将他们存到文件:
// udp-server.js
const udp = require('dgram')
const server = udp.createSocket('udp')
const fs = require('fs')
server.on('listening', function () {
var address = server.address()
var port = address.port
console.log('Server is listening at port ' + port)
})
server.on('message', function (msg, info) {
console.log(
`Data received from ${info.address}:${info.port}: ${msg.toString()}`
)
fs.appendFileSync('./out', msg.toString())
})
server.on('error', function (error) {
console.log('Error: ' + error)
server.close()
})
server.bind()
// udp-client.js
const udp = require('dgram')
const client = udp.createSocket('udp')
for (let i =; i < 26; i++) {
const char = String.fromCharCode(x41 + i)
client.send(Buffer.from(char),, '********', function (error) {
if (error) {
console.log(error)
}
})
}
接着我们按照下面步骤开始实验:
- 通过 Network Link Conditioner 把丢包率设置为 50%:
- 设置好 Wireshark 的抓包参数:
- 在云主机上启动 Server,在本地启动 Client。
接着,我们来看一下实验结果:
- 首先,我们可以看到服务端接收到的字母少了很多,只有 14 个:
- 服务端接收到的字母顺序是乱序的,比如 U 跑到了 T 的前面:
为了进行对比,我们可以换成 TCP 试试,代码如下,结果就不贴了:
// tcp-server.js
const net = require('net')
const server = net.createServer()
const fs = require('fs')
server.on('connection', function (conn) {
conn.on('data', (msg) => {
console.log(
`Data received from ${conn.address().address}:${
conn.address().port
}: ${msg.toString()}`
)
fs.appendFileSync('./out', msg.toString())
})
})
server.listen(, () => {
console.log('server listening to %j', server.address().port)
})
// tcp-client.js
var net = require('net')
var client = new net.Socket()
client.connect(, '********', function () {
for (let i =; i < 26; i++) {
const char = String.fromCharCode(x41 + i)
client.write(char)
}
})
接下我们试试基于 UDP 来实现一个可靠的传输协议,主要解决上面的丢包和乱序问题。
基于 UDP 的简单可靠传输协议
首先,需要设计一下我们的协议格式。为了简单起见,我们只在原来 UDP 的数据部分分别新增 4 个字节的 SEQ 和 ACK:
+-------------------------------+
| 个字节的 UDP 首部 |
+-------------------------------+
| SEQ( 个字节) | ACK(4 个字节) |
+-------------------------------+
| Data |
+-------------------------------+
其中 SEQ 表示当前包的序号,ACK 表示回复序号。
接下来看看,我们如何解决前面的两个问题。
乱序问题
接收方需要维护一个变量 expectedSeq 的变量表示期待接收到的包序号。为了简单起见,我们制定如下规则:如果当前接收到的包序号等于 expectedSeq,则把包交给应用层处理,并发送 ACK 给发送方;否则我们都直接丢弃。当然更好的做法是维护一个接收窗口,这样可以批量的提交数据给应用层,也可以用来缓存大于 expectedSeq 的包。
假设现在发送方发送了 1 2 3 两个包,但是到达接收方的顺序是 3 2 1,按照我们的规则接收方会丢弃 3 和 2,接收 1。好家伙,顺序倒是不乱了,但是包没了。
所以还得把丢包问题也解决了才行。
丢包问题
发送方维护一个发送窗口用来存储已发送但是还未被确认的包:
+---+---+---+---+
| | 2 | 3 | 4 |
+---+---+---+---+
发送方每发送一个包的同时还需要将包放入发送窗口,并设置一个定时器用来重发这个包。当发送方接收到来自接收方的 ACK 时,需要取消掉对应包的定时器,并将发送窗口中小于 ACK 的包都删除。
+---+---+---+---+
| | 2 | 3 | 4 |
+---+---+---+---+
// ACK =,删除 1 2 3,并取消掉他们的定时器
+---+
| |
+---+
完整代码及使用 Demo 见文末,现在可以正常按顺序输出 26 个字母了,但是离“可靠”协议还差得远。比如第一次输出完 26 个字母后,我们再次启动客户端时发现就没有任何输出了。原因在于此时接收端的 expectedSeq 已经是 20 多了,但是新启动的 client 发送的 SEQ 还是从 1 开始的,结果就是接收端一直丢弃接收到的包,发送端一直重试。
要解决这个问题,可以参考 TCP 在传输两端建立“连接”的概念,在开始发送前通过“三次握手”建立连接,也就是确定起始 SEQ,初始化窗口等工作,结束前通过“四次挥手”断开连接,即清理窗口定时器等工作。这个就留到以后再说吧。
代码
// packet.js
class Packet {
constructor({seq, ack, data = ''}) {
this.seq = seq // 序列号
this.ack = ack // 确认号
this.data = data // 数据
}
// 将 Packet 转换成 Buffer,以便通过网络传输
toBuffer() {
const seqBuffer = Buffer.alloc()
seqBuffer.writeUIntBE(this.seq)
const ackBuffer = Buffer.alloc()
ackBuffer.writeUIntBE(this.ack)
const dataBuffer = Buffer.from(this.data)
return Buffer.concat([seqBuffer, ackBuffer, dataBuffer])
}
// 从 Buffer 中解析出 Packet
static fromBuffer(buffer) {
const seq = buffer.readUIntBE()
const ack = buffer.readUIntBE(4)
const data = buffer.slice()
return new Packet({seq, ack, data})
}
}
module.exports = Packet
// reliableUDP.js
const dgram = require('dgram')
const Packet = require('./packet')
class ReliableUDP {
constructor() {
this.socket = dgram.createSocket('udp')
this.socket.on('message', this.handleMessage.bind(this))
this.sendWindow = [] // 发送窗口,用于存放待确认的数据包
this.receiveWindow = [] // 接收窗口,用于存放已接收的数据包
this.expectedSeq = // 期望接收的数据包序列号
this.nextSeq = // 下一个要发送的数据包序列号
this.timeout = // 超时时间,单位为毫秒
this.timeoutIds = {} // 用于存放定时器 ID
}
listen(port, address, fn) {
this.socket.bind(port, address, fn)
}
// 发送数据包
sendPacket(packet, address, port) {
const buffer = packet.toBuffer()
this.socket.send(buffer, port, address, (err) => {
if (err) {
console.error(err)
}
})
if (packet.ack) return
if (!this.sendWindow.includes((p) => p.seq === packet.seq))
this.sendWindow.push(packet)
// 设置超时定时器
const timeoutId = setTimeout(() => {
this.handleTimeout(packet.seq, address, port)
}, this.timeout)
this.timeoutIds[packet.seq] = timeoutId
}
// 处理接收到的数据包
handleMessage(msg, rinfo) {
const {address, port} = rinfo
const packet = Packet.fromBuffer(msg)
// 收到的是应答的包
if (packet.ack) {
const ackNum = packet.ack -
// 处理发送窗口中已经确认的数据包
while (this.sendWindow.length > && this.sendWindow[0].seq <= ackNum) {
this.sendWindow.shift()
}
// 清除超时定时器
if (this.timeoutIds[ackNum]) {
clearTimeout(this.timeoutIds[ackNum])
delete this.timeoutIds[ackNum]
}
} else {
// 如果是重复的数据包,则忽略
if (packet.seq < this.expectedSeq) {
return
}
// 如果是期望接收的数据包
if (packet.seq === this.expectedSeq) {
this.receiveWindow.push(packet)
this.expectedSeq++
// 处理接收窗口中已经确认的数据包
while (
this.receiveWindow.length > &&
this.receiveWindow[].seq <= this.expectedSeq
) {
const packet = this.receiveWindow.shift()
this.onPacketReceived(packet.data)
}
const ackPacket = new Packet({
seq: this.nextSeq++,
ack: this.expectedSeq,
})
this.sendPacket(ackPacket, address, port)
} else {
// 如果是未来的数据包,暂不做处理,更好的做法是缓存起来
}
}
}
// 应用层调用该方法发送数据
send(data, address, port) {
const packet = new Packet({
seq: this.nextSeq,
ack: null,
data,
})
this.sendPacket(packet, address, port)
this.nextSeq++
}
// 应用层调用该方法注册回调函数,接收数据
onReceive(callback) {
this.onPacketReceived = callback
}
// 处理超时
handleTimeout(seq, address, port) {
// 重传超时的数据包
const packet = this.sendWindow.find((p) => p.seq === seq)
if (packet) {
this.sendPacket(packet, address, port)
}
}
}
module.exports = ReliableUDP
// server.js
const ReliableUDP = require('./reliableUDP')
const server = new ReliableUDP()
server.listen(, 'localhost')
server.onReceive((data) => {
console.log(data.toString())
})
// client.js
const ReliableUDP = require('./reliableUDP')
const client = new ReliableUDP()
for (let i =; i < 26; i++) {
const char = String.fromCharCode(x41 + i)
client.send(char, 'localhost',)
}