顶点云(应用)传输协议实现和封装

使用 Golang 实现上一篇文章设计的云存储系统文件传输协议。将实现代码封装,提供外部访问接口。

设计 transmitter

socket 无法维持协议格式,因此需要提供一个 transmitter 类(尽管 Golang 没有提供 OOP 显式表达,但以下将使用 “” 称呼 strcut,使用 “接口” 称呼 interface,使用 “对象” 称呼 struct 的一个具体实例),该类将 socket 连接包装起来,并向用户提供符合协议要求的接口。此外,下面将使用 “消息” 表示一组符合协议格式的数据流,即满足此前定义的协议格式的一个数据包。

类的私有变量

  • conn net.Conn: transmitter 的内部实现应当基于 socket 通信,因此内部需要一个 net.Conn 对象。
  • buf []byte: socket 通信需要一个缓冲区,因为 socket 本身无法维持应用层的消息边界,所以每次从缓冲区读取的数据长度可能超过一个消息的长度,超出部分的数据实际为下个消息的首部。为了保证数据不被丢失,超过一组消息长度的数据应当被保存在缓冲区中,并和下次读取的数据组成一个新的消息。一个 net.Conn 对象应当使用同一个缓冲区,以保证数据不被遗漏。因此 transmitter 类中应当维护 net.Conn 使用的缓冲区。
  • buflen int64: transmitter 中维护的缓冲区大小,单位为字节。
  • block cipher.Block: transmitter 应当保证传输数据的安全性,block 是 transmitter 使用的加密模块,将在后面加密部分的文章中介绍。下面假定已有函数 AesEncode(plain []byte, block cipher.Block) []byteAesDecode(encipher []byte, length int64, block cipher.Block)([]byte, err),这两个函数的功能分别是使用 blockplain 字节流加密,以及使用 blockencipher 密文解密。前者会返回加密后的字节流,后者会返回解密后的明文,如果解密失败则返回值携带 error。
  • recvLen int64: 缓冲区中已存储的数据长度,即从 socket 读取的,但超过一个消息长度、尚未被使用的数据长度。每次将 socket 接收到的消息读取到缓冲区后,应当将超过一个消息长度的数据移动至缓冲区最开始的位置,这部分数据长度即为 recvLen,下次 socket 的 read 操作将向缓冲区 buf[recvLen:] 写入。

接口需求分析

  • 基本的内部数据获取和设置
    • GetConn() net.Conn:获取该 transmitter 内部的 socket 连接。
    • GetBuf() []byte: 获取该 transmitter 内部的缓冲区。
    • GetBuflen() int64: 获取该 transmitter 的缓冲区长度。
    • GetBlock() cipher.Block: 获取该 transmitter 的加密模块。
    • SetBuflen(int64) bool: 为该 transmitter 设置缓冲区大小,并自动拓展/缩小原有的缓冲区。
  • 析构函数:当 transmitter 寿命终止时,应当调用析构函数 Destroy() 销毁内部的数据并断开 socket 连接。
  • 数据传输接口:transmitter 应当提供一些易使用的公有方法,这些方法可以让用户方便的发送/接收字节流,或者从特定的 Reader 发送字节流,并接收字节流至特定的 Writer。我们在《认证、传输协议设计》中假设过已存在一个 RecvBytes() 函数,该函数能够从 socket 缓冲区读取一组消息。除了要实现该函数,还应当实现对称的 SendBytes(message []byte) 函数,用于向 socket 发送一组消息。同样,应当设计函数 SendFromReader(reader)RecvToWriter(writer) ,它们提供了数据源/目的地是 reader/writer 的操作。

具体定义接口

