一 TCP粘包优雅处理

1.1 粘包产生的原因

在使用TCP协议分高发送数据时,为了提高发送效率,需要对封包进行拼包处理,将小包凑成大包,在TCP层可以节约包头的大小损耗,I/O层的调用损耗也可以有所降低。但此时也容易形成粘包,也就是没有按照预期的大小得到数据,数据包不完整。

在接收TCP封包时,接收缓冲区的大小与发送过来的TCP传输单元大小不等,这时候会造成两种情况:

  • 接收的数据大于等于接收缓冲区大小时,此时需要将数据复制到用户缓冲,接着读取后面的封包。
  • 接收的数据小于接收缓冲区大小时,此时需要继续等待后续的 TCP 封包。

在go语言的io包中有个函数ReadAtLeast()用来处理封装

func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
    if len(buf) < min {
        return 0, ErrShortBuffer
    }
    for n < min && err == nil {
        var nn int
        nn, err = r.Read(buf[n:])
        n += nn
    }
    if n >= min {
        err = nil
    } else if n > 0 && err == EOF {
        err = ErrUnexpectedEOF
    }
    return
}

在else判断中,读取目标已经完结,但是己经读取一些数据,也就是说,没法完成读取任务,发生了不可期望的终结错误。

ReadAtLeast还有个更好的封装:

func ReadFull(r Reader, buf []byte, min int) (n int, err error) {
    return ReadAtLeast(r, buf, len(buf))
}

这个函数只需要提供buf接收缓冲区切片,就可以将这个己经分配的buf填充满 。简单地说就是: 给多大空间,填充多少字节,直到填满空间。
使用ReadFull可以优雅地完成对TCP粘包的处理。

1.2 封包发送

封包格式:

  • Size:大小,代表Size后面包体的大小
  • Body:消息的二进制数据

在发送封包时,需要将封包的Size字段从无符号十六位整型转换为字节数组(利用binary包的Write函数):

// 二进制封包格式
type Packet struct {
    Size uint16 // 包体大小
    Body []byte // 包体数据
}

// 将数据写入dataWriter
func writePacket(dataWriter io.Writer, data []byte) error {

    // 准备一个字节数组缓冲
    var buf bytes.Buffer

    // 将Size写入缓冲
    err := binary.Write(&buf, binary.LittleEndian, uint16(len(data)))
    if err != nil {
        return err
    }

    // 写入包体数据
    _, err = buf.Write(data)
    if err != nil {
        return err
    }

    // 获取写入的完整数据
    out := buf.Bytes()

    // 写入完整数据
    _, err = dataWriter.Write(out)
    if err != nil {
        return err
    }

    return nil
}

1.3 连接器

连接器可以通过给定的地址和发送次数,不断地通过Socket给地址对应的连接发送封包。本例中,将通过循环构造的转为字符串的数字形成封包。由于速度快,包量大,因此能有效地造成粘包:

func Connector(addr string, sendTimes int) {

    // 创建连接
    conn, err := net.Dial("tpc", addr)
    if err != nil {
        fmt.Println("连接错误:", err)
        return
    }

    // 循环请求
    for i := 0; i < sendTimes; i++ {
        str := strconv.Itoa(i) // 将循环序号转化为字符串
        err := writePacket(conn, []byte(str))
        if err != nil {
            fmt.Println("发送错误:", err)
            break
        }
    }
}

1.4 接受器

连接器( Connector)只能发起到接收器( Acceptor)的连接, 一个接受器能接受多个来源连接。

接受器中侦昕的过程被放到一个独立的goroutine中,保证侦昕的过程不会阻塞其他逻辑的执行。在侦昕器停止时,需要使用 sync.WaitGroup进行同步,确认侦昕过程正常结 束。接受器不停的接受连接,当获取到连接时,为连接创建一个并发的会话处理的goroutine( handleSession 函数)。

接受器开始侦昕后,使用 Wait()方法等待侦昕结束。当处理粘包的任务完成时俗,AcceptorStop()方法会被调用,结束侦昕,同时退出整个程序。接受器的实现过程如下:

// 接收器
type Acceptor struct {
    ls            net.Listener   // 保存侦听器
    wg            sync.WaitGroup // 侦听器同步
    OnSeesionData func(net.Conn, []byte) bool
}

// 异步开始侦听
func (a *Acceptor) Start(addr string) {
    go func(addr string) {
        a.wg.Add(1) // 侦听开始时,添加一个任务
        defer a.wg.Done()

        var err error
        a.ls, err = net.Listen("tcp", addr)
        if err != nil {
            fmt.Println("listen err = ", err)
            return
        }
        // 侦听循环
        for {
            conn, err := a.ls.Accept() // 新连接没到来时,Accept阻塞
            if err != nil {
                break
            }
            go handleSession(conn, a.OnSeesionData)
        }
    }(addr)
}

// 停止侦听
func (a *Acceptor) Stop() {
    a.ls.Close()
}

// 等待侦听完全停止
func (a *Acceptor) Wait() {
    a.wg.Wait()
}

func NewAcceptor() *Acceptor {
    return &Acceptor{}
}

1.5 封包读取

本例中的 readPacket()函数可以从 dataReader 中读取封包数据,并且将封包的 Size 和 Body 部分分离并返回 Packet 结构:

func readPacket(dataReader io.Reader) (pkt Packet, err error) {
    var sizeBuffer = make([]byte, 2) // Size为uint16类型,占据2个字节

    // 持续读取Size,直到读到为止
    _, err = io.ReadFull(dataReader, sizeBuffer)
    if err != nil {
        return
    }

    // 读取数据
    sizeReader := bytes.NewReader(sizeBuffer)
    err = binary.Read(sizeReader, binary.LittleEndian, &pkt.Size)
    if err != nil {
        return
    }

    // 分配包体大小
    pkt.Body = make([]byte, pkt.Size)

    // 读取包体数据
    _, err = io.ReadFull(dataReader, pkt.Body)
    return
}

1.6 会话处理

务器在接受一个连接后就会进入会话( Session )处理。会话处理是一个循环,不停地从 Socket 中接收数据并通过 readPacket()函数将数据转换为封包。会话处理会传入一个处理数据的回调,处理好的数据会通过这个回调传送到用户的逻辑回调函数( callback) :

// 连接的会话逻辑
func handleSession(conn net.Conn, callback func(net.Conn, []byte) bool) {
    dataReader := bufio.NewReader(conn) // 创建socket读取器
    for {                               // 循环接收数据
        pkt, err := readPacket(dataReader)
        if err != nil || !callback(conn, pkt.Body) {
            // 回调要求退出
            conn.Close()
            break
        }
    }
}

1.5 测试粘包处理

func main() {

    const TESTCOUNT = 10000 // 测试次数
    const addr = "127.0.0.1:3000"

    var recCounter int // 接收器

    a := NewAcceptor()
    a.Start(addr)

    a.OnSeesionData = func(conn net.Conn, data []byte) bool {
        str := string(data)
        n, err := strconv.Atoi(str)
        if err != nil || recCounter != n {
            panic("failed")
        }
        recCounter++

        if recCounter >= TESTCOUNT {
            a.Stop()
            return false
        }

        return true
    }

    // 连接器不断发送数据
    Connector(addr, TESTCOUNT)

    // 等待侦听器结束
    a.Wait()
}

results matching ""

    No results matching ""