参考连接: https://juejin.cn/post/7263378772040122429

Go语言实现UDP socket的ack机制和丢包重传

UDP 在通讯之前不需要建立连接 可以直接发送数据包 是一种无连接协议(常用于音视频传输)

但是在有些场景 即需要UDP传输也需要向TCP一样(TCP的可靠传输)

解决丢包问题:

1.添加 seq/ack机制 确保数据发送到对端

什么是 seq/ack机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在TCP(传输控制协议)中,序号(Sequence Number,简称seq)和确认应答号(Acknowledgment Number,简称ack)是协议头部非常关键的字段,它们共同确保了TCP的可靠性和数据按顺序传输的特性。

** Sequence Number **
含义:序号是指一个TCP报文段中第一个字节的数据序列标识。它表示在一个TCP连接中,该报文段所携带的数据的开始位置。序号是用来保证数据传输的顺序性和完整性的。

作用:在TCP连接建立时,双方各自随机选择一个初始序列号(ISN)。随后传输的每个报文段的序号将基于这个初始值递增,其增量为该报文段所携带的数据量(字节数)。通过这种方式,接收方可以根据序号重组乱序到达的数据片段,确保数据的正确顺序和完整性。如果接收到的报文段不连续,接收方可以通过TCP的重传机制请求发送方重新发送缺失的数据。



**Acknowledgment Number**
含义:确认应答号是接收方期望从发送方接收到的下一个报文段的序号。它实质上是接收方告诉发送方:“我已经成功接收到了哪个序号之前的所有数据,请从这个序号开始发送后续的数据。”

作用:确认应答号用于实现可靠性传输。当一个报文段被接收方正确接收时,接收方会发送一个ACK报文,其中包含的确认应答号是接收到的数据加上1(即接收方期望接收的下一个数据的序号)。通过检查这个确认应答号,发送方能够知道其发送的数据是否已被接收方正确接收,并据此决定是否需要重传某些数据段。

ack和seq 保证了:

  • 确保数据的顺序性:即使数据片段在网络中的传输过程中顺序被打乱,接收方也能根据序号正确地重组这些数据。
  • 检测丢包:如果发送方发送的数据长时间未被确认(即没有收到对应的ACK报文),它会判断这些数据可能已丢失,并将其重新发送。
  • 实现流量控制和拥塞控制:通过调整发送未被确认数据的量(即控制窗口大小),TCP可以根据网络条件动态调整数据发送的速率,避免网络拥塞。

Golang的socket编程:

Go语言通过标准库中的net包来实现UDP和TCP的socket编程。net包提供了用于创建和管理网络连接的函数,以及用于进行数据传输的相关类型和方法,不同于C++需要手动设置和管理socket API,不论实现UDP还是TCP都可以直接使用封装好的方法进行操作,大大简化了socket编程:

使用net包实现UDP通信
1.client.go

1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"bufio"
"fmt"
"net"
"os"
)

func main() {
// 创建UDP连接到服务器的地址和端口号
c, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})
if err != nil {
fmt.Println("dial err: %v\n", err)
return
}
defer c.Close() // 将 defer 放在 if 语句外面

// 从标准输入读取用户输入的数据
input := bufio.NewReader(os.Stdin)
for {
// 读取用户输入知道遇见换行符
s, err := input.ReadString('\n')
if err != nil {
fmt.Printf("read from stdin failed, err: %v\n", err)
return
}

// 将用户输入的数据转换为字节数组并通过UDP连接发送给服务器
_, err = c.Write([]byte(s))
if err != nil {
fmt.Printf("send to server failed, err: %v\n", err)
return
}

// 接收来自服务器的数据
var buf [1024]byte
n, addr, err := c.ReadFromUDP(buf[:])
if err != nil {
fmt.Printf("recv from udp failed, err: %v\n", err)
return
}

// 打印来自服务器的数据
fmt.Printf("服务器 %v, 响应数据: %v\n", addr, string(buf[:n]))
}
}

2.server.go

首先创建UDP监听器监听指定IP和端口,等待连接客户端,连接后会读取客户端发来的数据并打印收到的数据,并将接收的响应信息返回发送给客户端,使用死循环使其能够持续获取客户端数据,同样实现了UDP的数据接收和发送,实现了简单的UDP服务器;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"net"
)

// udp server
func main() {
// 创建一个UDP监听器,监听本地IP地址的端口
listen, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})
if err != nil {
fmt.Printf("listen failed,err:%v\n", err)
return
}
defer listen.Close()