在工程目录下新建文件夹 transmit ,在该目录下新建代码文件 transmit.go,将私有类 transmitter 的公有接口定义为 Transmitable,其 Golang 代码表示如下。其中,SendFromReaderRecvToWriter 选择的 Reader/Writer 是 bufio 包中的读写器,因为在传输长数据流中将使用 bufio.Readerbufio.Writer 去读/写文件。SendFromReader 的第二个参数是要发送的长度,如果这个参数大于 Reader 所能读取的长度,则读取到 Reader 末尾结束,否则读取到参数长度即结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
// transmit.go
type Transmitable interface {
SendFromReader(*bufio.Reader, int64) bool
SendBytes([]byte) bool
RecvToWriter(*bufio.Writer) bool
RecvBytes() ([]byte, error)
Destroy()
SetBuflen(int64) bool
GetConn() net.Conn
GetBuf() []byte
GetBuflen() int64
GetBlock() cipher.Block
}

实现 transmitter

SendBytes() 和 RecvBytes()

  • SendBytes([]byte) bool 的发送过程如下:首先发送一个 8 字节、大端序的明文表明即将发送的总长度,之后按照《认证、传输协议设计》设计的数据协议格式 格式1 发送消息,因为每个消息内包含的数据长度受缓冲区限制,因此可能需要传送多个消息才能发送所有数据。代码如下,其中第 6 行发送待发送数据明文的总长度,用到的 auth.Int64Bytes(int64) []byte 可以将一个 int64 数据转化为 8 字节、大端序的字节数组,与之对称的 BytesToInt64([]byte) int64 可以将参数的前 8 个字节转化为一个 int64 类型,这两个函数将在后面要实现的 authenticate 包中介绍,现在假设已经拥有这两个函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// transmit.go
func (t *transmitter) SendBytes(toSend []byte) bool {
if t.buf == nil || t.conn == nil {
return false
}
totalLength := len(toSend)
_, err := t.conn.Write(auth.Int64ToBytes(int64(totalLength)))
if err != nil {
return false
}
chRate := time.Tick(2e3)
alSend := 0
var length int
for {
<-chRate
if totalLength == alSend {
break
}
if totalLength-alSend < int(t.buflen/3) {
length = totalLength - alSend
} else {
length = int(t.buflen / 3)
}
copy(t.buf[16:], toSend[alSend:alSend+length])
copy(t.buf, auth.Int64ToBytes(int64(length)))
encoded := auth.AesEncode(t.buf[16:length+16], t.block)
copy(t.buf[8:], auth.Int64ToBytes(int64(len(encoded)+16)))
copy(t.buf[16:], encoded)
_, err = t.conn.Write(t.buf[:len(encoded)+16])
if err != nil {
return false
}
alSend += length
}
return true
}
  • 上面的代码中,chRate 是一个只读的 channel,它来自内置的 time 库,可用于控制发送速度。如果发送方发送频率大于接收方接收频率,则可能出现阻塞和数据丢失,因此必须限制发送方的 chRate 速度慢于接收方的接收速率。alSend 表示已经发送的数据长度。经验表明,使用 AES CFB 加密时,密文长度通常不会超过明文长度的两倍,为了保险,我们设置密文长度上限为缓冲区长度,因此每个消息可携带的明文长度至多为缓冲区长度的 1/3。代码的第 18 行用于判断接下来要发送的消息需要携带的明文长度,如果剩余要发送的数据长度大于 1/3 个缓冲区长度,则发送 1/3 个缓冲区长度的明文,否则只发送剩余的明文。23 ~ 27 行构造发送的消息,消息格式和此前设计的 格式1 相同。函数返回的 bool 值表明发送是否成功。
  • RecvBytes()([]byte, error)的接收过程如下:首先从 socket 读取 8 个字节,按大端序转为 int64 类型,得到要接收数据的总长度。之后严格按照 格式1 接收消息,每次先接收 16 字节,根据前 8 字节获得即将接收的消息携带的明文长度,根据后 8 字节获得整个消息的长度。当接收到的数据长度达到总长度时,将停止接收并以字节流返回本次调用接收到的所有数据。代码如下,其中第 7 行用到的 RecvUntil(until int64, init int64, chRate <-chan time.Time)(int64, err) 方法的作用是,当前缓冲区已有长度为 init 的数据,该方法将以 chRate 指定的速率一直从 socket 读取,直到缓冲区长度达到了 until,并返回现在缓冲区持有的数据长度。如果在方法调用期间出现异常,返回值将携带错误消息。该方法的代码将在介绍完 RecvBytes() 后立刻给出,你可以先拖到下面查看该方法具体实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// transmit.go
