顶点云(应用)服务器逻辑实现

分析、设计云存储服务器的执行逻辑,包括用户验证登录、活动连接加入与释放、代理操作等。

服务器结构分析

  • 服务器可按初始化、监听、fork 出线程处理用户请求并继续监听流程处理。在对用户请求新建分支处理时,按照此前 认证、传输协议设计 设计的登录、传输线程接入流程处理。
  • 服务器应当维护一个主要的监听器,一个已登录用户列表以及对数据库操作的接口,其数据结构如下:
1
2
3
4
5
type Server struct {
listener net.Listener
loginUserList []cs.User
db *sql.DB
}
  • 服务器应当封装如下方法:
1
2
3
4
5
6
7
8
9
func (s *Server) InitDB() bool   // 初始化数据库
func (s *Server) BroadCastToAll(message string) // 向全体用户广播消息
func (s *Server) BroadCast(u cs.User, message string) bool // 向指定用户发送消息
func (s *Server) CheckBroadCast() // 用户间通信检查(负责用户之间的短时延通讯)
func (s *Server) AddUser(u cs.User) // 向在线列表添加用户
func (s *Server) RemoveUser(u cs.User) bool // 从在线列表删除用户
func (s *Server) Login(t trans.Transmitable) (cs.User, int) // 授权用户登录
func (s *Server) Communicate(conn net.Conn, level uint8) // 处理一个接入的未授权请求
func (s *Server) Run(ip string, port int, level int) // 监听指定地址并运行
  • 在工程目录下新建 server 目录,要编写的模块名为 server。在该目录下创建文件 server.go, 以下代码均在该文件中编辑。
  • 我们的代码中使用到的包通过如下代码导入:
1
2
3
4
5
6
7
8
9
10
11
12
import (
auth "zenith-cloud/authenticate"
conf "zenith-cloud/config"
cs "zenith-cloud/cstruct"
trans "zenith-cloud/transmit"
"database/sql"
"fmt"
_ "github.com/mattn/go-sqlite3"
"net"
"strings"
"time"
)

初始化

  • 根据 此前对数据库的设计,我们需要在程序运行过程中维护五个表记录,在程序运行前应当对数据库和对应表的存在加以检查。
  • 按照设计的数据库表实现的 InitDB() 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *Server) InitDB() bool {
var err error
s.db, err = sql.Open(conf.DATABASE_TYPE, conf.DATABASE_PATH)
if err != nil {
return false
}
s.db.Exec(`create table cuser (uid INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(64), password VARCHAR(128), created DATE)`)

s.db.Exec(`create table ufile (uid INTEGER PRIMARY KEY AUTOINCREMENT,
ownerid INTEGER, cfileid INTEGER, path VARCHAR(256), perlink VARCHAR(128),
created DATE, shared INTEGER, downloaded INTEGER, filename VARCHAR(128),
private BOOLEAN, linkpass VARCHAR(4)), isdir BOOLEAN`)

s.db.Exec(`create table cfile (uid INTEGER PRIMARY KEY AUTOINCREMENT,
md5 VARCHAR(32), size INTEGER, ref INTEGER, created DATE)`)

s.db.Exec(`create table cmessages (mesid INTEGER PRIMARY KEY AUTOINCREMENT,
targetid INTEGER, sendid INTEGER, message VARCHAR(512), created DATE)`)

s.db.Exec(`crete table coperations (oprid INTEGER PRIMARY KEY AUTOINCREMENT,
deletedUFileId INTEGER, deletedUFileName VARCHAR(128),
deletedUFilePath VARCHAR(256), relatedCFileId INTEGER, time DATE)`)

return true
}
  • 程序将先检查数据库是否存在,不存在则自动创建(sql.Open 函数会在数据库不存在时自动创建新数据库),之后不管表记录是否存在均新建表(若已存在则不会覆盖)。

消息广播

  • 消息广播通过每个用户客户端设置的被动监听连接传输
  • 系统向所有用户广播,只需对所有在线用户均调用 BroadCast(),具体实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
func (s *Server) BroadCast(u cs.User, message string) bool {
if u.GetInfos() == nil {
return false
}
return u.GetInfos().SendBytes([]byte(message))
}