for {
var buf [1024]byte
// 从UDP连接中读取数据到buf中,n为读取到的字节数,addr为数据发送者的地址
n, addr, err := listen.ReadFromUDP(buf[:])
if err != nil {
fmt.Printf("read from udp failed,err:%v\n", err)
return
}

// 打印接收到的数据
fmt.Println("接收到的数据:", string(buf[:n]))

// 将接收到的数据原样发送回给数据发送者
_, err = listen.WriteToUDP(buf[:n], addr)
if err != nil {
fmt.Printf("write to %v failed,err:%v\n", addr, err)
return
}
}
}

效果:(好有趣)

1
2
3
4
5
6
7
8
> go run client.go
hello
服务器 127.0.0.1:8282, 响应数据: hello

world
服务器 127.0.0.1:8282, 响应数据: world


1
2
3
4
5
6
 go run server.go
接收到的数据: hello

接收到的数据: world


基于seq/apk

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main

import (
"fmt"
"net"
"strconv"
"strings"
"time"
)

type Message struct {
Seq int
Msg string
}

func main() {
c, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})

if err != nil {
fmt.Printf("dail err:%v\n", err)
}
defer c.Close()

input := []string{"Message1", "Message2", "Message3", "Message4", "Message5"}
seq := 0

for _, msg := range input {
seq++
message := Message{Seq: seq, Msg: msg}
fmt.Printf("Sending seq=%d: %s\n", message.Seq, message.Msg)

// 发送带有序列号的数据包
_, err = c.Write(encodeMessage(message))
if err != nil {
fmt.Printf("send to server failed,err:%v\n", err)
return
}

}
// 等待ACK,设置超时时间
buf := make([]byte, 1024)
c.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err := c.ReadFromUDP(buf)
if err != nil {
fmt.Println("ACK not received. Timeout or Error.")
return
} else {
ack := decodeMessage(buf[:n])
if ack.Seq == seq+1 {
fmt.Printf("ACK = %d\n", ack.Seq)
} else {
fmt.Println("Invalid ACK received. Retry.")
return
}
}

}

func encodeMessage(msg Message) []byte {
// 将序列号和消息文本编码成字节数据
return []byte(fmt.Sprintf("%d;%s", msg.Seq, msg.Msg))
}

func decodeMessage(data []byte) Message {
// 解码收到的数据,提取序列号和消息文本
parts := strings.Split(string(data), ";")
seq, _ := strconv.Atoi(parts[0])
msg := parts[1]
return Message{Seq: seq, Msg: msg}
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"fmt"
"net"
"strconv"
"strings"
)

type Message2 struct {
Seq int
Msg string
}

func main() {
listen, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})
if err != nil {
fmt.Printf("listen failed,err:%v\n", err)
return
}
defer listen.Close()

for {
var buf [1024]byte
n, addr, err := listen.ReadFromUDP(buf[:])
if err != nil {
fmt.Printf("read from udp failed,err:%v\n", err)
return
}

// 处理接收到的数据,提取序列号和消息文本
message := decodeMessage1(buf[:n])
fmt.Printf("Received seq=%d from %v: %s\n", message.Seq, addr, message.Msg)

// 发送ACK回复给客户端,ACK=Seq+1
ack := Message2{Seq: message.Seq + 1, Msg: "ACK"}
_, err = listen.WriteToUDP(encodeMessage1(ack), addr)
if err != nil {
fmt.Printf("write to %v failed,err:%v\n", addr, err)
return
}
}
}

func encodeMessage1(msg Message2) []byte {
// 将序列号和消息文本编码成字节数据
return []byte(fmt.Sprintf("%d;%s", msg.Seq, msg.Msg))
}

func decodeMessage1(data []byte) Message2 {
// 解码收到的数据,提取序列号和消息文本
parts := strings.Split(string(data), ";")
seq, _ := strconv.Atoi(parts[0])
msg := parts[1]
return Message2{Seq: seq, Msg: msg}
}

}

问题核心:在同一文件夹内,Go 默认将所有文件视为同一包的一部分,可能导致重复定义或未定义的错误。
推荐解决方法:
分文件夹运行。
提取共享逻辑到单独的文件。
运行时显式指定所有 .go 文件。
额外改进:通过参数区分服务端和客户端运行模式,进一步优化程序结构。

,服务器监听的端口可能是某个固定端口(例如 12345),而客户端的UDP数据包来自 127.0.0.1:59653。这意味着:
UDP允许客户端选择任意可用的端口号作为源端口,并将数据包发送到服务器的监听端口。这也是为什么你看到不同的端口号。


