first commit
This commit is contained in:
254
net/clientconn.go
Normal file
254
net/clientconn.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/go-think/openssl"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/gorilla/websocket"
|
||||
"log/slog"
|
||||
"slgserver/constant"
|
||||
"slgserver/util"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 客户端连接
|
||||
type ClientConn struct {
|
||||
wsSocket *websocket.Conn // 底层websocket
|
||||
isClosed bool
|
||||
Seq int64
|
||||
onClose func(conn*ClientConn)
|
||||
onPush func(conn*ClientConn, body*RspBody)
|
||||
//链接属性
|
||||
property map[string]interface{}
|
||||
//保护链接属性修改的锁
|
||||
propertyLock sync.RWMutex
|
||||
syncCtxs map[int64]*syncCtx
|
||||
syncLock sync.RWMutex
|
||||
handshakeChan chan bool
|
||||
handshake bool
|
||||
}
|
||||
|
||||
func NewClientConn(wsSocket *websocket.Conn) *ClientConn {
|
||||
conn := &ClientConn{
|
||||
wsSocket: wsSocket,
|
||||
isClosed: false,
|
||||
property: make(map[string]interface{}),
|
||||
Seq: 0,
|
||||
syncCtxs: make(map[int64]*syncCtx),
|
||||
handshakeChan: make(chan bool),
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
func (this *ClientConn) waitHandshake() bool{
|
||||
if this.handshake == false{
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
select {
|
||||
case _ = <-this.handshakeChan:{
|
||||
slog.Info("recv handshakeChan")
|
||||
return true
|
||||
}
|
||||
case <-ctx.Done():{
|
||||
slog.Info("recv handshakeChan timeout")
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (this *ClientConn) Start() bool{
|
||||
this.handshake = false
|
||||
go this.wsReadLoop()
|
||||
return this.waitHandshake()
|
||||
}
|
||||
|
||||
func (this *ClientConn) Addr() string {
|
||||
return this.wsSocket.RemoteAddr().String()
|
||||
}
|
||||
|
||||
func (this *ClientConn) Push(name string, data interface{}) {
|
||||
rsp := &WsMsgRsp{Body: &RspBody{Name: name, Msg: data, Seq: 0}}
|
||||
this.write(rsp.Body)
|
||||
}
|
||||
|
||||
func (this *ClientConn) Send(name string, data interface{}) *RspBody{
|
||||
this.syncLock.Lock()
|
||||
sync := newSyncCtx()
|
||||
this.Seq += 1
|
||||
seq := this.Seq
|
||||
req := ReqBody{Name: name, Msg: data, Seq: seq}
|
||||
this.syncCtxs[this.Seq] = sync
|
||||
this.syncLock.Unlock()
|
||||
|
||||
rsp := &RspBody{Code: constant.OK, Name: name, Seq: seq }
|
||||
err := this.write(req)
|
||||
if err != nil{
|
||||
sync.cancel()
|
||||
}else{
|
||||
r := sync.wait()
|
||||
if r == nil{
|
||||
rsp.Code = constant.ProxyConnectError
|
||||
}else{
|
||||
rsp = r
|
||||
}
|
||||
}
|
||||
|
||||
this.syncLock.Lock()
|
||||
delete(this.syncCtxs, seq)
|
||||
this.syncLock.Unlock()
|
||||
|
||||
return rsp
|
||||
}
|
||||
|
||||
func (this *ClientConn) wsReadLoop() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
e := fmt.Sprintf("%v", err)
|
||||
slog.Error("wsReadLoop error", "err", e)
|
||||
this.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
// 读一个message
|
||||
_, data, err := this.wsSocket.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
data, err = util.UnZip(data)
|
||||
if err != nil {
|
||||
slog.Error("wsReadLoop UnZip error", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
//需要检测是否有加密
|
||||
body := &RspBody{}
|
||||
if secretKey, err := this.GetProperty("secretKey"); err == nil {
|
||||
key := secretKey.(string)
|
||||
d, err := util.AesCBCDecrypt(data, []byte(key), []byte(key), openssl.ZEROS_PADDING)
|
||||
if err != nil {
|
||||
slog.Error("AesDecrypt error", "error", err)
|
||||
}else{
|
||||
data = d
|
||||
}
|
||||
}
|
||||
|
||||
if err := util.Unmarshal(data, body); err == nil {
|
||||
if body.Seq == 0 {
|
||||
if body.Name == HandshakeMsg{
|
||||
h := Handshake{}
|
||||
mapstructure.Decode(body.Msg, &h)
|
||||
slog.Info("client 收到握手协议", "data", string(data))
|
||||
if h.Key != ""{
|
||||
this.SetProperty("secretKey", h.Key)
|
||||
}else{
|
||||
this.RemoveProperty("secretKey")
|
||||
}
|
||||
this.handshake = true
|
||||
this.handshakeChan <- true
|
||||
}else{
|
||||
//推送,需要推送到指定的代理连接
|
||||
if this.onPush != nil{
|
||||
this.onPush(this, body)
|
||||
}else{
|
||||
slog.Warn("clientconn not deal push")
|
||||
}
|
||||
}
|
||||
}else{
|
||||
this.syncLock.RLock()
|
||||
s, ok := this.syncCtxs[body.Seq]
|
||||
this.syncLock.RUnlock()
|
||||
if ok {
|
||||
s.outChan <- body
|
||||
}else{
|
||||
slog.Warn("seq not found sync",
|
||||
"seq", body.Seq,
|
||||
"msgName", body.Name)
|
||||
}
|
||||
}
|
||||
|
||||
}else{
|
||||
slog.Error("wsReadLoop Unmarshal error", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
this.Close()
|
||||
}
|
||||
|
||||
|
||||
func (this *ClientConn) write(msg interface{}) error{
|
||||
data, err := util.Marshal(msg)
|
||||
if err == nil {
|
||||
if secretKey, err:= this.GetProperty("secretKey"); err == nil {
|
||||
key := secretKey.(string)
|
||||
slog.Info("secretKey", "secretKey", key)
|
||||
data, _ = util.AesCBCEncrypt(data, []byte(key), []byte(key), openssl.ZEROS_PADDING)
|
||||
}
|
||||
}else {
|
||||
slog.Error("wsWriteLoop Marshal body error", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if data, err := util.Zip(data); err == nil{
|
||||
if err := this.wsSocket.WriteMessage(websocket.BinaryMessage, data); err != nil {
|
||||
this.Close()
|
||||
return err
|
||||
}
|
||||
}else{
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *ClientConn) Close() {
|
||||
this.wsSocket.Close()
|
||||
if !this.isClosed {
|
||||
this.isClosed = true
|
||||
if this.onClose != nil{
|
||||
this.onClose(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//设置链接属性
|
||||
func (this *ClientConn) SetProperty(key string, value interface{}) {
|
||||
this.propertyLock.Lock()
|
||||
defer this.propertyLock.Unlock()
|
||||
|
||||
this.property[key] = value
|
||||
}
|
||||
|
||||
//获取链接属性
|
||||
func (this *ClientConn) GetProperty(key string) (interface{}, error) {
|
||||
this.propertyLock.RLock()
|
||||
defer this.propertyLock.RUnlock()
|
||||
|
||||
if value, ok := this.property[key]; ok {
|
||||
return value, nil
|
||||
} else {
|
||||
return nil, errors.New("no property found")
|
||||
}
|
||||
}
|
||||
|
||||
//移除链接属性
|
||||
func (this *ClientConn) RemoveProperty(key string) {
|
||||
this.propertyLock.Lock()
|
||||
defer this.propertyLock.Unlock()
|
||||
|
||||
delete(this.property, key)
|
||||
}
|
||||
|
||||
func (this *ClientConn) SetOnClose(hookFunc func (*ClientConn)) {
|
||||
this.onClose = hookFunc
|
||||
}
|
||||
|
||||
func (this *ClientConn) SetOnPush(hookFunc func (*ClientConn, *RspBody)) {
|
||||
this.onPush = hookFunc
|
||||
}
|
||||
Reference in New Issue
Block a user