func (s *Server) BroadCastToAll(message string) {
for _, u := range s.loginUserList {
s.BroadCast(u, message)
}
}
  • 用户之间的通讯需要服务器作为中转。然而,服务器将用户逻辑处理交予 cuser 结构代理,我们希望用户 cuser 结构和服务器 server 结构尽可能避免耦合,因此用户不应当在自己的 dealWithRequests() 逻辑中调用服务器的 BroadCast() 方法;另外,用户不应该具有判断另一用户是否在线的权限,因此即使用户具有与 BroadCast() 类似的方法,它也无法判断是否该立刻将待发送的消息投放给接收方。
  • 这里采取一个 naive 但是有效的方法:用户发送 send 指令通讯时,要发送的消息会被存储到 cmessage 表中,我们在服务器中启动一个守护线程,监视 cmessage 表,并在固定时间频率时刻检查该表是否有待发送消息,如果有则将可发送的(接收方用户在线)部分消息投放给接收方,并从 cmessage 表中删除投放成功的消息。我们的检查频率会比较快(如几秒一次),用户之间通讯的延迟将在可忍受范围内,毕竟我们要实现的是云存储系统而非即时通讯系统。用户结构和服务器结构保持了分离,用户也没有具有不该有的权限。
  • 用于处理用户间通讯交互的方法是 CheckBroadCast(),其具体实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (s *Server) CheckBroadCast() {
chRate := time.Tick(conf.CHECK_MESSAGE_SEPERATE * time.Second)
var queryRows *sql.Rows
var queryRow *sql.Row
var mesid, uid, messageCount int
var message, created string
var err error
for {
<-chRate
for _, u := range s.loginUserList {
queryRow = s.db.QueryRow(fmt.Sprintf(`select count (*) from cmessages where
targetid=%d`, u.GetId()))

if queryRow == nil {
continue
}
err = queryRow.Scan(&messageCount)
if err != nil {
continue
}
id_list := make([]int, 0, messageCount)
queryRows, err = s.db.Query(fmt.Sprintf(`select mesid, sendid, message, created
from cmessages where targetid=%d`, u.GetId()))

if err != nil {
fmt.Println("query error: ", err.Error())
continue
}
for queryRows.Next() {
err = queryRows.Scan(&mesid, &uid, &message, &created)
if err != nil {
fmt.Println("scan error: ", err.Error())
break
}
if s.BroadCast(u, fmt.Sprintf("%d%s%s%s%s", uid, conf.SEPERATER, message,
conf.SEPERATER, created)) {
id_list = append(id_list, mesid)
} else {
break
}
}
for _, id := range id_list {
_, err = s.db.Exec(fmt.Sprintf(`delete from cmessages where mesid=%d`, id))
if err != nil {
fmt.Println("delete error: ", err.Error())
continue
}
}
}
}
}

运行

  • 我们将从上向下分析服务器的运行逻辑
  • 服务器应当对指定端口监听并能够对介入请求创建一个新协程处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *Server) Run(ip string, port int, level int) {
if !trans.IsIpValid(ip) || !trans.IsPortValid(port) {
return
}
var err error
s.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
fmt.Println("test server starting with an error, break down...")
return
}
defer s.listener.Close()
s.loginUserList = make([]cs.User, 0, conf.START_USER_LIST)
for {
sconn, err := s.listener.Accept()
if err != nil {
fmt.Println("Error accepting", err.Error())
continue
}
fmt.Println("Rececive connection request from",
sconn.RemoteAddr().String())
go s.Communicate(sconn, uint8(level))
}
}
  • 在运行开始时,还应当对在线用户列表做初始化,并将服务器放置在 for 循环中无线监听。当有请求接入,服务器创建一个新的协程来处理请求。传给协程的参数是对应的连接和安全等级。

