Goroutines have a high number of idle wake-ups
Question
I'm using Go to run two WebSocket clients (one for private and one for public data) simultaneously using goroutines. On the surface, everything seems to work fine: both clients receive data transmitted from the WebSocket server. However, I believe I may have set something up wrong, since when I check Activity Monitor, my program consistently shows between 500 and 1500 Idle Wake Ups and uses >200% of my CPU. This doesn't seem normal for something as simple as two WebSocket clients.
I've put the code in snippets so there's less to read (hopefully that makes it easier to understand), but if you need the entire code, I can post that as well. Here is the code in my main func that runs the ws clients:
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(1)
go pubSocket.PubListen(ctx, &wg, &activeSubs, testing)
wg.Add(1)
go privSocket.PrivListen(ctx, &wg, &activeSubs, testing)
<-comms
cancel()
wg.Wait()
Here is the code the clients run in their goroutines:
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Println("closing public socket")
			socket.Close()
			return
		default:
			socket.OnTextMessage = func(message string, socket Socket) {
				log.Println(message)
				pubJsonDecoder(message, testing)
				//tradesParser(message);
			}
		}
	}
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Println("closing private socket")
			socket.Close()
			return
		default:
			socket.OnTextMessage = func(message string, socket Socket) {
				log.Println(message)
			}
		}
	}
}
Any ideas on why the Idle Wake Ups are so high? Should I be using multithreading instead of concurrency? Thanks in advance for any help!
Answer 1
Score: 3
You're wasting CPU here (superfluous loop):
for {
	// ...
	default:
		// High CPU usage here.
	}
}
Try something like this:
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	defer socket.Close()

	socket.OnTextMessage = func(message string, socket Socket) {
		log.Println(message)
		pubJsonDecoder(message, testing)
		//tradesParser(message);
	}

	<-ctx.Done()
	log.Println("closing public socket")
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
	defer wg.Done()
	defer socket.Close()

	socket.OnTextMessage = func(message string, socket Socket) {
		log.Println(message)
	}

	<-ctx.Done()
	log.Println("closing private socket")
}
Also this may help:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go
Answer 2
Score: 2
TL;DR: WebSockets are hard.
It looks like you have a couple of spinners. You are assigning the handler function for OnTextMessage in the default case of a for-select statement. The default case executes whenever no other case is ready. Because nothing in the default case blocks, that for loop just spins as fast as it can. Two goroutines spinning like this will likely peg two cores. WebSockets are network I/O, and those goroutines are likely to run in parallel, which is why you are seeing 200% utilization.
Take a look at the gorilla/websocket library. I'm not going to say that it is better or worse than any other WebSocket library, but I have a lot of experience with it.
https://github.com/gorilla/websocket
Below is an implementation that I have used many times.
The way it is set up, you register handler functions that are triggered when a certain message is received. Say one of the values in your message is "type": "start-job"; the WebSocket server will then call the handler you assigned to the "start-job" message. It feels like writing endpoints for an HTTP router.
Package serverws
context.go
package serverws

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// ConnContext is the connection context to track a connected websocket user
type ConnContext struct {
	specialKey  string
	supportGzip string
	UserID      string
	mu          sync.Mutex // Websockets are not thread safe, we'll use a mutex to lock writes.
}

// HashKeyAsCtx returns a ConnContext based on the hash provided
func HashKeyAsCtx(hashKey string) (*ConnContext, error) {
	values := strings.Split(hashKey, ":")
	if len(values) != 3 {
		return nil, errors.New("Invalid Key received: " + hashKey)
	}
	return &ConnContext{values[0], values[1], values[2], sync.Mutex{}}, nil
}

// AsHashKey returns the hash key for a given connection context ConnContext
func (ctx *ConnContext) AsHashKey() string {
	return strings.Join([]string{ctx.specialKey, ctx.supportGzip, ctx.UserID}, ":")
}

// String returns a string of the hash of a given connection context ConnContext
func (ctx *ConnContext) String() string {
	return fmt.Sprint("specialkey: ", ctx.specialKey, " gzip ", ctx.supportGzip, " auth ", ctx.UserID)
}
wshandler.go
package serverws

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/rs/zerolog/log"
)

