在Go中实现客户端和服务器的RPC

huangapple go评论75阅读模式
英文:

RPC from both client and server in Go

问题

在Go语言中,使用net/rpc包从服务器向客户端进行RPC调用是否真的可行?如果不行,是否有更好的解决方案?

英文:

Is it actually possible to do RPC calls from a server to a client with the net/rpc package in Go? If no, is there a better solution out there?

答案1

得分: 4

我目前正在使用thrift(thrift4go)来实现服务器->客户端和客户端->服务器的RPC功能。默认情况下,thrift只支持客户端->服务器的调用,就像net/rpc一样。由于我还需要服务器->客户端的通信,所以我进行了一些研究,并找到了bidi-thrift。Bidi-thrift解释了如何连接一个Java服务器+Java客户端以实现双向的thrift通信。

Bidi-thrift的功能和限制

TCP连接有一个传入和传出的通信线路(RC和TX)。Bidi-thrift的想法是将RS和TX分开,并将它们提供给客户端应用程序和服务器应用程序上的服务器(处理器)和客户端(远程)。我发现在Go中很难做到这一点。而且,这种方式没有“响应”可能(响应线路正在使用中)。因此,服务中的所有方法都必须是“oneway void”(即发出调用后不返回结果)。

解决方案

我改变了bidi-thrift的想法,并让客户端打开两个连接到服务器,A和B。第一个连接(A)用于执行客户端->服务器通信(客户端进行调用,与通常情况下一样)。第二个连接(B)被“劫持”,并连接到客户端上的服务器(处理器),同时连接到服务器上的客户端(远程)。我已经在Go服务器和Java客户端上实现了这个功能。它运行得非常好。它快速可靠(就像普通的thrift一样)。

一些资源... B连接(服务器->客户端)的设置如下:

Go服务器

// 工厂
framedTransportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

// 创建套接字监听器
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:9091")
if err != nil {
    log.Print("解析地址错误:", err.Error(), "\n")
    return
}
serverTransport, err := thrift.NewTServerSocketAddr(addr)
if err != nil {
    log.Print("创建服务器套接字错误:", err.Error(), "\n")
    return
}

// 启动服务器监听连接
log.Print("在", addr, "上启动B通信(服务器->客户端)的服务器\n")
err = serverTransport.Listen()
if err != nil {
    log.Print("B服务器错误:", err.Error(), "\n")
    return //err
}

// 接受新连接并处理
for {
    transport, err := serverTransport.Accept()
    if err != nil {
        return //err
    }
    if transport != nil {
        // 每个传输都在一个goroutine中处理,以便服务器再次可用。
        go func() {
            useTransport := framedTransportFactory.GetTransport(transport)
            client := worldclient.NewWorldClientClientFactory(useTransport, protocolFactory)
            
            // 就是这样!
            // 让我们对连接做些什么
            result, err := client.Hello()
            if err != nil {
                log.Printf("在客户端调用Hello时出错:%s\n", err)
            }

            // client.CallSomething()
        }()
    }
}

Java客户端

// B连接的准备工作
TTransportFactory transportFactory = new TTransportFactory();
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
YourServiceProcessor processor = new YourService.Processor<YourServiceProcessor>(new YourServiceProcessor(this));


/* 创建B调用(服务器->客户端)的thrift连接 */
try {
    // 创建传输
    final TTransport transport = new TSocket("127.0.0.1", 9091);
    
    // 打开传输
    transport.open();
    
    // 将传输层添加到帧传输中
    final TTransport framedTransport = new TFramedTransport(transportFactory.getTransport(transport));
    
    // 将帧传输连接到协议
    final TProtocol protocol = protocolFactory.getProtocol(framedTransport);
    
    // 让处理器在新线程中处理请求
    new Thread() {
        public void run() {
            try {
                while (processor.process(protocol, protocol)) {}
            } catch (TException e) {
                e.printStackTrace();
            } catch (NullPointerException e) {
                e.printStackTrace();
            }
        }
    }.start();
} catch(Exception e) {
    e.printStackTrace();
}
英文:

I am currently using thrift (thrift4go) for server->client and client->server RPC functionality. By default, thrift does only client->server calls just like net/rpc. As I also required server->client communication, I did some research and found bidi-thrift. Bidi-thrift explains how to connect a java server + java client to have bidirectional thrift communication.

What bidi-thrift does, and it's limitations.

A TCP connection has an incomming and outgoing communication line (RC and TX). The idea of bidi-thrift is to split RS and TX and provide these to a server(processor) and client(remote) on both client-application and server-application. I found this to be hard to do in Go. Also, this way there is no "response" possible (the response line is in use). Therefore, all methods in the service's must be "oneway void". (fire and forget, call gives no result).

The solution

I changed the idea of bidi-thrift and made the client open two connections to the server, A and B. The first connection(A) is used to perform client -> server communication (where client makes the calls, as usual). The second connection(B) is 'hijacked', and connected to a server(processor) on the client, while it is connected to a client(remote) on the server. I've got this working with a Go server and a Java client. It works very well. It's fast and reliable (just like normal thrift is).

Some sources.. The B connection (server->client) is set up like this:

Go server

// factories
framedTransportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

// create socket listener
addr, err := net.ResolveTCPAddr(&quot;tcp&quot;, &quot;127.0.0.1:9091&quot;)
if err != nil {
	log.Print(&quot;Error resolving address: &quot;, err.Error(), &quot;\n&quot;)
	return
}
serverTransport, err := thrift.NewTServerSocketAddr(addr)
if err != nil {
	log.Print(&quot;Error creating server socket: &quot;, err.Error(), &quot;\n&quot;)
	return
}

