Node.js高级编程之UDP可靠性分析

JavaScript/前端
288
0
0
2023-09-08
标签   NodeJs
目录
  • 前言
  • 不可靠的 UDP
  • 基于 UDP 的简单可靠传输协议
  • 乱序问题
  • 丢包问题
  • 代码

前言

UDP 协议是我们平时较少接触到的知识,不同于 TCP,它是“不可靠”的,今天我们就来实战一下看下它到底怎么个不可靠法?

不可靠的 UDP

实验前,我们先介绍一下需要用到的工具(Mac 环境,其他环境请自行搜索相关工具):

  • Network Link Conditioner:模拟丢包场景,可以去苹果开发者网站上下载
  • Wireshark:抓包分析工具
  • 云主机:因为实现发现 Network Link Conditioner 对本地回环地址不起作用,如果有更好的方法求大佬指出

然后我们准备两段代码,一段作为 UDP Server,一段作为 UDP Client,Client 会向 Server 发送 26 个英文大写字母,Server 会将他们存到文件:

// udp-server.js
const udp = require('dgram')
const server = udp.createSocket('udp')
const fs = require('fs')
server.on('listening', function () {
  var address = server.address()
  var port = address.port
  console.log('Server is listening at port ' + port)
})
server.on('message', function (msg, info) {
  console.log(
    `Data received from ${info.address}:${info.port}: ${msg.toString()}`
  )
  fs.appendFileSync('./out', msg.toString())
})
server.on('error', function (error) {
  console.log('Error: ' + error)
  server.close()
})
server.bind()
// udp-client.js
const udp = require('dgram')
const client = udp.createSocket('udp')
for (let i =; i < 26; i++) {
  const char = String.fromCharCode(x41 + i)
  client.send(Buffer.from(char),, '********', function (error) {
    if (error) {
      console.log(error)
    }
  })
}

接着我们按照下面步骤开始实验:

  • 通过 Network Link Conditioner 把丢包率设置为 50%:

  • 设置好 Wireshark 的抓包参数:

  • 在云主机上启动 Server,在本地启动 Client。

接着,我们来看一下实验结果:

  • 首先,我们可以看到服务端接收到的字母少了很多,只有 14 个:

  • 服务端接收到的字母顺序是乱序的,比如 U 跑到了 T 的前面:

为了进行对比,我们可以换成 TCP 试试,代码如下,结果就不贴了:

// tcp-server.js
const net = require('net')
const server = net.createServer()
const fs = require('fs')
server.on('connection', function (conn) {
  conn.on('data', (msg) => {
    console.log(
      `Data received from ${conn.address().address}:${
        conn.address().port
      }: ${msg.toString()}`
    )
    fs.appendFileSync('./out', msg.toString())
  })
})
server.listen(, () => {
  console.log('server listening to %j', server.address().port)
})
// tcp-client.js
var net = require('net')
var client = new net.Socket()
client.connect(, '********', function () {
  for (let i =; i < 26; i++) {
    const char = String.fromCharCode(x41 + i)
    client.write(char)
  }
})

接下我们试试基于 UDP 来实现一个可靠的传输协议,主要解决上面的丢包和乱序问题。

基于 UDP 的简单可靠传输协议

首先,需要设计一下我们的协议格式。为了简单起见,我们只在原来 UDP 的数据部分分别新增 4 个字节的 SEQ 和 ACK:

+-------------------------------+
| 个字节的 UDP 首部       |
+-------------------------------+
|  SEQ( 个字节) |  ACK(4 个字节) |
+-------------------------------+
|             Data              |
+-------------------------------+

其中 SEQ 表示当前包的序号,ACK 表示回复序号。

接下来看看,我们如何解决前面的两个问题。

乱序问题

接收方需要维护一个变量 expectedSeq 的变量表示期待接收到的包序号。为了简单起见,我们制定如下规则:如果当前接收到的包序号等于 expectedSeq,则把包交给应用层处理,并发送 ACK 给发送方;否则我们都直接丢弃。当然更好的做法是维护一个接收窗口,这样可以批量的提交数据给应用层,也可以用来缓存大于 expectedSeq 的包。

假设现在发送方发送了 1 2 3 两个包,但是到达接收方的顺序是 3 2 1,按照我们的规则接收方会丢弃 3 和 2,接收 1。好家伙,顺序倒是不乱了,但是包没了。

所以还得把丢包问题也解决了才行。

丢包问题

发送方维护一个发送窗口用来存储已发送但是还未被确认的包:

+---+---+---+---+
| | 2 | 3 | 4 |
+---+---+---+---+

发送方每发送一个包的同时还需要将包放入发送窗口,并设置一个定时器用来重发这个包。当发送方接收到来自接收方的 ACK 时,需要取消掉对应包的定时器,并将发送窗口中小于 ACK 的包都删除。

+---+---+---+---+
| | 2 | 3 | 4 |
+---+---+---+---+
// ACK =,删除 1 2 3,并取消掉他们的定时器
+---+
| |
+---+