4.实现超时重传

最简单的方法就是当超时没有收到ack回复或者ack回复错误乱序时,阻塞后续发包,先进行重传,
把丢失的包重传后再继续发包,因此主要是在客户端更改一些发包的逻辑,服务端则没有变化;

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"fmt"
"net"
"strconv"
"strings"
"time"
)

type Message struct {
Seq int
Msg string
}

func main() {
c, err := net.DialUDP("udp", nil, &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})
if err != nil {
fmt.Printf("dial,err:%v\n", err)
return
}
defer c.Close()

// 示例数据
input := []string{"Message 1", "Message 2", "Message 3", "Message 4", "Message 5"}

for seq, msg := range input {
for {
message := Message{Seq: seq + 1, Msg: msg}
fmt.Printf("Sending seq=%d: %s\n", message.Seq, message.Msg)

// 发送带有序列号的数据包
_, err := c.Write(encodeMessage(message))
if err != nil {
fmt.Printf("send to server failed,err:%v\n", err)
return
}

// 开始等待ACK,设置超时时间
buf := make([]byte, 1024)
c.SetReadDeadline(time.Now().Add(5 * time.Second))

// 循环等待ACK,直到收到正确的ACK或超时
n, _, err := c.ReadFromUDP(buf)
if err != nil {
// 超时或发生错误,需要重传
fmt.Println("ACK not received. Timeout or Error. Retrying...")
continue
} else {
//解码从服务器传来的ack
ack := decodeMessage(buf[:n])
if ack.Seq == seq+2 {
fmt.Printf("ACK = %d\n", ack.Seq)
// 收到正确的ACK,跳出内部循环,继续发送下一个消息
break
} else {
// 收到错误的ACK,继续等待,内部循环会重发相同的消息
fmt.Println("Invalid ACK received. Waiting for correct ACK...")
continue
}
}
}
}
}

func encodeMessage(msg Message) []byte {
// 将序列号和消息文本编码成字节数据
return []byte(fmt.Sprintf("%d;%s", msg.Seq, msg.Msg))
}

func decodeMessage(data []byte) Message {
// 解码收到的数据,提取序列号和消息文本
parts := strings.Split(string(data), ";")
seq, _ := strconv.Atoi(parts[0])
msg := parts[1]
return Message{Seq: seq, Msg: msg}
}

server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"fmt"
"math/rand"
"net"
"strconv"
"strings"
)

type Message struct {
Seq int
Msg string
}

func main() {
listen, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8282,
})
if err != nil {
fmt.Printf("listen failed,err:%v\n", err)
return
}
defer listen.Close()

for {
var buf [1024]byte
n, addr, err := listen.ReadFromUDP(buf[:])
if err != nil {
fmt.Printf("read from udp failed,err:%v\n", err)
return
}

// 以20%的概率模拟丢包
if rand.Float32() < 0.2 {
fmt.Printf("From %v lost package\n", addr)
continue
}

// 处理接收到的数据,解码字节流 提取序列号和消息文本
message := decodeMessage(buf[:n])
fmt.Printf("Received seq=%d from %v: %s\n", message.Seq, addr, message.Msg)

// 发送ACK回复给客户端,ACK=Seq+1
ack := Message{Seq: message.Seq + 1, Msg: "ACK"}
//写入ack向UDP连接中
_, err = listen.WriteToUDP(encodeMessage(ack), addr)
if err != nil {
fmt.Printf("write to %v failed,err:%v\n", addr, err)
return
}
}
}

func encodeMessage(msg Message) []byte {
// 将序列号和消息文本编码成字节数据
return []byte(fmt.Sprintf("%d;%s", msg.Seq, msg.Msg))
}

func decodeMessage(data []byte) Message {
// 解码收到的数据,提取序列号和消息文本
parts := strings.Split(string(data), ";")
seq, _ := strconv.Atoi(parts[0])
msg := parts[1]
return Message{Seq: seq, Msg: msg}
}

上述方法实现的丢包重传虽然能够正常工作,但是发送端使用双层循环嵌套,并且每次丢包都阻塞了后续发包,这样会导致重传的效率很低,只适用于小宽带低延时的情况,而且超时重传容易产生误判,主要有以下两种情况:

对方收到了数据包,但是ack发送途中丢失,其实就是我服务器模拟丢包的情况,服务器可能收到了数据,但是因为某种原因ack没能正确发送;
ack在回传的途中,但是时间已经超过了发送端的ack等待时间即超过了一次RTO,这样也会导致接收端收到数据却仍然重传的问题。

