288 lines
6.0 KiB
Go
288 lines
6.0 KiB
Go
package net
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"log/slog"
|
||
"slgserver/util"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-think/openssl"
|
||
"github.com/mitchellh/mapstructure"
|
||
"github.com/gorilla/websocket"
|
||
)
|
||
|
||
// 客户端连接
|
||
type ServerConn struct {
|
||
wsSocket *websocket.Conn // 底层websocket
|
||
outChan chan *WsMsgRsp // 写队列
|
||
isClosed bool
|
||
needSecret bool
|
||
Seq int64
|
||
router *Router
|
||
beforeClose func(conn WSConn)
|
||
onClose func(conn WSConn)
|
||
//链接属性
|
||
property map[string]interface{}
|
||
//保护链接属性修改的锁
|
||
propertyLock sync.RWMutex
|
||
closeChan chan struct{}
|
||
closeOnce sync.Once
|
||
}
|
||
|
||
func NewServerConn(wsSocket *websocket.Conn, needSecret bool) *ServerConn {
|
||
conn := &ServerConn{
|
||
wsSocket: wsSocket,
|
||
outChan: make(chan *WsMsgRsp, 1000),
|
||
isClosed: false,
|
||
property: make(map[string]interface{}),
|
||
needSecret: needSecret,
|
||
Seq: 0,
|
||
closeChan: make(chan struct{}),
|
||
}
|
||
|
||
return conn
|
||
}
|
||
|
||
// 开启异步
|
||
func (this *ServerConn) Start() {
|
||
go this.wsReadLoop()
|
||
go this.wsWriteLoop()
|
||
}
|
||
|
||
func (this *ServerConn) Addr() string {
|
||
return this.wsSocket.RemoteAddr().String()
|
||
}
|
||
|
||
func (this *ServerConn) Push(name string, data interface{}) {
|
||
rsp := &WsMsgRsp{Body: &RspBody{Name: name, Msg: data, Seq: 0}}
|
||
this.enqueue(rsp)
|
||
}
|
||
|
||
func (this *ServerConn) Send(name string, data interface{}) {
|
||
this.Seq += 1
|
||
rsp := &WsMsgRsp{Body: &RspBody{Name: name, Msg: data, Seq: this.Seq}}
|
||
this.enqueue(rsp)
|
||
}
|
||
|
||
func (this *ServerConn) wsReadLoop() {
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
e := fmt.Sprintf("%v", err)
|
||
slog.Error("wsReadLoop error", "err", e)
|
||
this.Close()
|
||
}
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case <-this.closeChan:
|
||
return
|
||
default:
|
||
}
|
||
// 读一个message
|
||
_, data, err := this.wsSocket.ReadMessage()
|
||
slog.Debug("ws read message", "size", len(data))
|
||
if err != nil {
|
||
break
|
||
}
|
||
|
||
data, err = util.UnZip(data)
|
||
if err != nil {
|
||
slog.Error("wsReadLoop UnZip error", "error", err)
|
||
continue
|
||
}
|
||
|
||
body := &ReqBody{}
|
||
if this.needSecret {
|
||
//检测是否有加密,没有加密发起Handshake
|
||
if secretKey, err := this.GetProperty("secretKey"); err == nil {
|
||
key := secretKey.(string)
|
||
d, err := util.AesCBCDecrypt(data, []byte(key), []byte(key), openssl.ZEROS_PADDING)
|
||
if err != nil {
|
||
slog.Error("AesDecrypt error", "error", err)
|
||
this.Handshake()
|
||
} else {
|
||
data = d
|
||
}
|
||
} else {
|
||
slog.Info("secretKey not found client need handshake", "error", err)
|
||
this.Handshake()
|
||
return
|
||
}
|
||
}
|
||
|
||
if err := util.Unmarshal(data, body); err == nil {
|
||
req := &WsMsgReq{Conn: this, Body: body}
|
||
rsp := &WsMsgRsp{Body: &RspBody{Name: body.Name, Seq: req.Body.Seq}}
|
||
|
||
if req.Body.Name == HeartbeatMsg {
|
||
h := &Heartbeat{}
|
||
mapstructure.Decode(body.Msg, h)
|
||
h.STime = time.Now().UnixNano() / 1e6
|
||
rsp.Body.Msg = h
|
||
} else {
|
||
if this.router != nil {
|
||
this.router.Run(req, rsp)
|
||
}
|
||
}
|
||
this.outChan <- rsp
|
||
} else {
|
||
slog.Error("wsReadLoop Unmarshal error", "error", err)
|
||
this.Handshake()
|
||
}
|
||
}
|
||
|
||
this.Close()
|
||
}
|
||
|
||
func (this *ServerConn) wsWriteLoop() {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
slog.Error("wsWriteLoop error")
|
||
this.Close()
|
||
}
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case <-this.closeChan:
|
||
return
|
||
case msg := <-this.outChan:
|
||
if msg == nil {
|
||
continue
|
||
}
|
||
// 写给websocket
|
||
this.write(msg.Body)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (this *ServerConn) write(msg interface{}) error {
|
||
data, err := util.Marshal(msg)
|
||
slog.Debug("ws write message", "size", len(data))
|
||
if err == nil {
|
||
if this.needSecret {
|
||
if secretKey, err := this.GetProperty("secretKey"); err == nil {
|
||
key := secretKey.(string)
|
||
data, _ = util.AesCBCEncrypt(data, []byte(key), []byte(key), openssl.ZEROS_PADDING)
|
||
}
|
||
}
|
||
} else {
|
||
slog.Error("wsWriteLoop Marshal body error", "error", err)
|
||
return err
|
||
}
|
||
|
||
if data, err := util.Zip(data); err == nil {
|
||
if err := this.wsSocket.WriteMessage(websocket.BinaryMessage, data); err != nil {
|
||
this.Close()
|
||
return err
|
||
}
|
||
} else {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (this *ServerConn) Close() {
|
||
this.closeOnce.Do(func() {
|
||
this.wsSocket.Close()
|
||
close(this.closeChan)
|
||
if !this.isClosed {
|
||
this.isClosed = true
|
||
|
||
if this.beforeClose != nil {
|
||
this.beforeClose(this)
|
||
}
|
||
|
||
if this.onClose != nil {
|
||
this.onClose(this)
|
||
}
|
||
}
|
||
})
|
||
}
|
||
|
||
// 设置链接属性
|
||
func (this *ServerConn) SetProperty(key string, value interface{}) {
|
||
this.propertyLock.Lock()
|
||
defer this.propertyLock.Unlock()
|
||
|
||
this.property[key] = value
|
||
}
|
||
|
||
// 获取链接属性
|
||
func (this *ServerConn) GetProperty(key string) (interface{}, error) {
|
||
this.propertyLock.RLock()
|
||
defer this.propertyLock.RUnlock()
|
||
|
||
if value, ok := this.property[key]; ok {
|
||
return value, nil
|
||
} else {
|
||
return nil, errors.New("no property found")
|
||
}
|
||
}
|
||
|
||
func (this *ServerConn) SetRouter(router *Router) {
|
||
this.router = router
|
||
}
|
||
|
||
func (this *ServerConn) SetOnClose(hookFunc func(WSConn)) {
|
||
this.onClose = hookFunc
|
||
}
|
||
|
||
func (this *ServerConn) SetOnBeforeClose(hookFunc func(WSConn)) {
|
||
this.beforeClose = hookFunc
|
||
}
|
||
|
||
// 移除链接属性
|
||
func (this *ServerConn) RemoveProperty(key string) {
|
||
this.propertyLock.Lock()
|
||
defer this.propertyLock.Unlock()
|
||
|
||
delete(this.property, key)
|
||
}
|
||
|
||
// 握手协议
|
||
func (this *ServerConn) Handshake() {
|
||
|
||
secretKey := ""
|
||
if this.needSecret {
|
||
key, err := this.GetProperty("secretKey")
|
||
if err == nil {
|
||
secretKey = key.(string)
|
||
} else {
|
||
secretKey = util.RandSeq(16)
|
||
}
|
||
}
|
||
|
||
handshake := &Handshake{Key: secretKey}
|
||
body := &RspBody{Name: HandshakeMsg, Msg: handshake}
|
||
if data, err := util.Marshal(body); err == nil {
|
||
if secretKey != "" {
|
||
this.SetProperty("secretKey", secretKey)
|
||
} else {
|
||
this.RemoveProperty("secretKey")
|
||
}
|
||
|
||
slog.Info("handshake secretKey",
|
||
"secretKey", secretKey)
|
||
|
||
if data, err = util.Zip(data); err == nil {
|
||
this.wsSocket.WriteMessage(websocket.BinaryMessage, data)
|
||
}
|
||
|
||
} else {
|
||
slog.Error("handshake Marshal body error", "error", err)
|
||
}
|
||
}
|
||
|
||
func (this *ServerConn) enqueue(rsp *WsMsgRsp) {
|
||
select {
|
||
case <-this.closeChan:
|
||
return
|
||
case this.outChan <- rsp:
|
||
}
|
||
}
|