完整代码及使用 Demo 见文末,现在可以正常按顺序输出 26 个字母了,但是离“可靠”协议还差得远。比如第一次输出完 26 个字母后,我们再次启动客户端时发现就没有任何输出了。原因在于此时接收端的 expectedSeq 已经是 20 多了,但是新启动的 client 发送的 SEQ 还是从 1 开始的,结果就是接收端一直丢弃接收到的包,发送端一直重试。

要解决这个问题,可以参考 TCP 在传输两端建立“连接”的概念,在开始发送前通过“三次握手”建立连接,也就是确定起始 SEQ,初始化窗口等工作,结束前通过“四次挥手”断开连接,即清理窗口定时器等工作。这个就留到以后再说吧。

代码

// packet.js
class Packet {
  constructor({seq, ack, data = ''}) {
    this.seq = seq // 序列号
    this.ack = ack // 确认号
    this.data = data // 数据
  }
  // 将 Packet 转换成 Buffer,以便通过网络传输
  toBuffer() {
    const seqBuffer = Buffer.alloc()
    seqBuffer.writeUIntBE(this.seq)
    const ackBuffer = Buffer.alloc()
    ackBuffer.writeUIntBE(this.ack)
    const dataBuffer = Buffer.from(this.data)
    return Buffer.concat([seqBuffer, ackBuffer, dataBuffer])
  }
  // 从 Buffer 中解析出 Packet
  static fromBuffer(buffer) {
    const seq = buffer.readUIntBE()
    const ack = buffer.readUIntBE(4)
    const data = buffer.slice()
    return new Packet({seq, ack, data})
  }
}
module.exports = Packet
// reliableUDP.js
const dgram = require('dgram')
const Packet = require('./packet')
class ReliableUDP {
  constructor() {
    this.socket = dgram.createSocket('udp')
    this.socket.on('message', this.handleMessage.bind(this))
    this.sendWindow = [] // 发送窗口,用于存放待确认的数据包
    this.receiveWindow = [] // 接收窗口,用于存放已接收的数据包
    this.expectedSeq = // 期望接收的数据包序列号
    this.nextSeq = // 下一个要发送的数据包序列号
    this.timeout = // 超时时间,单位为毫秒
    this.timeoutIds = {} // 用于存放定时器 ID
  }
  listen(port, address, fn) {
    this.socket.bind(port, address, fn)
  }
  // 发送数据包
  sendPacket(packet, address, port) {
    const buffer = packet.toBuffer()
    this.socket.send(buffer, port, address, (err) => {
      if (err) {
        console.error(err)
      }
    })
    if (packet.ack) return
    if (!this.sendWindow.includes((p) => p.seq === packet.seq))
      this.sendWindow.push(packet)
    // 设置超时定时器
    const timeoutId = setTimeout(() => {
      this.handleTimeout(packet.seq, address, port)
    }, this.timeout)
    this.timeoutIds[packet.seq] = timeoutId
  }
  // 处理接收到的数据包
  handleMessage(msg, rinfo) {
    const {address, port} = rinfo
    const packet = Packet.fromBuffer(msg)
    // 收到的是应答的包
    if (packet.ack) {
      const ackNum = packet.ack -
      // 处理发送窗口中已经确认的数据包
      while (this.sendWindow.length > && this.sendWindow[0].seq <= ackNum) {
        this.sendWindow.shift()
      }
      // 清除超时定时器
      if (this.timeoutIds[ackNum]) {
        clearTimeout(this.timeoutIds[ackNum])
        delete this.timeoutIds[ackNum]
      }
    } else {
      // 如果是重复的数据包,则忽略
      if (packet.seq < this.expectedSeq) {
        return
      }
      // 如果是期望接收的数据包
      if (packet.seq === this.expectedSeq) {
        this.receiveWindow.push(packet)
        this.expectedSeq++
        // 处理接收窗口中已经确认的数据包
        while (
          this.receiveWindow.length > &&
          this.receiveWindow[].seq <= this.expectedSeq
        ) {
          const packet = this.receiveWindow.shift()
          this.onPacketReceived(packet.data)
        }
        const ackPacket = new Packet({
          seq: this.nextSeq++,
          ack: this.expectedSeq,
        })
        this.sendPacket(ackPacket, address, port)
      } else {
        // 如果是未来的数据包,暂不做处理,更好的做法是缓存起来
      }
    }
  }
  // 应用层调用该方法发送数据
  send(data, address, port) {
    const packet = new Packet({
      seq: this.nextSeq,
      ack: null,
      data,
    })
    this.sendPacket(packet, address, port)
    this.nextSeq++
  }
  // 应用层调用该方法注册回调函数,接收数据
  onReceive(callback) {
    this.onPacketReceived = callback
  }
  // 处理超时
  handleTimeout(seq, address, port) {
    // 重传超时的数据包
    const packet = this.sendWindow.find((p) => p.seq === seq)
    if (packet) {
      this.sendPacket(packet, address, port)
    }
  }
}
module.exports = ReliableUDP
// server.js
const ReliableUDP = require('./reliableUDP')
const server = new ReliableUDP()
server.listen(, 'localhost')
server.onReceive((data) => {
  console.log(data.toString())
})
// client.js
const ReliableUDP = require('./reliableUDP')
const client = new ReliableUDP()
for (let i =; i < 26; i++) {
  const char = String.fromCharCode(x41 + i)
  client.send(char, 'localhost',)
}