英文:
Golang socket.io server is malfunctioning for some reason
问题
在尝试使用socket.io进行一些测试后,我将一些Golang和Android Kotlin代码复制到了我的项目的服务器和客户端应用程序中。
我对原始代码唯一的修改是将socket服务器的启动作为一个"coroutine"来运行,因为只调用"StartSocket"似乎是一个阻塞函数。
更新后,我测试了代码是否仍然可以工作,结果是可以的。应用程序可以连接到服务器,应用程序也可以向服务器发送数据,而且我记得应用程序也可以接收来自服务器的数据。
当我重新构建应用程序时,服务器显示一个客户端断开连接的消息。现在,只有连接部分起作用。即使原始应用程序现在也无法发送数据,重新构建应用程序似乎也不会导致它断开连接。而且客户端会再次重复断开连接,但现在它是静默的,只有在终端上出现一个新的连接消息。在重复断开连接之前,至少会告诉一些原因导致客户端断开连接。
如果我使用原始代码运行"go run"并将其与原始应用程序配对,一切都正常。我使用"go build"构建了我的项目代码,但我怀疑这应该不会影响socket.io。我对几乎没有更改任何内容的情况感到困惑。
这是我的Golang代码:
package helpers
import (
"flag"
"fmt"
"log"
"net/http"
socketio "github.com/googollee/go-socket.io"
"github.com/googollee/go-socket.io/engineio"
"github.com/googollee/go-socket.io/engineio/transport"
"github.com/googollee/go-socket.io/engineio/transport/polling"
"github.com/googollee/go-socket.io/engineio/transport/websocket"
)
var allowOriginFunc = func(r *http.Request) bool {
return true
}
var (
port = flag.Int("socket_server_port", 8000, "Socket server port")
)
var sckServer *socketio.Server
const WARNING_TAG = "warning"
const ALERT_TAG = "alert"
const NAMESPACE = "notifications"
const SIMPLE_TAG = "simple"
const ROOM = "notif_room"
func StartSocket() {
flag.Parse()
sckServer = socketio.NewServer(&engineio.Options{
Transports: []transport.Transport{
&polling.Transport{
CheckOrigin: allowOriginFunc,
},
&websocket.Transport{
CheckOrigin: allowOriginFunc,
},
},
})
sckServer.OnConnect("/", func(s socketio.Conn) error {
s.SetContext("")
fmt.Println("connected:", s.ID())
s.Emit("notice", "new user connected")
return nil
})
sckServer.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
fmt.Println("notice:", msg)
s.Emit("notice", "have "+msg)
})
sckServer.OnError("/", func(s socketio.Conn, e error) {
fmt.Println("socket error:", e)
})
sckServer.OnDisconnect("/", func(s socketio.Conn, reason string) {
fmt.Println("closed", reason)
})
go sckServer.Serve()
defer sckServer.Close()
http.Handle("/socket.io/", sckServer)
http.Handle("/", http.FileServer(http.Dir("./asset")))
fmt.Printf("Socket server serving at localhost:%d...\n", *port)
err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)
if err != nil {
log.Fatalf("Failed to start socket server: %v\n", err)
}
}
func GetSocketSrv() *socketio.Server {
return sckServer
}
func BroadcastToTag(tag string, payload string) {
fmt.Printf("BroadcastToTag tag: %s, payload: %s\n", tag, payload)
if sckServer != nil {
broadcastStat := sckServer.BroadcastToNamespace(NAMESPACE, tag, payload)
fmt.Printf("broadcastStat: %v\n", broadcastStat)
} else {
fmt.Printf("sckServer = nil\n")
}
}
这是我的Android Kotlin代码:
import android.util.Log
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import java.net.ConnectException
import java.net.URISyntaxException
class SocketHelper {
private lateinit var mSocket: Socket
private val onAlertNotif =
Emitter.Listener { args ->
Log.i(TAG, "onAlertNotif args: ${args[0]}")
}
private val onWarningNotif =
Emitter.Listener { args ->
Log.i(TAG, "onWarningNotif args: ${args[0]}")
}
private val onSimpleNotif =
Emitter.Listener { args ->
Log.i(TAG, "onSimpleNotif args: ${args[0]}")
}
init {
try {
mSocket = IO.socket("http://<local_ip>:8000/")
} catch (e: ConnectException) {
Log.e(TAG, "Socket ConnExc: ${e.localizedMessage}")
} catch (e: URISyntaxException) {
Log.e(TAG, "Socket URISynExc: ${e.localizedMessage}")
} catch (e: Exception) {
Log.e(TAG, "Socket Exc: ${e.localizedMessage}")
}
}
fun send(eventName: String, msg: String) {
mSocket.emit(eventName, msg)
}
fun open() {
mSocket.on("alert", onAlertNotif)
mSocket.on("warning", onWarningNotif)
mSocket.on("simple", onSimpleNotif)
mSocket.connect()
}
fun stop() {
mSocket.off()
mSocket.disconnect()
}
companion object {
const val TAG = "SocketHelper"
}
}
class MainActivity : AppCompatActivity() {
private val socketHelper = SocketHelper()
override fun onCreate(savedInstanceState: Bundle?) {
...
socketHelper.open()
}
override fun onDestroy() {
super.onDestroy()
socketHelper.stop()
}
}
更新:
作为更新,我还将服务器端的main.go分享给你们,因为它可能对你们有帮助:
package main
import (
"flag"
"fmt"
"log"
"net"
pb "github.com/<me>/<project_name>/api/proto/out"
cmmHelpers "github.com/<me>/<project_name>/cmd/commons/helpers"
"github.com/<me>/<project_name>/cmd/server/handlers"
"github.com/<me>/<project_name>/cmd/server/helpers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func init() {
cmmHelpers.DatabaseConnection()
}
var (
tls = flag.Bool("tls", true, "Connection uses TLS if true, else plain TCP")
certFile = flag.String("cert_file", "", "The TLS cert file")
keyFile = flag.String("key_file", "", "The TLS key file")
port = flag.Int("port", 50051, "The server port")
)
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
if *certFile == "" {
*certFile = "service.pem"
}
if *keyFile == "" {
*keyFile = "service.key"
}
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
log.Fatalf("Failed to generate credentials: %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
mServ := grpc.NewServer(opts...)
fmt.Println("gRPC server running ...")
//一些与gRPC相关的样板代码
log.Printf("Server listening at %v", lis.Addr())
go helpers.StartSocket()
if err := mServ.Serve(lis); err != nil {
log.Fatalf("failed to serve : %v", err)
}
}
希望这些信息对你有帮助!
英文:
After making some test Golang and Android Kotlin code to try out socket.io, I copied those code to my project's server and client app.
The only difference I did from the original code is run the socket server start as a coroutine
because of necessity, as just calling StartSocket
seems to be essentially a blocking function.
After the update, I tested if the code still works, and it does. The app can connect to the server and the app can emit to the server as well, and IIRC the app can also receive the emit from the server.
When I rebuild the app, the server shows that a client disconnects. Now, only the connection part works. Even the original app can't emit now, rebuilding the app doesn't seem to disconnect it anymore as well. And the client gets disconnected repeatedly again, but now it happens silently, like only a new connected message appears on the terminal. Before the repeated disconnection at least tells some reason why the the client disconnects.
If I go run
the original code and pair it with the original app, everything works fine. I go build
my project's code, but I doubt that should affect socket.io. I'm at my wits ends as to why everything (mostly) doesn't work anymore when I barely changed anything.
Here's my Golang code:
package helpers
import (
"flag"
"fmt"
"log"
"net/http"
socketio "github.com/googollee/go-socket.io"
"github.com/googollee/go-socket.io/engineio"
"github.com/googollee/go-socket.io/engineio/transport"
"github.com/googollee/go-socket.io/engineio/transport/polling"
"github.com/googollee/go-socket.io/engineio/transport/websocket"
)
var allowOriginFunc = func(r *http.Request) bool {
return true
}
var (
port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
)
var sckServer *socketio.Server
const WARNING_TAG = "warning"
const ALERT_TAG = "alert"
const NAMESPACE = "notifications"
const SIMPLE_TAG = "simple"
const ROOM = "notif_room"
func StartSocket() {
flag.Parse()
sckServer = socketio.NewServer(&engineio.Options{
Transports: []transport.Transport{
&polling.Transport{
CheckOrigin: allowOriginFunc,
},
&websocket.Transport{
CheckOrigin: allowOriginFunc,
},
},
})
sckServer.OnConnect("/", func(s socketio.Conn) error {
s.SetContext("")
fmt.Println("connected:", s.ID())
s.Emit("notice", "new user connected")
return nil
})
sckServer.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
fmt.Println("notice:", msg)
s.Emit("notice", "have "+msg)
})
sckServer.OnError("/", func(s socketio.Conn, e error) {
fmt.Println("socket error:", e)
})
sckServer.OnDisconnect("/", func(s socketio.Conn, reason string) {
fmt.Println("closed", reason)
})
go sckServer.Serve()
defer sckServer.Close()
http.Handle("/socket.io/", sckServer)
http.Handle("/", http.FileServer(http.Dir("./asset")))
fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)
err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)
if err != nil {
log.Fatalf("Failed to start socket sckServer: %v\n", err)
}
}
func GetSocketSrv() *socketio.Server {
return sckServer
}
func BroadcastToTag(tag string, payload string) {
fmt.Printf("BroadcastToTag tag: %s, payload: %s\n", tag, payload)
if sckServer != nil {
broadcastStat := sckServer.BroadcastToNamespace(NAMESPACE, tag, payload)
fmt.Printf("broadcastStat: %v\n", broadcastStat)
} else {
fmt.Printf("sckServer = nil\n")
}
}
And here's my Android Kotlin code:
import android.util.Log
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import java.net.ConnectException
import java.net.URISyntaxException
class SocketHelper {
private lateinit var mSocket: Socket
private val onAlertNotif =
Emitter.Listener { args ->
Log.i(TAG, "onAlertNotif args: ${args[0]}")
}
private val onWarningNotif =
Emitter.Listener { args ->
Log.i(TAG, "onWarningNotif args: ${args[0]}")
}
private val onSimpleNotif =
Emitter.Listener { args ->
Log.i(TAG, "onSimpleNotif args: ${args[0]}")
}
init {
try {
mSocket = IO.socket("http://<local_ip>:8000/")
}catch (e: ConnectException) {
Log.e(TAG, "Socket ConnExc: ${e.localizedMessage}")
}catch (e: URISyntaxException) {
Log.e(TAG, "Socket URISynExc: ${e.localizedMessage}")
}catch (e: Exception){
Log.e(TAG, "Socket Exc: ${e.localizedMessage}")
}
}
fun send(eventName: String, msg: String){
mSocket.emit(eventName, msg)
}
fun open(){
mSocket.on("alert", onAlertNotif)
mSocket.on("warning", onWarningNotif)
mSocket.on("simple", onSimpleNotif)
mSocket.connect()
}
fun stop(){
mSocket.off()
mSocket.disconnect()
}
companion object{
const val TAG = "SocketHelper"
}
}
class MainActivity : AppCompatActivity() {
private val socketHelper = SocketHelper()
override fun onCreate(savedInstanceState: Bundle?) {
...
socketHelper.open()
}
override fun onDestroy() {
super.onDestroy()
socketHelper.stop()
}
}
UPDATE:
As an update, I'll also share the main.go from the server side as it might be helpful to you guys:
package main
import (
"flag"
"fmt"
"log"
"net"
pb "github.com/<me>/<project_name>/api/proto/out"
cmmHelpers "github.com/<me>/<project_name>/cmd/commons/helpers"
"github.com/<me>/<project_name>/cmd/server/handlers"
"github.com/<me>/<project_name>/cmd/server/helpers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func init() {
cmmHelpers.DatabaseConnection()
}
var (
tls = flag.Bool("tls", true, "Connection uses TLS if true, else plain TCP")
certFile = flag.String("cert_file", "", "The TLS cert file")
keyFile = flag.String("key_file", "", "The TLS key file")
port = flag.Int("port", 50051, "The server port")
)
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
if *certFile == "" {
*certFile = "service.pem"
}
if *keyFile == "" {
*keyFile = "service.key"
}
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
log.Fatalf("Failed to generate credentials: %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
mServ := grpc.NewServer(opts...)
fmt.Println("gRPC server running ...")
//some gRPC related boiler plate
log.Printf("Server listening at %v", lis.Addr())
go helpers.StartSocket()
if err := mServ.Serve(lis); err != nil {
log.Fatalf("failed to serve : %v", err)
}
}
答案1
得分: 1
我已经找到了一个答案,但它是破坏性的。我完全放弃了socket.io
,而是使用了内置的net
包。如果需要的话,我稍后会弄清楚如何实现socket.io
的那些高级功能。以下是代码的样子:
import (
"flag"
"fmt"
"log"
"net"
"os"
)
var (
SOCKET_PORT = flag.Int("socket_server_port", 9001, "Socket sckServer port")
HOST = flag.String("host name", "localhost", "server host name")
socketConnArr = make(map[string]net.Conn)
)
func StartSocket() {
listen, err := net.Listen(TYPE, fmt.Sprintf(":%d", *SOCKET_PORT))
if err != nil {
log.Fatalf("conn err: %s", err)
os.Exit(1)
}
defer listen.Close()
fmt.Printf("Socket sckServer serving at localhost:%d...\n", *SOCKET_PORT)
for {
conn, err := listen.Accept()
if err != nil {
log.Fatalf("accept err: %s", err)
continue
}
socketConnArr[conn.RemoteAddr().String()] = conn
fmt.Printf("socket client on: %s\n", conn.RemoteAddr().String())
go connWrite(conn, "socket server ack")
go connRead(conn)
}
}
func connRead(conn net.Conn) {
fmt.Println("Reading")
buffer := make([]byte, 1024)
bytesRead, err := conn.Read(buffer)
if err != nil {
log.Fatalf("req err: %s", err)
}
fmt.Printf("Read %d bytes: %s", bytesRead, string(buffer[:bytesRead]))
}
func connWrite(conn net.Conn, message string) {
fmt.Println("Sending")
conn.Write([]byte("\n")) //for some reason, this is very important
conn.Write([]byte(message))
conn.Write([]byte("\n"))
}
然后在 Android 端:
lifecycleScope.launch(Dispatchers.IO) {
var response = ""
try {
socket = Socket(SERVER_IP, SERVER_PORT)
writer = PrintWriter(socket.getOutputStream())
reader = BufferedReader(InputStreamReader(socket.getInputStream()))
writer.println(socketInitMsg)
writer.flush()
while (true) {
if (reader.readLine() != null) {
response = reader.readLine()
Log.i(SocketHelper.TAG, "response: $response")
}
}
} catch (e: IOException) {
Log.e(SocketHelper.TAG, "err1: " + e.localizedMessage)
} catch (e: Exception) {
Log.e(SocketHelper.TAG, "err11: " + e.localizedMessage)
}
}
英文:
I already found an answer, but it's the destructive kind. I ditched socket.io
altogether and used what the built-in net
package have. I'll just figure out how to implement those fancy features socket.io
have later on if needed. Here's what the code looks like:
import (
"flag"
"fmt"
"log"
"net"
"os"
)
var (
SOCKET_PORT = flag.Int("socket_server_port", 9001, "Socket sckServer port")
HOST = flag.String("host name", "localhost", "server host name")
socketConnArr = make(map[string]net.Conn)
)
func StartSocket() {
listen, err := net.Listen(TYPE, fmt.Sprintf(":%d", *SOCKET_PORT))
if err != nil {
log.Fatalf("conn err: %s", err)
os.Exit(1)
}
defer listen.Close()
fmt.Printf("Socket sckServer serving at localhost:%d...\n", *SOCKET_PORT)
for {
conn, err := listen.Accept()
if err != nil {
log.Fatalf("accept err: %s", err)
continue
}
socketConnArr[conn.RemoteAddr().String()] = conn
fmt.Printf("socket client on: %s\n", conn.RemoteAddr().String())
go connWrite(conn, "socket server ack")
go connRead(conn)
}
}
func connRead(conn net.Conn) {
fmt.Println("Reading")
buffer := make([]byte, 1024)
bytesRead, err := conn.Read(buffer)
if err != nil {
log.Fatalf("req err: %s", err)
}
fmt.Printf("Read %d bytes: %s", bytesRead, string(buffer[:bytesRead]))
}
func connWrite(conn net.Conn, message string) {
fmt.Println("Sending")
conn.Write([]byte("\n")) //for some reason, this is very important
conn.Write([]byte(message))
conn.Write([]byte("\n"))
}
Then on the android side:
lifecycleScope.launch(Dispatchers.IO) {
var response = ""
try {
socket = Socket(SERVER_IP, SERVER_PORT)
writer = PrintWriter(socket.getOutputStream())
reader = BufferedReader(InputStreamReader(socket.getInputStream()))
writer.println(socketInitMsg)
writer.flush()
while (true) {
if (reader.readLine() != null) {
response = reader.readLine()
Log.i(SocketHelper.TAG, "response: $response")
}
}
} catch (e: IOException) {
Log.e(SocketHelper.TAG, "err1: " + e.localizedMessage)
} catch (e: Exception) {
Log.e(SocketHelper.TAG, "err11: " + e.localizedMessage)
}
}
答案2
得分: 0
首先,请确保没有并发问题:由于您将套接字服务器作为Go协程运行,它可能会由于启动它的主函数的生命周期而过早终止。确保在服务器处理所有连接之前,main
函数不会终止。在提供的main.go
代码中,您在启动gRPC服务器之前调用了go helpers.StartSocket()
。如果gRPC服务器关闭(由于错误或程序终止),StartSocket
协程也可能被终止。请参阅"Starting a socket server interferes with the gRPC/http client server communication Golang"。
另外,请检查错误!可能会发生服务器端或客户端端口的错误,但没有被正确捕获。
- 在您的Kotlin代码中,您目前只捕获
ConnectException
和URISyntaxException
。最好也捕获通用的Exception
作为后备,因为可能会发生其他未预料到的异常。 - 在您的Go代码中,
sckServer.Serve()
可能会返回一个需要处理或记录的错误。 BroadcastToTag
函数可以检查广播是否成功,并处理或记录可能发生的任何错误。但是,根据go-socket.io
库中BroadcastToNamespace
函数的当前实现,它似乎不会返回错误。
或者,添加错误值:在GetSocketSrv
函数中,当sckServer
为nil
时返回错误会是更好的做法,因为它表示套接字服务器尚未初始化。这样,调用者可以适当地处理此情况,而不会冒险发生恐慌。例如,如果在StartSocket
之前或套接字服务器关闭后调用BroadcastToTag
,sckServer.BroadcastToNamespace
将导致恐慌,因为sckServer
为nil
。
为了改进错误处理和更好地管理goroutine的生命周期,您的代码可以包括以下更新:
package helpers
import (
"flag"
"fmt"
"log"
"net/http"
"sync"
socketio "github.com/googollee/go-socket.io"
"github.com/googollee/go-socket.io/engineio"
"github.com/googollee/go-socket.io/engineio/transport"
"github.com/googollee/go-socket.io/engineio/transport/polling"
"github.com/googollee/go-socket.io/engineio/transport/websocket"
)
var allowOriginFunc = func(r *http.Request) bool {
return true
}
var (
port = flag.Int("socket_server_port", 8000, "Socket server port")
)
var sckServer *socketio.Server
var wg sync.WaitGroup // WaitGroup to manage goroutines' lifecycle
// Constants, as before
//...
func StartSocket() {
flag.Parse()
sckServer = socketio.NewServer(&engineio.Options{
Transports: []transport.Transport{
&polling.Transport{
CheckOrigin: allowOriginFunc,
},
&websocket.Transport{
CheckOrigin: allowOriginFunc,
},
},
})
// Event handlers, as before
//...
go func() {
defer wg.Done() // When this goroutine finishes, mark one job as done
sckServer.Serve()
}()
wg.Add(1) // We have one job running
defer func() {
sckServer.Close() // Make sure to close the server when the function exits
wg.Wait() // Wait for all jobs to finish
}()
// HTTP handlers, as before
//...
fmt.Printf("Socket server serving at localhost:%d...\n", *port)
err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)
if err != nil {
log.Fatalf("Failed to start socket server: %v\n", err)
}
}
func GetSocketSrv() (*socketio.Server, error) {
if sckServer == nil {
return nil, fmt.Errorf("Socket server is not initialized")
}
return sckServer, nil
}
// BroadcastToTag function, as before
//...
对于main.go
:
package main
import (
// Imports, as before
//...
)
// Initializations, as before
//...
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
// Certificate and key file handling, as before
//...
}
mServ := grpc.NewServer(opts...)
fmt.Println("gRPC server running ...")
// gRPC related boilerplate, as before
//...
log.Printf("Server listening at %v", lis.Addr())
go func() {
if err := helpers.StartSocket(); err != nil {
log.Fatalf("Failed to start socket server: %v", err)
}
}()
if err := mServ.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
请将“Imports, as before”和“gRPC related boilerplate, as before”注释替换为您的原始代码。
> 我已经用一个使用更简单的net库的代码替换了那些代码。
一个使用Go的net
包作为服务器端,以及使用Kotlin的Ktor
库作为客户端端口的简单TCP服务器/客户端实现可能是(不完全复制所有socket.io功能,特别是与命名空间和广播相关的功能):
Go服务器:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
)
func handleConnection(c net.Conn) {
fmt.Printf("Serving %s\n", c.RemoteAddr().String())
for {
netData, err := bufio.NewReader(c).ReadString('\n')
if err != nil {
fmt.Println(err)
return
}
temp := strings.TrimSpace(string(netData))
if temp == "STOP" {
break
}
fmt.Println(temp)
c.Write([]byte("Received data: " + temp + "\n"))
}
c.Close()
}
func main() {
l, err := net.Listen("tcp4", ":5000")
if err != nil {
log.Fatalf("Failed to start server: %v\n", err)
}
defer l.Close()
for {
c, err := l.Accept()
if err != nil {
fmt.Println(err)
return
}
go handleConnection(c)
}
}
Go服务器将在TCP端口5000上监听传入连接。接收到连接后,它将创建一个新的goroutine来独立处理该连接。然后,它将逐行读取传入的数据,打印它并带有“Received data:
”前缀将其回显给客户端。
Kotlin客户端:
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.net.InetSocketAddress
fun main() = runBlocking {
val socket = aSocket(Dispatchers.IO).tcp().connect(InetSocketAddress("localhost", 5000))
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
output.writeUTF8Line("Hello, Server!")
val response = input.readUTF8Line()
println("Server said: '$response'")
output.writeUTF8Line("STOP")
socket.close()
}
Kotlin客户端将连接到服务器,发送问候消息,等待响应,打印它,然后在关闭连接之前发送一个“STOP”消息。此代码使用Kotlin协程和Ktor的aSocket
方法来打开TCP连接。
注意:此代码仅在服务器和客户端在同一台机器上运行时才有效。如果它们在不同的机器上,请将"localhost"
替换为服务器的IP地址。
请记得在Kotlin的依赖项中包含Ktor库(ktor-network
):
dependencies {
implementation 'io.ktor:ktor-network:2.3.2'
}
英文:
First, make sure you do not have a concurrency issue: Since you are running the socket server as a Go routine, it might be terminating prematurely due to the lifecycle of the main function it is being launched from.
Ensure the main
function does not terminate before the server is done handling all connections.
In the provided main.go
code, you have go helpers.StartSocket()
called just before starting the gRPC server. If the gRPC server closes (either due to error or program termination), the StartSocket
goroutine may also get terminated.
See also "Starting a socket server interferes with the gRPC/http client server communication Golang"
And, as usual, do check your errors! There might be an error occurring either on the server side or the client side that is not being properly caught.
- In your Kotlin code, you are currently only catching
ConnectException
andURISyntaxException
. It would be better to also catch a genericException
as a fallback, as other unanticipated exceptions could occur. - In your Go code,
sckServer.Serve()
might return an error that should be handled or logged. - The
BroadcastToTag
function could check whether the broadcast was successful and handle or log any error that might have occurred. However, it seems likeBroadcastToNamespace
function does not return an error in its current implementation in thego-socket.io
library.
Or, add error values: In the GetSocketSrv
function, returning an error when sckServer
is nil
would be a better practice as it indicates that the socket server has not been initialized. This allows the caller to handle this situation appropriately instead of risking a panic. For instance, if BroadcastToTag
is called before StartSocket
or after the socket server is closed, sckServer.BroadcastToNamespace
will cause a panic because sckServer
is nil
.
For improving error handling and better managing your goroutines' lifecycles, your code could include those updates:
package helpers
import (
"flag"
"fmt"
"log"
"net/http"
"sync"
socketio "github.com/googollee/go-socket.io"
"github.com/googollee/go-socket.io/engineio"
"github.com/googollee/go-socket.io/engineio/transport"
"github.com/googollee/go-socket.io/engineio/transport/polling"
"github.com/googollee/go-socket.io/engineio/transport/websocket"
)
var allowOriginFunc = func(r *http.Request) bool {
return true
}
var (
port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
)
var sckServer *socketio.Server
var wg sync.WaitGroup // WaitGroup to manage goroutines' lifecycle
// Constants, as before
//...
func StartSocket() {
flag.Parse()
sckServer = socketio.NewServer(&engineio.Options{
Transports: []transport.Transport{
&polling.Transport{
CheckOrigin: allowOriginFunc,
},
&websocket.Transport{
CheckOrigin: allowOriginFunc,
},
},
})
// Event handlers, as before
//...
go func() {
defer wg.Done() // When this goroutine finishes, mark one job as done
sckServer.Serve()
}()
wg.Add(1) // We have one job running
defer func() {
sckServer.Close() // Make sure to close the server when the function exits
wg.Wait() // Wait for all jobs to finish
}()
// HTTP handlers, as before
//...
fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)
err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)
if err != nil {
log.Fatalf("Failed to start socket sckServer: %v\n", err)
}
}
func GetSocketSrv() (*socketio.Server, error) {
if sckServer == nil {
return nil, fmt.Errorf("Socket server is not initialized")
}
return sckServer, nil
}
// BroadcastToTag function, as before
//...
For the main.go
:
package main
import (
// Imports, as before
//...
)
// Initializations, as before
//...
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
if *tls {
// Certificate and key file handling, as before
//...
}
mServ := grpc.NewServer(opts...)
fmt.Println("gRPC server running ...")
// gRPC related boilerplate, as before
//...
log.Printf("Server listening at %v", lis.Addr())
go func() {
if err := helpers.StartSocket(); err != nil {
log.Fatalf("Failed to start socket server: %v", err)
}
}()
if err := mServ.Serve(lis); err != nil {
log.Fatalf("failed to serve : %v", err)
}
}
Please replace "Imports, as before" and "gRPC related boilerplate, as before" comments with your original code.
> I already replaced those code with something that is using the much simpler net library
A simple TCP server/client implementation using Go's net
package for the server side and Kotlin's Ktor
library for the client side could be (without fully reproducing all the socket.io features, especially the ones related to namespaces and broadcast):
Go server:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
)
func handleConnection(c net.Conn) {
fmt.Printf("Serving %s\n", c.RemoteAddr().String())
for {
netData, err := bufio.NewReader(c).ReadString('\n')
if err != nil {
fmt.Println(err)
return
}
temp := strings.TrimSpace(string(netData))
if temp == "STOP" {
break
}
fmt.Println(temp)
c.Write([]byte("Received data: " + temp + "\n"))
}
c.Close()
}
func main() {
l, err := net.Listen("tcp4", ":5000")
if err != nil {
log.Fatalf("Failed to start server: %v\n", err)
}
defer l.Close()
for {
c, err := l.Accept()
if err != nil {
fmt.Println(err)
return
}
go handleConnection(c)
}
}
The Go server will listen for incoming connections on TCP port 5000.
Upon receiving a connection, it will create a new goroutine to handle that connection independently of others.
It will then read the incoming data line by line, print it, and echo it back to the client with a "Received data:
" prefix.
Kotlin client:
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.net.InetSocketAddress
fun main() = runBlocking {
val socket = aSocket(Dispatchers.IO).tcp().connect(InetSocketAddress("localhost", 5000))
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
output.writeUTF8Line("Hello, Server!")
val response = input.readUTF8Line()
println("Server said: '$response'")
output.writeUTF8Line("STOP")
socket.close()
}
The Kotlin client will connect to the server, send a greeting message, wait for the response, print it, and then send a "STOP" message before closing the connection.
This code uses Kotlin coroutines and Ktor's aSocket
method to open the TCP connection.
Note: This code will work only if the server and client are running on the same machine. If they are on different machines, replace "localhost"
with the server's IP address.
Remember to include Ktor library (ktor-network
) in your Kotlin dependencies:
dependencies {
implementation 'io.ktor:ktor-network:2.3.2'
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论