func (t *transmitter) RecvBytes() ([]byte, error) {
var err error
if t.buf == nil || t.conn == nil {
return nil, err
}
chRate := time.Tick(1e3)
length, err := t.RecvUntil(8, t.recvLen, chRate)
if err != nil {
return nil, err
}
totalLength := auth.BytesToInt64(t.buf[:8])
var toRecvLength int64 = totalLength
var plength int64 = 0
var elength int64 = 0
var pRecv int64 = length - 8
defer func() { t.recvLen = pRecv }()
copy(t.buf, t.buf[8:length])
returnBytes := make([]byte, 0, conf.AUTHEN_BUFSIZE)
for {
if toRecvLength == int64(0) {
return returnBytes, nil
}
pRecv, err = t.RecvUntil(int64(16), pRecv, chRate)
if err != nil {
return nil, err
}
plength = auth.BytesToInt64(t.buf[:8])
elength = auth.BytesToInt64(t.buf[8:16])
pRecv, err = t.RecvUntil(elength, pRecv, chRate)
if err != nil {
return nil, err
}
receive, err := auth.AesDecode(t.buf[16:elength], plength, t.block)
if err != nil {
return nil, err
}
returnBytes = append(returnBytes, receive...)
toRecvLength -= plength
copy(t.buf, t.buf[elength:pRecv])
pRecv -= elength
}
}
  • 上面的代码中,第 12 行获取要接收的数据总长度,并将 toRecvLengthtotalLength 均赋值为该长度。第 16 行将 pRecv 赋值为当前缓冲区已有数据长度,在方法执行期间,该变量将始终指代缓冲区存在的待处理数据长度,第 17 行指定当方法结束时将 pRecv 赋值给 transmitter 内私有变量 recvLen。在每次接收消息前,先判断当前已接收到全部明文(toRecvLength == 0),之后接收每个消息的前 16 个字节,再接收整个消息。elength 为当前要接收的消息的总长度(密文长度+16),plength 为当前要接收消息携带的明文长度。在上面代码的倒数第 3 ~ 4 行,pRecv -= elength 维护缓冲区待处理数据长度,copy(t.buf, t.buf[elength:pRecv]) 维护缓冲区待处理数据内容。
  • 方法 RecvUntil(int64, int64, <-chan time.Time) (int64, error) 的实现如下。这个方法应当被定义为私有的(方法首字母小写),这里因为编写时的失误没有注意,在这里保留这个漏洞,作为警醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// transmit.go
func (t *transmitter) RecvUntil(until int64, init int64, chR <-chan time.Time) (int64, error) {
for {
if init >= until {
break
}
<-chR
length, err := t.conn.Read(t.buf[init:])
if err != nil {
return init, err
}
init += int64(length)
}
return init, nil
}

SendFromReader() 和 RecvToWriter()

  • 这两个方法和上面的 SendBytes()RecvBytes()类似,区别在于读取/接收的对象是 Reader/Writer。
  • SendFromReader(*bufio.Reader, int64) bool 方法代码如下,和 SendBytes() 的主要不同在,每次发送的数据长度由交付 Reader 的缓冲区约束,同时要判断当前读取的 Reader 是否到达 EOF 边界,如果到达则表明发送结束,应当返回 true,否则才返回 false 表明传输出错。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// transmit.go