处理未授权请求

  • Communicate 函数用于处理接入服务器但未经授权的请求。服务器要和对方做一定交互以确定远端身份,不合法时要即时断开腾出资源,多次不合法时也要采取防范攻击的措施。鉴于我们设计的云存储仅仅是练习,并不会投入生产中,因此暂不考虑诸如 SYN 洪水攻击等 DDOS 可能性。在后面我会展示 DDOS 攻击对我设计的云存储系统带来的打击。
  • Communicate 函数接受与远端的连接以及安全等级。它将按照此前设计的协议与远端交互:
    • 生成一个随机 token,发送给客户端
    • 客户端使用此 token 和服务器做一系列交互,并确定是否合法(login 函数)
    • 返回授权结果
  • Communicate 函数的 Go 语言实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (s *Server) Communicate(conn net.Conn, level uint8) {
var err error
s_token := auth.GenerateToken(level)
length, err := conn.Write([]byte(s_token))
fmt.Println("send toekn", string(s_token))
if length != conf.TOKEN_LENGTH(level) ||
err != nil {
return
}
mainT := trans.NewTransmitter(conn, conf.AUTHEN_BUFSIZE, s_token)
rc, mode := s.Login(mainT)
if rc == nil || mode == -1 {
mainT.Destroy()
return
}
if !mainT.SendBytes(s_token) {
return
}
if mode == 0 {
rc.SetToken(string(s_token)) // 为新登录用户设置 token
s.AddUser(rc) // 加入在线用户列表
rc.DealWithRequests(s.db) // 处理用户请求
rc.Logout() // 登出用户
s.RemoveUser(rc) // 从在线用户列表移除该用户
} else if mode == 1 && mainT.SetBuflen(conf.BUFLEN) && rc.AddTransmit(mainT) {
rc.DealWithTransmission(s.db, mainT)
} else if mode != 2 {
mainT.Destroy()
fmt.Println("Remote client not valid")
}
}
  • 上面的代码中,10 行以前均为服务器生成 token 并发送 token。之后,服务器生成了一个 transmitter 来交给 Login 函数验证该连接合法性。Login 函数会返回一个指向用户的指针,以及一个 int 类型的 codecode 说明了验证的结果:
    • code 为 0:用户登陆成功
    • code 为 1:用户已经登陆,当前连接是用于传输长数据流的连接
    • code 为 2:用户已经登录,当前连接是用于被动监听广播的连接
    • code 为-1:用户因为某种原因不合法(如密码不匹配,token不一致,数据库出错等)
  • DealWithRequests() 函数在之前已经介绍过一次,用来代理用户处理请求。该函数内部是一个 for 循环,不断监听用户发送的命令,当用户选择退出时才会跳出循环。
  • Login 函数完成了 Communicate 函数验证连接是否合法的任务,它的实现会比较复杂,主要难度在对各种异常的容错上。
  • 将用户添加到在线列表和从在线列表中删除用户的函数非常简单,代码如下,不再解释:
1
2
3
4
5
6
7
8
9
10
11
12
13
func (s *Server) AddUser(u cs.User) {
s.loginUserList = cs.AppendUser(s.loginUserList, u)
}

func (s *Server) RemoveUser(u cs.User) bool {
for i, uc := range s.loginUserList {
if uc == u {
s.loginUserList = append(s.loginUserList[:i], s.loginUserList[i+1:]...)
return true
}
}
return false
}