// Start the server to listen for connections
log.Print(&quot;Starting the server for B communication (server-&gt;client) on &quot;, addr, &quot;\n&quot;)
err = serverTransport.Listen()
if err != nil {
	log.Print(&quot;Error during B server: &quot;, err.Error(), &quot;\n&quot;)
	return //err
}

// Accept new connections and handle those
for {
	transport, err := serverTransport.Accept()
	if err != nil {
		return //err
	}
	if transport != nil {
		// Each transport is handled in a goroutine so the server is availiable again.
		go func() {
			useTransport := framedTransportFactory.GetTransport(transport)
			client := worldclient.NewWorldClientClientFactory(useTransport, protocolFactory)
			
			// Thats it!
			// Lets do something with the connction
			result, err := client.Hello()
			if err != nil {
				log.Printf(&quot;Errror when calling Hello on client: %s\n&quot;, err)
			}

			// client.CallSomething()
		}()
	}
}

Java client

// preparations for B connection
TTransportFactory transportFactory = new TTransportFactory();
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
YourServiceProcessor processor = new YourService.Processor&lt;YourServiceProcessor&gt;(new YourServiceProcessor(this));


/* Create thrift connection for B calls (server -&gt; client) */
try {
	// create the transport
	final TTransport transport = new TSocket(&quot;127.0.0.1&quot;, 9091);
	
	// open the transport
	transport.open();
	
	// add framing to the transport layer
	final TTransport framedTransport = new TFramedTransport(transportFactory.getTransport(transport));
	
	// connect framed transports to protocols
	final TProtocol protocol = protocolFactory.getProtocol(framedTransport);
	
	// let the processor handle the requests in new Thread
	new Thread() {
		public void run() {
			try {
				while (processor.process(protocol, protocol)) {}
			} catch (TException e) {
				e.printStackTrace();
			} catch (NullPointerException e) {
				e.printStackTrace();
			}
		}
	}.start();
} catch(Exception e) {
    e.printStackTrace();
}

答案2

得分: 2

我遇到了实现它的<a href="https://github.com/cenkalti/rpc2">rpc2</a>。一个例子:

Server.go

// server.go
package main

import (
 "net"
 "github.com/cenkalti/rpc2"
 "fmt"
)

type Args struct{ A, B int }
type Reply int


func main(){
     srv := rpc2.NewServer()
     srv.Handle("add", func(client *rpc2.Client, args *Args, reply *Reply) error{
    // Reversed call (server to client)
    var rep Reply
    client.Call("mult", Args{2, 3}, &rep)
    fmt.Println("mult result:", rep)

    *reply = Reply(args.A + args.B)
    return nil
 })

 lis, _ := net.Listen("tcp", "127.0.0.1:5000")
 srv.Accept(lis)
}

Client.go

// client.go
package main

import (
 "fmt"
 "github.com/cenkalti/rpc2"
 "net"
)

type Args struct{ A, B int }
type Reply int

func main(){
     conn, _ := net.Dial("tcp", "127.0.0.1:5000")

     clt := rpc2.NewClient(conn)
     clt.Handle("mult", func(client *rpc2.Client, args *Args, reply *Reply) error {
    *reply = Reply(args.A * args.B)
    return nil
   })
   go clt.Run()

    var rep Reply
    clt.Call("add", Args{5, 2}, &rep)
    fmt.Println("add result:", rep)
   }
英文:

I came across <a href="https://github.com/cenkalti/rpc2">rpc2</a> which implements it. An example:

Server.go

// server.go
package main

import (
 &quot;net&quot;
 &quot;github.com/cenkalti/rpc2&quot;
 &quot;fmt&quot;
)

type Args struct{ A, B int }
type Reply int


func main(){
     srv := rpc2.NewServer()
     srv.Handle(&quot;add&quot;, func(client *rpc2.Client, args *Args, reply *Reply) error{
    // Reversed call (server to client)
    var rep Reply
    client.Call(&quot;mult&quot;, Args{2, 3}, &amp;rep)
    fmt.Println(&quot;mult result:&quot;, rep)

    *reply = Reply(args.A + args.B)
    return nil
 })

 lis, _ := net.Listen(&quot;tcp&quot;, &quot;127.0.0.1:5000&quot;)
 srv.Accept(lis)
}

Client.go

// client.go
package main

import (
 &quot;fmt&quot;
 &quot;github.com/cenkalti/rpc2&quot;
 &quot;net&quot;
)

type Args struct{ A, B int }
type Reply int

func main(){
     conn, _ := net.Dial(&quot;tcp&quot;, &quot;127.0.0.1:5000&quot;)

     clt := rpc2.NewClient(conn)
     clt.Handle(&quot;mult&quot;, func(client *rpc2.Client, args *Args, reply *Reply) error {
    *reply = Reply(args.A * args.B)
    return nil
   })
   go clt.Run()

    var rep Reply
    clt.Call(&quot;add&quot;, Args{5, 2}, &amp;rep)
    fmt.Println(&quot;add result:&quot;, rep)
   }

答案3

得分: 1

RPC是一种(远程)服务。每当某台计算机请求远程服务时,它就充当客户端,请求服务器提供服务。在这个“定义”中,服务器调用客户端的RPC概念没有明确定义的含义。

英文:

RPC is a (remote) service. Whenever some computer requests a remote service then it is acting as a client asking the server to provide the service. Within this "definition" the concept of a server calling client RPC has no well defined meaning.

huangapple
  • 本文由 发表于 2012年11月1日 03:15:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/13166004.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定