Golang socket.io server is malfunctioning for some reason

Question

After writing some test Golang and Android Kotlin code to try out socket.io, I copied that code into my project's server and client app.

The only change I made to the original code is that the socket server is now started in a goroutine, out of necessity, because StartSocket is essentially a blocking call.
After that change I tested whether the code still worked, and it did. The app could connect to the server and emit to it, and IIRC it could also receive emits from the server.

When I rebuilt the app, the server showed that a client disconnected. Now only the connection part works. Even the original app can't emit anymore, and rebuilding the app no longer seems to disconnect it either. The client also gets disconnected repeatedly again, but now it happens silently: only a new "connected" message appears in the terminal. Previously the repeated disconnections at least reported a reason.
If I go run the original code and pair it with the original app, everything works fine. I go build my project's code, but I doubt that should affect socket.io. I'm at my wits' end as to why (almost) everything stopped working when I barely changed anything.

Here's my Golang code:

package helpers

import (
	"flag"
	"fmt"
	"log"
	"net/http"

	socketio "github.com/googollee/go-socket.io"
	"github.com/googollee/go-socket.io/engineio"
	"github.com/googollee/go-socket.io/engineio/transport"
	"github.com/googollee/go-socket.io/engineio/transport/polling"
	"github.com/googollee/go-socket.io/engineio/transport/websocket"
)

var allowOriginFunc = func(r *http.Request) bool {
	return true
}
var (
	port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
)

var sckServer *socketio.Server

const WARNING_TAG = "warning"
const ALERT_TAG = "alert"
const NAMESPACE = "notifications"
const SIMPLE_TAG = "simple"
const ROOM = "notif_room"

func StartSocket() {
	flag.Parse()

	sckServer = socketio.NewServer(&engineio.Options{
		Transports: []transport.Transport{
			&polling.Transport{
				CheckOrigin: allowOriginFunc,
			},
			&websocket.Transport{
				CheckOrigin: allowOriginFunc,
			},
		},
	})

	sckServer.OnConnect("/", func(s socketio.Conn) error {
		s.SetContext("")
		fmt.Println("connected:", s.ID())

		s.Emit("notice", "new user connected")
		return nil
	})

	sckServer.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
		fmt.Println("notice:", msg)
		s.Emit("notice", "have "+msg)
	})

	sckServer.OnError("/", func(s socketio.Conn, e error) {
		fmt.Println("socket error:", e)
	})

	sckServer.OnDisconnect("/", func(s socketio.Conn, reason string) {
		fmt.Println("closed", reason)
	})

	go sckServer.Serve()
	defer sckServer.Close()

	http.Handle("/socket.io/", sckServer)
	http.Handle("/", http.FileServer(http.Dir("./asset")))

	fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)

	err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil)

	if err != nil {
		log.Fatalf("Failed to start socket sckServer: %v\n", err)
	}
}

func GetSocketSrv() *socketio.Server {
	return sckServer
}

func BroadcastToTag(tag string, payload string) {
	fmt.Printf("BroadcastToTag tag: %s, payload: %s\n", tag, payload)

	if sckServer != nil {
		broadcastStat := sckServer.BroadcastToNamespace(NAMESPACE, tag, payload)
		fmt.Printf("broadcastStat: %v\n", broadcastStat)
	} else {
		fmt.Printf("sckServer = nil\n")
	}
}

And here's my Android Kotlin code:

import android.util.Log
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
import java.net.ConnectException
import java.net.URISyntaxException

class SocketHelper {
    private lateinit var mSocket: Socket
    private val onAlertNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onAlertNotif args: ${args[0]}")
        }

    private val onWarningNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onWarningNotif args: ${args[0]}")
        }

    private val onSimpleNotif =
        Emitter.Listener { args ->
            Log.i(TAG, "onSimpleNotif args: ${args[0]}")
        }

    init {
        try {
            mSocket = IO.socket("http://<local_ip>:8000/")
        } catch (e: ConnectException) {
            Log.e(TAG, "Socket ConnExc: ${e.localizedMessage}")
        } catch (e: URISyntaxException) {
            Log.e(TAG, "Socket URISynExc: ${e.localizedMessage}")
        } catch (e: Exception) {
            Log.e(TAG, "Socket Exc: ${e.localizedMessage}")
        }
    }

    fun send(eventName: String, msg: String) {
        mSocket.emit(eventName, msg)
    }

    fun open() {
        mSocket.on("alert", onAlertNotif)
        mSocket.on("warning", onWarningNotif)
        mSocket.on("simple", onSimpleNotif)

        mSocket.connect()
    }

    fun stop() {
        mSocket.off()
        mSocket.disconnect()
    }

    companion object {
        const val TAG = "SocketHelper"
    }
}

class MainActivity : AppCompatActivity() {
    private val socketHelper = SocketHelper()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        socketHelper.open()
    }

    override fun onDestroy() {
        super.onDestroy()

        socketHelper.stop()
    }
}

UPDATE:

As an update, I'll also share the main.go from the server side as it might be helpful to you guys:

package main

import (
	"flag"
	"fmt"
	"log"
	"net"

	pb "github.com/<me>/<project_name>/api/proto/out"
	cmmHelpers "github.com/<me>/<project_name>/cmd/commons/helpers"
	"github.com/<me>/<project_name>/cmd/server/handlers"
	"github.com/<me>/<project_name>/cmd/server/helpers"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func init() {
	cmmHelpers.DatabaseConnection()
}

var (
	tls      = flag.Bool("tls", true, "Connection uses TLS if true, else plain TCP")
	certFile = flag.String("cert_file", "", "The TLS cert file")
	keyFile  = flag.String("key_file", "", "The TLS key file")
	port     = flag.Int("port", 50051, "The server port")
)

func main() {
	flag.Parse()

	lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	var opts []grpc.ServerOption
	if *tls {
		if *certFile == "" {
			*certFile = "service.pem"
		}

		if *keyFile == "" {
			*keyFile = "service.key"
		}

		creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)

		if err != nil {
			log.Fatalf("Failed to generate credentials: %v", err)
		}

		opts = []grpc.ServerOption{grpc.Creds(creds)}
	}

	mServ := grpc.NewServer(opts...)

	fmt.Println("gRPC server running ...")

	//some gRPC related boiler plate

	log.Printf("Server listening at %v", lis.Addr())

	go helpers.StartSocket()

	if err := mServ.Serve(lis); err != nil {
		log.Fatalf("failed to serve : %v", err)
	}
}

Answer 1

Score: 1

I already found an answer, but it's the destructive kind. I ditched socket.io altogether and used what the built-in net package has. I'll figure out how to implement the fancy features socket.io has later on if needed. Here's what the code looks like:

import (
	"flag"
	"fmt"
	"log"
	"net"
)

// TYPE is not defined in the original snippet; "tcp" is assumed here.
const TYPE = "tcp"

var (
	SOCKET_PORT   = flag.Int("socket_server_port", 9001, "Socket sckServer port")
	HOST          = flag.String("host name", "localhost", "server host name")
	socketConnArr = make(map[string]net.Conn)
)

func StartSocket() {
	listen, err := net.Listen(TYPE, fmt.Sprintf(":%d", *SOCKET_PORT))
	if err != nil {
		// log.Fatalf already exits the process, so no os.Exit is needed.
		log.Fatalf("conn err: %s", err)
	}

	defer listen.Close()

	fmt.Printf("Socket sckServer serving at localhost:%d...\n", *SOCKET_PORT)

	for {
		conn, err := listen.Accept()
		if err != nil {
			// Log and keep accepting; log.Fatalf here would stop the whole server.
			log.Printf("accept err: %s", err)
			continue
		}
		socketConnArr[conn.RemoteAddr().String()] = conn

		fmt.Printf("socket client on: %s\n", conn.RemoteAddr().String())

		go connWrite(conn, "socket server ack")
		go connRead(conn)
	}
}

// connRead reads a single message of up to 1024 bytes from the client.
func connRead(conn net.Conn) {
	fmt.Println("Reading")
	buffer := make([]byte, 1024)
	bytesRead, err := conn.Read(buffer)
	if err != nil {
		// A failed read on one client should not take the server down.
		log.Printf("req err: %s", err)
		return
	}

	fmt.Printf("Read %d bytes: %s", bytesRead, string(buffer[:bytesRead]))
}

// connWrite sends one newline-terminated message to the client.
func connWrite(conn net.Conn, message string) {
	fmt.Println("Sending")
	conn.Write([]byte("\n")) //for some reason, this is very important
	conn.Write([]byte(message))
	conn.Write([]byte("\n"))
}

Then on the Android side:

lifecycleScope.launch(Dispatchers.IO) {
    try {
        socket = Socket(SERVER_IP, SERVER_PORT)
        writer = PrintWriter(socket.getOutputStream())
        reader = BufferedReader(InputStreamReader(socket.getInputStream()))

        writer.println(socketInitMsg)
        writer.flush()

        while (true) {
            // Read each line exactly once. Calling readLine() in the condition and
            // again in the body would discard every other line. A null result means
            // the server closed the connection, so the loop ends.
            val response = reader.readLine() ?: break
            if (response.isNotEmpty()) {
                Log.i(SocketHelper.TAG, "response: $response")
            }
        }
    } catch (e: IOException) {
        Log.e(SocketHelper.TAG, "err1: " + e.localizedMessage)
    } catch (e: Exception) {
        Log.e(SocketHelper.TAG, "err11: " + e.localizedMessage)
    }
}
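
One possible reason the leading newline looks "very important" is line framing: a client loop that reads one line in its condition and another in its body consumes two lines per message and keeps only the second. The sketch below reproduces that effect with plain strings; readOncePerLoop and readTwicePerLoop are hypothetical helper names, and the explanation is an assumption about this setup, not something confirmed here.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// readOncePerLoop consumes exactly one line per iteration and keeps all of them.
func readOncePerLoop(stream string) []string {
	var got []string
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		got = append(got, sc.Text())
	}
	return got
}

// readTwicePerLoop mimics a loop that reads one line in its condition and a
// second line in its body, so only every second line is kept.
func readTwicePerLoop(stream string) []string {
	var got []string
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() { // first read: the result is discarded
		if !sc.Scan() { // second read: this is the line that is kept
			break
		}
		got = append(got, sc.Text())
	}
	return got
}

func main() {
	padded := "\nsocket server ack\n"
	plain := "socket server ack\n"

	fmt.Printf("twice, padded: %q\n", readTwicePerLoop(padded))
	fmt.Printf("twice, plain:  %q\n", readTwicePerLoop(plain))
	fmt.Printf("once, padded:  %q\n", readOncePerLoop(padded))
}
```

With the padded stream the double-reading loop still sees the message, while without the padding it sees nothing. A loop that reads once per iteration receives the message either way and only has to skip the empty line.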

Answer 2

Score: 0

First, make sure you do not have a concurrency issue: since you are running the socket server in a goroutine, it might be terminating prematurely because of the lifecycle of the main function that launches it.
Ensure the main function does not return before the server is done handling all connections.
In the provided main.go code, go helpers.StartSocket() is called just before the gRPC server starts. If the gRPC server stops (either due to an error or program termination), main returns and the StartSocket goroutine is terminated with it.
See also "Starting a socket server interferes with the gRPC/http client server communication Golang".
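
As a rough illustration of tying the two lifecycles together, the following sketch runs several blocking services and returns as soon as the first one stops, so a failure in either server becomes visible instead of disappearing with its goroutine. runAll and failAfter are hypothetical names, and the two closures only stand in for mServ.Serve(lis) and the socket server.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// runAll starts every blocking service in its own goroutine and returns the
// first error any of them reports. The channel is buffered so the remaining
// goroutines can still deliver their result without blocking forever.
func runAll(services ...func() error) error {
	errs := make(chan error, len(services))
	for _, svc := range services {
		go func(run func() error) { errs <- run() }(svc)
	}
	return <-errs
}

// failAfter builds a stand-in service that blocks for ms milliseconds and
// then stops with the given message.
func failAfter(ms int, msg string) func() error {
	return func() error {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		return errors.New(msg)
	}
}

func main() {
	grpcLike := failAfter(200, "grpc server stopped")
	socketLike := failAfter(10, "socket server stopped")

	fmt.Println("exiting because:", runAll(grpcLike, socketLike))
}
```

In a real program each closure would wrap one blocking Serve call, and main would log the returned error before exiting.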

And, as usual, do check your errors! There might be an error occurring either on the server side or the client side that is not being properly caught.

  • In your Kotlin code, the try/catch only wraps IO.socket(...), which builds the socket but does not connect. Connection failures surface later as events, so it is worth also registering listeners for events such as Socket.EVENT_CONNECT_ERROR and Socket.EVENT_DISCONNECT and logging their arguments.
  • In your Go code, sckServer.Serve() returns an error that should be handled or logged.
  • The BroadcastToTag function could check whether the broadcast was successful and handle or log any failure. Note that BroadcastToNamespace in the go-socket.io library does not appear to return an error; in your code its result is only printed as broadcastStat.
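
One way to make startup errors hard to miss, sketched here with only the standard library, is to bind the port synchronously and serve in the background. listenAndServeAsync is a hypothetical helper, not part of go-socket.io; it returns bind failures (for example a port that is already in use) immediately and forwards whatever Serve returns on a channel.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
)

// listenAndServeAsync binds addr before returning, so bind errors reach the
// caller right away, and only then serves in a goroutine. The error that
// http.Serve eventually returns is delivered on the channel.
func listenAndServeAsync(addr string, h http.Handler) (net.Listener, <-chan error, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, nil, fmt.Errorf("listen on %s: %w", addr, err)
	}
	errs := make(chan error, 1)
	go func() { errs <- http.Serve(ln, h) }()
	return ln, errs, nil
}

func main() {
	// Port 0 asks the OS for any free port, which keeps the sketch runnable anywhere.
	ln, errs, err := listenAndServeAsync("127.0.0.1:0", http.NotFoundHandler())
	if err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	fmt.Println("serving on", ln.Addr())

	// Binding the same address a second time fails immediately.
	_, _, err = listenAndServeAsync(ln.Addr().String(), http.NotFoundHandler())
	fmt.Println("second bind failed:", err != nil)

	ln.Close() // stop the first server
	fmt.Println("serve returned an error:", <-errs != nil)
}
```

In the socket server the handler argument would be the mux that has /socket.io/ registered, and the caller would log or return whatever arrives on the channel.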

Or, add error values: in the GetSocketSrv function, returning an error when sckServer is nil would be a better practice, as it indicates that the socket server has not been initialized. This allows the caller to handle the situation appropriately instead of risking a panic. BroadcastToTag already guards against a nil sckServer, but any other caller that takes the pointer from GetSocketSrv before StartSocket has assigned it would be calling a method on a nil pointer, which is likely to panic.
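
A minimal sketch of that idea, assuming the server is shared between goroutines: a small mutex-guarded holder whose Get returns an error until a server has been stored. The registry and fakeServer types are hypothetical, and the broadcaster interface only assumes the BroadcastToNamespace shape used in the question's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrNotStarted is returned until a server has been stored in the registry.
var ErrNotStarted = errors.New("socket server is not initialized")

// broadcaster is a stand-in for the single method this sketch needs.
type broadcaster interface {
	BroadcastToNamespace(namespace, event string, args ...interface{}) bool
}

// registry guards the shared server with a mutex, because it is written by
// the goroutine that starts the server and read by other goroutines.
type registry struct {
	mu  sync.RWMutex
	srv broadcaster
}

// Set stores the server once it has been created.
func (r *registry) Set(s broadcaster) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.srv = s
}

// Get returns the server, or ErrNotStarted if none has been stored yet.
func (r *registry) Get() (broadcaster, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.srv == nil {
		return nil, ErrNotStarted
	}
	return r.srv, nil
}

// fakeServer records broadcasts so the sketch runs without the real library.
type fakeServer struct{ sent []string }

func (f *fakeServer) BroadcastToNamespace(ns, event string, args ...interface{}) bool {
	f.sent = append(f.sent, ns+":"+event)
	return true
}

func main() {
	var reg registry

	if _, err := reg.Get(); err != nil {
		fmt.Println("before start:", err)
	}

	reg.Set(&fakeServer{})
	srv, err := reg.Get()
	if err != nil {
		fmt.Println("unexpected:", err)
		return
	}
	fmt.Println("broadcast ok:", srv.BroadcastToNamespace("notifications", "alert", "payload"))
}
```

Callers then branch on the returned error instead of checking a package-level pointer, and the mutex removes the unsynchronized read of a variable that another goroutine assigns.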

To improve error handling and better manage your goroutines' lifecycles, your code could include these updates:

    package helpers

    import (
        "flag"
        "fmt"
        "log"
        "net/http"
        "sync"

        socketio "github.com/googollee/go-socket.io"
        "github.com/googollee/go-socket.io/engineio"
        "github.com/googollee/go-socket.io/engineio/transport"
        "github.com/googollee/go-socket.io/engineio/transport/polling"
        "github.com/googollee/go-socket.io/engineio/transport/websocket"
    )

    var allowOriginFunc = func(r *http.Request) bool {
        return true
    }
    var (
        port = flag.Int("socket_server_port", 8000, "Socket sckServer port")
    )

    var sckServer *socketio.Server
    var wg sync.WaitGroup // WaitGroup to manage goroutines' lifecycle

    // Constants, as before
    //...

    // StartSocket blocks until the HTTP server stops and returns its error,
    // so the caller (see main.go) decides how to handle it.
    func StartSocket() error {
        flag.Parse()

        sckServer = socketio.NewServer(&engineio.Options{
            Transports: []transport.Transport{
                &polling.Transport{
                    CheckOrigin: allowOriginFunc,
                },
                &websocket.Transport{
                    CheckOrigin: allowOriginFunc,
                },
            },
        })

        // Event handlers, as before
        //...

        wg.Add(1) // Register the job before the goroutine starts, to avoid a race with Done
        go func() {
            defer wg.Done() // When this goroutine finishes, mark one job as done
            if err := sckServer.Serve(); err != nil {
                log.Printf("socket.io serve error: %v", err)
            }
        }()

        defer func() {
            sckServer.Close() // Make sure to close the server when the function exits
            wg.Wait()         // Wait for all jobs to finish
        }()

        // HTTP handlers, as before
        //...

        fmt.Printf("Socket sckServer serving at localhost:%d...\n", *port)

        if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
            return fmt.Errorf("failed to start socket sckServer: %w", err)
        }
        return nil
    }

    func GetSocketSrv() (*socketio.Server, error) {
        if sckServer == nil {
            return nil, fmt.Errorf("socket server is not initialized")
        }

        return sckServer, nil
    }

    // BroadcastToTag function, as before
    //...

For the main.go:

    package main

    import (
        // Imports, as before
        //...
    )

    // Initializations, as before
    //...

    func main() {
        flag.Parse()

        lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
        if err != nil {
            log.Fatalf("failed to listen: %v", err)
        }

        var opts []grpc.ServerOption
        if *tls {
            // Certificate and key file handling, as before
            //...
        }

        mServ := grpc.NewServer(opts...)

        fmt.Println("gRPC server running ...")

        // gRPC related boilerplate, as before
        //...

        log.Printf("Server listening at %v", lis.Addr())

        go func() {
            // This only compiles if helpers.StartSocket returns an error.
            if err := helpers.StartSocket(); err != nil {
                log.Fatalf("Failed to start socket server: %v", err)
            }
        }()

        if err := mServ.Serve(lis); err != nil {
            log.Fatalf("failed to serve : %v", err)
        }
    }

Please replace "Imports, as before" and "gRPC related boilerplate, as before" comments with your original code.


> I already replaced those code with something that is using the much simpler net library

A simple TCP server/client implementation using Go's net package for the server side and Kotlin's Ktor library for the client side could be (without fully reproducing all the socket.io features, especially the ones related to namespaces and broadcast):

Go server:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
)

// handleConnection echoes each received line back to the client until the
// client sends "STOP" or the connection fails.
func handleConnection(c net.Conn) {
	defer c.Close()
	fmt.Printf("Serving %s\n", c.RemoteAddr().String())

	// Create the reader once: a new bufio.Reader per iteration could drop
	// bytes that were already buffered by the previous one.
	reader := bufio.NewReader(c)
	for {
		netData, err := reader.ReadString('\n')
		if err != nil {
			fmt.Println(err)
			return
		}

		temp := strings.TrimSpace(netData)
		if temp == "STOP" {
			break
		}

		fmt.Println(temp)
		c.Write([]byte("Received data: " + temp + "\n"))
	}
}

func main() {
	l, err := net.Listen("tcp4", ":5000")
	if err != nil {
		log.Fatalf("Failed to start server: %v\n", err)
	}
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			fmt.Println(err)
			return
		}
		// Handle each connection in its own goroutine.
		go handleConnection(c)
	}
}

The Go server will listen for incoming connections on TCP port 5000.
Upon receiving a connection, it will create a new goroutine to handle that connection independently of others.
It will then read the incoming data line by line, print it, and echo it back to the client with a "Received data:" prefix.
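
To check the line-based protocol described above without opening a port, one option is an in-memory connection from net.Pipe. This is only a sketch: echoLines restates the handler's behaviour in compact form, and roundTrip is a hypothetical helper that sends a single line and returns the reply.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// echoLines reads newline-terminated messages and answers each one with a
// "Received data: " prefix until it sees "STOP" or the connection ends.
func echoLines(c net.Conn) {
	defer c.Close()
	r := bufio.NewReader(c)
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return
		}
		msg := strings.TrimSpace(line)
		if msg == "STOP" {
			return
		}
		fmt.Fprintf(c, "Received data: %s\n", msg)
	}
}

// roundTrip sends one line over an in-memory connection and returns the
// server's reply with the trailing newline removed.
func roundTrip(msg string) (string, error) {
	client, server := net.Pipe()
	go echoLines(server)
	defer client.Close()

	if _, err := fmt.Fprintf(client, "%s\n", msg); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(client).ReadString('\n')
	return strings.TrimSpace(reply), err
}

func main() {
	reply, err := roundTrip("Hello, Server!")
	fmt.Printf("reply=%q err=%v\n", reply, err)

	// "STOP" makes the server close the connection without replying,
	// so the read on the client side ends with an error.
	_, err = roundTrip("STOP")
	fmt.Println("STOP closed the connection:", err != nil)
}
```

The same handler can be passed real connections from Accept, since both ends only rely on the net.Conn interface.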

Kotlin client:

import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // aSocket expects a SelectorManager rather than a dispatcher.
    val selectorManager = SelectorManager(Dispatchers.IO)
    val socket = aSocket(selectorManager).tcp().connect("localhost", 5000)

    val input = socket.openReadChannel()
    val output = socket.openWriteChannel(autoFlush = true)

    // The server reads line by line, so every message ends with a newline.
    output.writeStringUtf8("Hello, Server!\n")
    val response = input.readUTF8Line()
    println("Server said: '$response'")

    output.writeStringUtf8("STOP\n")
    socket.close()
    selectorManager.close()
}

The Kotlin client will connect to the server, send a greeting message, wait for the response, print it, and then send a "STOP" message before closing the connection.
This code uses Kotlin coroutines and Ktor's aSocket method to open the TCP connection.

Note: This code will work only if the server and client are running on the same machine. If they are on different machines, replace "localhost" with the server's IP address.

Remember to include Ktor library (ktor-network) in your Kotlin dependencies:

dependencies {
    implementation 'io.ktor:ktor-network:2.3.2'
}

huangapple
  • Posted on 2023-07-08 01:08:33
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76638947.html