校验用户身份

  • 我们还剩下最后一部分逻辑没有实现,即对用户登录身份的验证。
  • Login 函数非常复杂,因为它需要处理用户的任何不合法行为,以及对用户的登录模式做出判断。用中文将 Login 的函数流程表达如下:
    1. 服务器按照协议接收用户发送的用户名和密文的头部,共 24 字节,分别对应明文长度、包长度和用户名明文长度
    2. 服务器解析出包长度,并接收到完整的用户名+密码包,若接收时中断则退出
    3. 服务器解析出用户名和密码,若无法解析则退出
    4. 服务器检查已登录用户列表,是否存在该用户名,存在则转入活动连接判断模式,否则转 7 继续进行登录认证
    5. 服务器发现用户已登录,则认为该连接用来传输长数据流,或用来做被动监听
    6. 服务器对比解析出的密码和登录用户的 token 做比对,如果相同,则认为合法,否则退出。若相同,则观察该用户是否已具有 info 连接(即被动监听连接),若已存在 info 则认为该连接是长数据流传输,返回用户指针和模式 1;否则将该连接设置为用户的 info 并返回用户指针和模式 2
    7. 若第 4 步服务器检查登录用户列表并未发现该用户,则认为该用户未登录,此时在数据库中查找该用户,查找不到则退出
    8. 比对查找到的用户密码和用户传输的密码是否相同,不同则退出
    9. 用户密码相同,则统计用户已使用的云存储空间,并将已使用的空间设置到用户结构中
    10. 返回用户指针和 0,表示用户登录成功
  • Login 实现代码如下,大量篇幅用于 iferr 的处理上:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (s *Server) Login(t trans.Transmitable) (cs.User, int) {
// mode : failed=-1, new=0, transmission=1
chRate := time.Tick(1e3)
var recvL int64 = 0
var err error
recvL, err = t.RecvUntil(int64(24), recvL, chRate)
if err != nil {
fmt.Println("1 error:", err.Error())
return nil, -1
}
srcLength := auth.BytesToInt64(t.GetBuf()[:8])
encLength := auth.BytesToInt64(t.GetBuf()[8:16])
nmLength := auth.BytesToInt64(t.GetBuf()[16:24])
recvL, err = t.RecvUntil(encLength, recvL, chRate)
if err != nil {
fmt.Println("2 error:", err.Error())
return nil, -1
}
var nameApass []byte
nameApass, err = auth.AesDecode(t.GetBuf()[24:24+encLength], srcLength, t.GetBlock())
if err != nil {
fmt.Println("decode error:", err.Error())
return nil, -1
}
fmt.Println(string(nameApass[:nmLength]), string(nameApass[nmLength:]))

pc := cs.UserIndexByName(s.loginUserList, string(nameApass[:nmLength]))
// 该连接由已登陆用户建立
if pc != nil {
fmt.Println("userfined, ", pc.GetUsername())
fmt.Println("pc token is ", pc.GetToken())
if pc.GetToken() != string(nameApass[nmLength:]) {
fmt.Println("token verify error! not valid!")
return nil, -1
} else {
// background message receiver
if pc.GetInfos() == nil {
pc.SetInfos(t)
return pc, 2
} else {
// transmission
return pc, 1
}
}
}
// 该连接来自新用户
username := string(nameApass[:nmLength])
row := s.db.QueryRow(fmt.Sprintf("SELECT * FROM cuser where username='%s'", username))
if row == nil {
return nil, -1
}
var uid int
var susername string
var spassword string
var screated string

err = row.Scan(&uid, &susername, &spassword, &screated)
if err != nil || spassword != strings.ToUpper(string(nameApass[nmLength:])) {
return nil, -1
}
rc := cs.NewCUser(string(nameApass[:nmLength]), int64(uid), "/")
if rc == nil {
return nil, -1
}
rc.SetListener(t)
// 统计用户已使用的云存储空间
rows, err := s.db.Query(fmt.Sprintf("SELECT cfileid FROM ufile where ownerid=%d", uid))
if err != nil {
return nil, -1
}
defer rows.Close()
var cid, size int
var totalSize int64 = 0
for rows.Next() {
err = rows.Scan(&cid)
if err != nil || cid < 0{
continue
}
row = s.db.QueryRow(fmt.Sprintf("select size from cfile where uid=%d", cid))
if row == nil {
continue
}
err = row.Scan(&size)
if err != nil {
continue
}
totalSize += int64(size)
}
rc.SetUsed(int64(totalSize))
return rc, 0
}
  • 上面的代码基本遵循中文描述的过程,细节方面实现可能有顺序偏差,但大体一致。注意 Login 中对数据库查询出现错误的不同处理,在不影响用户正常行为逻辑的情况下,无所谓的错误可以用 continue 跳过。服务器会按一定周期检查有无遗失的文件记录。

专栏目录:顶点云(应用)设计与实现
此专栏的上一篇文章:顶点云(应用)传输、认证单元测试
此专栏的下一篇文章:顶点云(应用)用户代理

原创作品,允许转载,转载时无需告知,但请务必以超链接形式标明文章原始出处(http://blog.forec.cn/2016/11/23/zenith-cloud-6/) 、作者信息(Forec)和本声明。

分享到