学习 滑动窗口 拥塞控制 优化一些东西 深入理解seq/ack的原理和超时重传的逻辑 Go语言进行socket编程的方法 UDP和TCP的一些知识

借鉴文章:https://blog.csdn.net/weixin_41500064/article/details/135119715

流量控制:

作用:为了解决发送方和接收方速度不同而导致的数据丢失问题,当发送方发送的太快,接收方来不及接受就会导致数据丢失;

方式:由接收端采用滑动窗口的形式,告知发送方允许/停止发包解决TCP丢包问题。

拥塞控制:

作用:为了解决过多的数据注入到网络导致网络崩溃和超负荷问题;

方式:由发送方采用拥塞窗口的形式去判断网络状态,从而采取不同算法执行TCP动态发包解决网络整体质量问题。

典型的场景如以太网场景中10G设备向1G设备发TCP包:
10G网卡 10Gbps 持续发送TCP包 交换机 以 1Gbps每秒接收TCP包
TCP 提供一种机制可以让发送端根据接收端的实际接收能力控制发送的数据量。这就是所谓的流控制。
接收端主机向发送端主机通知自已可以接收数据的大小;
是发送端会发送不超过这个限度的数据,该大小限度就被称作窗口大小。窗口大小的值由接收端主机决定,而在TCP 首部中,专门有一个字段用来通知窗口大小:
接收主机将自己可以接收的缓冲区大小放入这个字段中通知给发送端,这个字段的值越大,说明网络的吞吐量越高。
发送端主机会根据接收端主机的指示,对发送数据的量进行控制。这也就形成了一个完整的TCP流控制(流量控制)。
当Window=0时,即告知发送方停止发送数据。

慢开始

发送方先探测网络拥塞程度,并不是一开始就发送大量的数据,发送方会根据拥塞程度增大拥塞窗口cwnd。
拥塞窗口cwnd值是几就能发送几个数据段

拥塞避免

继上图达到cwnd≥ssthresh后采用该算法用来控制拥塞窗口的增长速率。

计算方法:每经过一个传输轮次cwnd值加1,让cwnd值呈线性缓慢增大

拥塞发生

当网络发生拥塞丢包时(如图假定cwnd=24),会有两种情况:

超时重传(1988年TCP Tahoe版本,已废弃不用)

更新后的 ssthresh 值变为 12(即为出现超时时的窗口数值 24 的一半),拥窗口再重新设置为1
并执行慢开始算法。当cwnd = ssthresh = 12 时改为执行拥塞避免算法拥塞窗口按线性规律增长,
每经过一个往返时间增加一个MSS的大小。在TCP拥塞控制的文献中经常可看见“乘法减小”(Multiplicative Decrease)和“加法增大”(Additive Increase)这样的提法。
“乘法减小”是指不论在开始阶段还是拥塞避免阶段,只要出现超时(即很可能出现了网络拥塞),就把慢开始门限值 ssthresh 减半,即设置为当前的拥塞窗口的一半(与此同时,执行慢开始算法)。
而“加法增大”是指执行拥塞避免算法后,使拥塞窗口缓慢增大,以防止网络过早出现拥塞。上面两种算法合起来常称为 AIMD算法(加法增大乘法减小)。

快重传与快恢复

超时重传算法直接把cwnd减小到1太过强烈,若还能收到连续3个ACK说明网络质量没那么差。同时采用快重传和快恢复算法,过程如下:

(1)当发送方连续收到3个重复确认时即执行“乘法减小”算法,把慢开始门限ssthresh 减半;

(2)由于发送方现在认为网络不那么差,于是把cwnd值设置为开始门限ssthresh 减半后的数值;

(3)开始执行拥塞避免算法 (“加法增大”)使拥塞窗口缓慢地线性增大。


12.25 日

根据计算机网络这本书,深入了解UDP套接字编程:

整体的一个过程: 进程间彼此通过向套接字发送报文来进行通信

一个主机-》多个进程 1个进程-》多个套接字

帮助记忆: 进程是一间房子 而 套接字是房子门 还需要一个 端口 来识别我具体要将信息发送到哪间房子,也就是具体的房间号(端口)

  1. 客户从键盘键入数据,并向服务器发送数据(将数据编码成字节流),
  2. 服务器接收数据(接收字节流)-》并将其解码为原始客户端输入的数据 (并把小写转换为大写)
  3. 服务器将修改的数据(编码为字节流)发送回客户端
  4. 客户端接收数据后解码 并显示在屏幕上

接收源地址(ip+端口号)和分组