var (
	receiveFunctionMap = make(map[string]ReceiveObjectFunc)
	ctxHashMap         sync.Map
)

// ReceiveObjectFunc is a function signature for a websocket request handler
type ReceiveObjectFunc func(conn *websocket.Conn, ctx *ConnContext, t map[string]interface{})

// WebSocketHandler does what it says, handles WebSockets (makes them easier for us to deal with)
type WebSocketHandler struct {
	wsupgrader websocket.Upgrader
}

// WebSocketMessage that is sent over a websocket. Messages must have a conversation type so the server and the client JS know
// what is being discussed and what signals to raise on the server and the client.
// The "Notification" message instructs the client to display an alert popup.
type WebSocketMessage struct {
	MessageType string      `json:"type"`
	Message     interface{} `json:"message"`
}

// NewWebSocketHandler sets up a new websocket.
func NewWebSocketHandler() *WebSocketHandler {
	wsh := new(WebSocketHandler)
	wsh.wsupgrader = websocket.Upgrader{
		ReadBufferSize:  4096,
		WriteBufferSize: 4096,
	}
	return wsh
}

// RegisterMessageType sets up an event bus for a message type. When messages arrive from the client that match messageTypeName,
// the function you wrote to handle that message is then called.
func (wsh *WebSocketHandler) RegisterMessageType(messageTypeName string, f ReceiveObjectFunc) {
	receiveFunctionMap[messageTypeName] = f
}
// onMessage triggers when the underlying websocket has received a message.
func (wsh *WebSocketHandler) onMessage(conn *websocket.Conn, ctx *ConnContext, msg []byte, msgType int) {
	// Handling text messages or binary messages. Binary is usually some gzip text.
	if msgType == websocket.TextMessage {
		wsh.processIncomingTextMsg(conn, ctx, msg)
	}
	if msgType == websocket.BinaryMessage {
	}
}

// onOpen triggers when the underlying websocket has established a connection.
func (wsh *WebSocketHandler) onOpen(conn *websocket.Conn, r *http.Request) (ctx *ConnContext, err error) {
	//user, err := gothic.GetFromSession("ID", r)
	user := "TestUser"
	if err := r.ParseForm(); err != nil {
		return nil, errors.New("parameter check error")
	}

	specialKey := r.FormValue("specialKey")
	supportGzip := r.FormValue("support_gzip")

	if user != "" && err == nil {
		ctx = &ConnContext{specialKey, supportGzip, user, sync.Mutex{}}
	} else {
		ctx = &ConnContext{specialKey, supportGzip, "", sync.Mutex{}}
	}

	keyString := ctx.AsHashKey()
	if oldConn, ok := ctxHashMap.Load(keyString); ok {
		wsh.onClose(oldConn.(*websocket.Conn), ctx)
		oldConn.(*websocket.Conn).Close()
	}
	ctxHashMap.Store(keyString, conn)
	return ctx, nil
}

// onClose triggers when the underlying websocket has been closed down
func (wsh *WebSocketHandler) onClose(conn *websocket.Conn, ctx *ConnContext) {
	//log.Info().Msg(("client close itself as " + ctx.String()))
	wsh.closeConnWithCtx(ctx)
}