func (t *transmitter) SendFromReader(reader *bufio.Reader, totalLength int64) bool {
if t.buf == nil || t.conn == nil {
return false
}
_, err := t.conn.Write(auth.Int64ToBytes(totalLength))
if err != nil {
return false
}
sendLength := totalLength
chRate := time.Tick(2e3)
var encodeBufLen int64 = t.buflen/3 - 16
var length int
for {
<-chRate
if sendLength <= 0 {
return true
}
if sendLength >= encodeBufLen {
length, err = reader.Read(t.buf[16 : 16+encodeBufLen])
} else {
length, err = reader.Read(t.buf[16 : 16+sendLength])
}
if err != nil {
if err.Error() == "EOF" {
return true
} else {
return false
}
}
copy(t.buf, auth.Int64ToBytes(int64(length)))
encoded := auth.AesEncode(t.buf[16:length+16], t.block)
copy(t.buf[8:], auth.Int64ToBytes(int64(len(encoded)+16)))
copy(t.buf[16:], encoded)
_, err = t.conn.Write(t.buf[:len(encoded)+16])
if err != nil {
return false
}
sendLength -= int64(length)
if length == 0 {
return true
}
}
}
  • RecvToWriter(*bufio.Writer) bool 方法实现代码如下,它和 RecvBytes() 的主要不同在于,在函数退出时需要使用 Writer 的 Flush() 方法将缓冲区数据全部写入,并且在 Writer 写入的过程中需要检查写入是否正确。此外,增加了 valid 变量,因为 Writer 写入的错误不应当影响网络连接的传输,因此当出现本地系统错误时,为了不破坏传输,将使 valid 为 FALSE,但传输将继续进行,最终返回给用户的值为 FALSE。这里的实现非常不人性化,另一种实现方式是,无论出现的错误是用户客户端本地错误还是网络连接错误,均直接返回 FALSE,通常应用 RecvToWriter 的场景是传输长数据流,在返回 FALSE 时,多数调用 transmitter 的父函数会立刻命令 transmitter 终止。因此这里直接返回 FALSE 可能是更好的实现方式,避免了不必要的带宽浪费。这里经过权衡,最终代码中选择直接返回 FALSE,但这里保留使用 valid 变量的代码留作对比。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// transmit.go
func (t *transmitter) RecvToWriter(writer *bufio.Writer) bool {
var err error
if t.buf == nil || t.conn == nil {
return false
}
chRate := time.Tick(1e3)
length, err := t.RecvUntil(8, t.recvLen, chRate)
if err != nil {
return false
}
totalLength := auth.BytesToInt64(t.buf[:8])
var valid bool = true
var recvLength int64 = 0
var plength int64 = 0
var elength int64 = 0
var pRecv int64 = length - 8
defer func() { t.recvLen = pRecv }()
copy(t.buf, t.buf[8:length])
for {
if recvLength == int64(totalLength) {
writer.Flush()
return valid
}
pRecv, err = t.RecvUntil(int64(16), pRecv, chRate)
if err != nil {
return false
}
plength = auth.BytesToInt64(t.buf[:8])
elength = auth.BytesToInt64(t.buf[8:16])
pRecv, err = t.RecvUntil(elength, pRecv, chRate)
if err != nil {
return false
}
receive, err := auth.AesDecode(t.buf[16:elength], plength, t.block)
if err != nil {
valid = false
}
outputLength, outputError := writer.Write(receive)
if outputError != nil || outputLength != int(plength) {
valid = false
}
recvLength = recvLength + plength
copy(t.buf, t.buf[elength:pRecv])
pRecv -= elength
}
}

Transmitable 接口的调用

  • 当试图发送一个文本类型的指令 command 时,Transmitable 变量可以直接调用 SendBytes(command),远端调用 RecvBytes() 即可获取 command
  • 当试图发送一个文件时,发送端创建一个 Reader 对象 reader 并获取文件大小 length,调用 SendFromReader(reader, length),接收端创建一个 Writer 对象 writer,调用 RecvToWriter(writer)

专栏目录:顶点云(应用)设计与实现
此专栏的上一篇文章:顶点云(应用)认证、传输协议设计
此专栏的下一篇文章:顶点云(应用)认证基础模块实现

原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(http://blog.forec.cn/2016/11/14/zenith-cloud-2/) 、作者信息(Forec)和本声明。

分享到