// onError triggers when a websocket connection breaks
func (wsh *WebSocketHandler) onError(errMsg string) {
	//log.Error().Msg(errMsg)
}
// HandleConn happens when a user connects to us at the listening point. We ask
// the user to authenticate and then send the required HTTP Upgrade return code.
func (wsh *WebSocketHandler) HandleConn(w http.ResponseWriter, r *http.Request) {
	user := ""
	if r.URL.Path == "/websocket" {
		user = "TestUser" // authenticate however you want
		if user == "" {
			fmt.Println("UNAUTHENTICATED USER TRIED TO CONNECT TO WEBSOCKET FROM ", r.Header.Get("X-Forwarded-For"))
			return
		}
	}

	// Don't do this. You need to check the origin, but this is here as a placeholder.
	wsh.wsupgrader.CheckOrigin = func(r *http.Request) bool {
		return true
	}

	conn, err := wsh.wsupgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Error().Msg("Failed to set websocket upgrade: " + err.Error())
		return
	}
	defer conn.Close()

	ctx, err := wsh.onOpen(conn, r)
	if err != nil {
		// onOpen returns a nil ctx on failure, so ctx must not be dereferenced here.
		log.Error().Msg("Open connection failed " + err.Error() + r.URL.RawQuery)
		return
	}
	if user != "" {
		ctx.UserID = user
	}

	conn.SetPingHandler(func(message string) error {
		conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
		return nil
	})

	// Message pump for the underlying websocket connection.
	// ReadMessage blocks until a message arrives, so this loop does not spin.
	for {
		t, msg, err := conn.ReadMessage()
		if err != nil {
			// Read errors are when the user closes the tab. Ignore.
			wsh.onClose(conn, ctx)
			return
		}
		switch t {
		case websocket.TextMessage, websocket.BinaryMessage:
			wsh.onMessage(conn, ctx, msg, t)
		case websocket.CloseMessage:
			wsh.onClose(conn, ctx)
			return
		case websocket.PingMessage:
		case websocket.PongMessage:
		}
	}
}
func (wsh *WebSocketHandler) closeConnWithCtx(ctx *ConnContext) {
	keyString := ctx.AsHashKey()
	ctxHashMap.Delete(keyString)
}

func (wsh *WebSocketHandler) processIncomingTextMsg(conn *websocket.Conn, ctx *ConnContext, msg []byte) {
	//log.Debug().Msg("CLIENT SAID " + string(msg))
	data := WebSocketMessage{}

	// try to turn this into data
	err := json.Unmarshal(msg, &data)

	// And try to get at the data underneath
	var raw = make(map[string]interface{})
	terr := json.Unmarshal(msg, &raw)

	if err == nil {
		// What kind of message is this?
		if receiveFunctionMap[data.MessageType] != nil {
			// We'll try to cast this message and call the handler for it
			if terr == nil {
				if v, ok := raw["message"].(map[string]interface{}); ok {
					receiveFunctionMap[data.MessageType](conn, ctx, v)
				} else {
					log.Debug().Msg("Nonsense sent over the websocket.")
				}
			} else {
				log.Debug().Msg("Nonsense sent over the websocket.")
			}
		}
	} else {
		// Received garbage from the transmitter.
	}
}
// SendJSONToSocket sends a specific message to a specific websocket
func (wsh *WebSocketHandler) SendJSONToSocket(socketID string, msg interface{}) {
	fields := strings.Split(socketID, ":")
	message, _ := json.Marshal(msg)

	ctxHashMap.Range(func(key interface{}, value interface{}) bool {
		// Note: ctx is rebuilt from the key on every call, so ctx.mu below is a
		// fresh mutex each time and is not shared between concurrent callers.
		if ctx, err := HashKeyAsCtx(key.(string)); err != nil {
			wsh.onError(err.Error())
		} else {
			if ctx.specialKey == fields[0] {
				ctx.mu.Lock()
				if value != nil {
					err = value.(*websocket.Conn).WriteMessage(websocket.TextMessage, message)
				}
				ctx.mu.Unlock()
			}
			if err != nil {
				ctx.mu.Lock() // We'll lock here even though we're going to destroy this
				wsh.onClose(value.(*websocket.Conn), ctx)
				value.(*websocket.Conn).Close()
				ctxHashMap.Delete(key) // Remove the websocket immediately
				//wsh.onError("WRITE ERR TO USER " + key.(string) + " ERR: " + err.Error())
			}
		}
		return true
	})
}
Package wsocket
types.go
package wsocket

// Acknowledgement is for ACKing simple messages and sending errors
type Acknowledgement struct {
	ResponseID string `json:"responseId"`
	Status     string `json:"status"`
	IPAddress  string `json:"ipaddress"`
	ErrorText  string `json:"errortext"`
}
wsocket.go
package wsocket

import (
	"fmt"
	server "project/serverws"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	// "github.com/mitchellh/mapstructure"
	"github.com/inconshreveable/log15"
)

var (
	WebSocket         *server.WebSocketHandler // So other packages can send out websocket messages
	WebSocketLocation string
	Log               log15.Logger = log15.New("package", "wsocket")
)

func SetupWebsockets(r *gin.Engine, socket *server.WebSocketHandler, debug_mode bool) {
	WebSocket = socket
	WebSocketLocation = "example.mydomain.com"
	//WebSocketLocation = "example.mydomain.com"

	r.GET("/websocket", func(c *gin.Context) {
		socket.HandleConn(c.Writer, c.Request)
	})

	socket.RegisterMessageType("Hello", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {
		response := Acknowledgement{
			ResponseID: "Hello",
			// ConnContext exposes UserID; it has no AuthID field.
			Status:    fmt.Sprintf("OK/%v", ctx.UserID),
			IPAddress: conn.RemoteAddr().String(),
		}
		// mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in
		socket.SendJSONToSocket(ctx.AsHashKey(), &response)
	})

	socket.RegisterMessageType("start-job", func(conn *websocket.Conn, ctx *server.ConnContext, data map[string]interface{}) {
		response := Acknowledgement{
			ResponseID: "starting_job",
			Status:     fmt.Sprintf("%s is being dialed.", data["did"]),
			IPAddress:  conn.RemoteAddr().String(),
		}
		// mapstructure.Decode(data, &request) -- used if we wanted to read what was fed in to a struct.
		socket.SendJSONToSocket(ctx.AsHashKey(), &response)
	})
}
This implementation was for a web application. Below is a simplified version of the client side in JavaScript. You can handle many concurrent connections with this implementation. To communicate, you define objects/structs that contain a responseID matching a case in the switch below (it is basically one long switch statement), serialize them, and send them to the other side, which will ack. I have some version of this running in several production environments.
websocket.js
$(() => {
    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello": // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;

            case "Notification":
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: '<center><B><i class="fas fa-exclamation-triangle"></i>&nbsp;&nbsp;' + object.errortext + '</B></center>',
                    }, {
                        // settings
                        type: 'danger',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                } else {
                    $.notify({
                        // options
                        message: '<center><B>' + object.status + '</B></center>',
                    }, {
                        // settings
                        type: 'success',
                        offset: 50,
                        placement: {
                            align: 'center',
                        }
                    });
                }
                break;
        }
    }

    $(document).ready(function () {
        function heartbeat() {
            if (!websocket) return;
            if (websocket.readyState !== 1) return;
            websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
            setTimeout(heartbeat, 24000);
        }

        //TODO: CHANGE TO WSS once tls is enabled.
        function wireUpWebsocket() {
            websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');

            websocket.onopen = function (event) {
                console.log("Websocket connected.");
                heartbeat();
                //if it exists
                if (typeof (wsReady) !== 'undefined') {
                    //execute it
                    wsReady();
                }
            };

            websocket.onerror = function (event) {
                console.log("WEBSOCKET ERROR " + event.data);
            };

            websocket.onmessage = function (event) {
                wsMessage(JSON.parse(event.data));
            };

            websocket.onclose = function () {
                // Don't close!
                // Replace key
                console.log("WEBSOCKET CLOSED");
                WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
                websocketreconnects++;
                if (websocketreconnects > 30) { // Too much, time to bounce
                    // location.reload(); Don't reload the page anymore, just re-connect.
                }
                setTimeout(function () { wireUpWebsocket(); }, 3000);
            };
        }

        wireUpWebsocket();
    });
});

function getCookie(name) {
    var value = "; " + document.cookie;
    var parts = value.split("; " + name + "=");
    if (parts.length == 2) return parts.pop().split(";").shift();
}

function setCookie(cname, cvalue, exdays) {
    var d = new Date();
    d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
    var expires = "expires=" + d.toUTCString();
    document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}
Assigning handler functions over and over again in an infinite loop is definitely not going to work.