英文:
Buffer size in Erlang / Golang port example
问题
我有一个简单的 Erlang 到 Golang 的端口示例,将数据从 Erlang 传递到 Golang 并回显响应。
问题是我可以传输的数据量似乎被限制为 2^8 字节(见下文)。我原以为问题可能在 Golang 方面(没有创建足够大的缓冲区),但是用 bufio.NewReaderSize 替换 bufio.NewReader 并没有起作用。所以现在我认为问题可能在 Erlang 方面。
我需要做什么来增加缓冲区大小/能够回显大于 2^8 字节的消息?
谢谢!
justin@justin-ThinkPad-X240:~/work/erlang_golang_port$ erl -pa ebin
Erlang/OTP 17 [erts-6.4.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]
Eshell V6.4.1 (abort with ^G)
1> port:start("./echo").
<0.35.0>
2> port:ping(65000).
65000
3> port:ping(66000).
** exception error: bad argument
in function port:call_port/1 (port.erl, line 20)
4> port:start("./echo").
<0.40.0>
5> port:ping(66000).
65536
Go
package main
import (
"bufio"
"os"
)
const Delimiter = '\n'
func main() {
// reader := bufio:NewReader(os.Stdin)
reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
bytes, _ := reader.ReadBytes(Delimiter)
os.Stdout.Write(bytes[:len(bytes)-1])
}
Erlang
-module(port).
-export([start/1, stop/0, init/1]).
-export([ping/1]).
-define(DELIMITER, [10]).
start(ExtPrg) ->
spawn(?MODULE, init, [ExtPrg]).
stop() ->
myname ! stop.
ping(N) ->
Msg=[round(65+26*random:uniform()) || _ <- lists:seq(1, N)],
call_port(Msg).
call_port(Msg) ->
myname ! {call, self(), Msg},
receive
{myname, Result} ->
length(Result)
end.
init(ExtPrg) ->
register(myname, self()),
process_flag(trap_exit, true),
Port = open_port({spawn, ExtPrg}, []),
loop(Port).
loop(Port) ->
receive
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
receive
{Port, {data, Data}} ->
Caller ! {myname, Data}
end,
loop(Port);
stop ->
Port ! {self(), close},
receive
{Port, closed} ->
exit(normal)
end;
{'EXIT', Port, _Reason} ->
exit(port_terminated)
end.
英文:
I have a crude Erlang-to-Golang port example, passing data from Erlang to Golang and echoing the response.
Problem is the amount of data I can transfer seems to be limited to 2^8 bytes (see below). I thought the problem was probably on the Golang side (not creating a big enough buffer) but replacing bufio.NewReader with bufio.NewReaderSize didn't work. So am now thinking the problem is maybe on the Erlang side.
What do I need to do to increase the buffer size / be able to echo a message larger than 2^8 bytes ?
TIA
justin@justin-ThinkPad-X240:~/work/erlang_golang_port$ erl -pa ebin
Erlang/OTP 17 [erts-6.4.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]
Eshell V6.4.1 (abort with ^G)
1> port:start("./echo").
<0.35.0>
2> port:ping(65000).
65000
3> port:ping(66000).
** exception error: bad argument
in function port:call_port/1 (port.erl, line 20)
4> port:start("./echo").
<0.40.0>
5> port:ping(66000).
65536
Go
package main
import (
"bufio"
"os"
)
const Delimiter = '\n'
func main() {
// reader := bufio:NewReader(os.Stdin)
reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
bytes, _ := reader.ReadBytes(Delimiter)
os.Stdout.Write(bytes[:len(bytes)-1])
}
Erlang
-module(port).
-export([start/1, stop/0, init/1]).
-export([ping/1]).
-define(DELIMITER, [10]).
start(ExtPrg) ->
spawn(?MODULE, init, [ExtPrg]).
stop() ->
myname ! stop.
ping(N) ->
Msg=[round(65+26*random:uniform()) || _ <- lists:seq(1, N)],
call_port(Msg).
call_port(Msg) ->
myname ! {call, self(), Msg},
receive
{myname, Result} ->
length(Result)
end.
init(ExtPrg) ->
register(myname, self()),
process_flag(trap_exit, true),
Port = open_port({spawn, ExtPrg}, []),
loop(Port).
loop(Port) ->
receive
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
receive
{Port, {data, Data}} ->
Caller ! {myname, Data}
end,
loop(Port);
stop ->
Port ! {self(), close},
receive
{Port, closed} ->
exit(normal)
end;
{'EXIT', Port, _Reason} ->
exit(port_terminated)
end.
答案1
得分: 2
如果你使用start_link
,你会发现在第一条命令之后端口会崩溃:
1> port:start('go run port.go').
<0.118.0>
2> port:ping(65000).
65000
** exception error: port_terminated
如果你将Go代码更改为在循环中运行,可以避免这个崩溃:
func main() {
for {
// reader := bufio:NewReader(os.Stdin)
reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
bytes, _ := reader.ReadBytes(Delimiter)
os.Stdout.Write(bytes[:len(bytes)-1])
}
}
现在我们可以看到另一个有趣的结果:
33> c(port).
{ok,port}
40> port:ping(66000).
65536
41> port:ping(66000).
464
42> port:ping(66000).
65536
43> port:ping(66000).
464
现在我们可以看到实际上没有丢失任何数据,它只是在端口中缓冲。由于你没有指定一个帧协议(使用{packet, N}
或{line, N}
),你需要自己负责收集数据。似乎Erlang端口的内部缓冲区大小为64K(尽管我没有找到相关文档和更改它的方法)。
如果你将接收数据的方式更改为在返回之前获取所有数据,你将每次获得每个字节:
loop(Port) ->
receive
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
Caller ! {myname, receive_all(Port, 10)},
loop(Port);
stop ->
Port ! {self(), close},
receive
{Port, closed} ->
exit(normal)
end;
{'EXIT', Port, _Reason} ->
exit(port_terminated)
end.
receive_all(Port, Timeout) -> receive_all(Port, Timeout, []).
receive_all(Port, Timeout, Data) ->
receive
{Port, {data, New}} ->
receive_all(Port, Timeout, [New|Data])
after Timeout ->
lists:flatten(lists:reverse(Data))
end.
运行这个代码,我们得到:
1> c(port).
{ok,port}
2>
3> port:start('go run port.go').
<0.311.0>
4> port:ping(66000).
66000
5> port:ping(66000).
66000
6> port:ping(66000).
66000
英文:
If you use start_link
instead, you'll see that the port crashes after the first command:
<!-- language: erlang -->
1> port:start('go run port.go').
<0.118.0>
2> port:ping(65000).
65000
** exception error: port_terminated
If you change the Go code to run in a loop, this crash can be avoided:
<!-- language: go -->
func main() {
for {
// reader := bufio:NewReader(os.Stdin)
reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
bytes, _ := reader.ReadBytes(Delimiter)
os.Stdout.Write(bytes[:len(bytes)-1])
}
}
Now we can see another interesting result:
<!-- language: erlang -->
33> c(port).
{ok,port}
40> port:ping(66000).
65536
41> port:ping(66000).
464
42> port:ping(66000).
65536
43> port:ping(66000).
464
Now we can see that no data is actually lost, it's just buffered in the port. Since you have not specified a framing protocol (using {packet, N}
or {line, N}
you are responsible yourself for collecting the data. It also seems that the internal buffer size of an Erlang port is 64K (although I found no documentation of this and no way to change it).
If you change your receive to get all data before returning, you'll every byte each time:
<!-- language: erlang -->
loop(Port) ->
receive
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
Caller ! {myname, receive_all(Port, 10)},
loop(Port);
stop ->
Port ! {self(), close},
receive
{Port, closed} ->
exit(normal)
end;
{'EXIT', Port, _Reason} ->
exit(port_terminated)
end.
receive_all(Port, Timeout) -> receive_all(Port, Timeout, []).
receive_all(Port, Timeout, Data) ->
receive
{Port, {data, New}} ->
receive_all(Port, Timeout, [New|Data])
after Timeout ->
lists:flatten(lists:reverse(Data))
end.
Running this, we get:
<!-- language: erlang -->
1> c(port).
{ok,port}
2>
3> port:start('go run port.go').
<0.311.0>
4> port:ping(66000).
66000
5> port:ping(66000).
66000
6> port:ping(66000).
66000
答案2
得分: 1
-
2^8是256,不是65536,65536是2^16(或2字节)。
-
要排除golang程序,你可以简单地将你的
echo
替换为GNU的cat
。 -
端口通信的默认消息最大大小为64k,所以当你的端口接收消息时,第一个消息是字符串的前64k部分。你可以再次读取端口以获取剩余的数据,但你在代码中只是丢弃它们。
-
如果你真的想要使用基于行的协议进行通信,你应该根据文档配置你的端口:
{line, L}
消息按行传递。每行(由操作系统特定的换行序列分隔)以一个单独的消息传递。消息数据格式为{Flag, Line},其中Flag是eol或noeol,Line是实际传递的数据(不包括换行序列)。
L指定行的最大长度(以字节为单位)。超过此长度的行将以多个消息传递,Flag除了最后一个消息外,都设置为noeol。如果在除了紧接换行序列之外的任何地方遇到文件结束,最后一行也将以Flag设置为noeol的方式传递。在所有其他情况下,行都以Flag设置为eol的方式传递。
{packet, N}
和{line, L}
设置是互斥的。所以你的代码应该是:
Port = open_port({spawn, ExtPrg}, [{line, ?PACKET_SIZE}]),
%%...
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
D = read_data(Port, []),
Caller ! {myname, D},
loop(Port);
%%...
read_data(Port, Prefix) ->
receive
{Port, {data, {noeol, Data}}} ->
read_data(Port, Prefix ++ Data);
{Port, {data, {eol, Data}}} ->
Prefix ++ Data
end.
英文:
- 2^8 is 256, not 65536 which is 2^16 (or 2 bytes).
- For excluding golang program you can simply replace your
echo
with GNUcat
- Default message max size for port communication is 64k, so when your port receives messages, the first one is leading 64k of the string. You can read port again to gain remaining data but you just drop them in your code.
- If you really want to communicate on line-based protocol you should configure your port accordingly:
> {line, L}
> Messages are delivered on a per line basis. Each line
> (delimited by the OS-dependent newline sequence) is delivered in one
> single message. The message data format is {Flag, Line}, where Flag is
> either eol or noeol and Line is the actual data delivered (without the
> newline sequence).
>
> L
specifies the maximum line length in bytes. Lines longer than this
> will be delivered in more than one message, with the Flag set to noeol
> for all but the last message. If end of file is encountered anywhere
> else than immediately following a newline sequence, the last line will
> also be delivered with the Flag set to noeol. In all other cases,
> lines are delivered with Flag set to eol.
>
> The {packet, N}
and {line, L}
settings are mutually exclusive.
So your code would be
Port = open_port({spawn, ExtPrg}, [{line, ?PACKET_SIZE]),
%%...
{call, Caller, Msg} ->
Port ! {self(), {command, Msg++?DELIMITER}},
D = read_data(Port, []),
Caller ! {myname, D},
loop(Port);
%%...
read_data(Port, Prefix) ->
receive
{Port, {data, {noeol, Data}}} ->
read_data(Port, Prefix ++ Data);
{Port, {data, {eol, Data}}} ->
Prefix ++ Data
end.
答案3
得分: 1
我一直在努力解决类似的问题。
这是pipe模块的完整代码。
它允许将文本数据发送到端口并读取所有回复。
-module(apr_pipe).
-export([open_pipe/2, send/2, close/1]).
-export([loop/1, status/1, init/1]).
-include_lib("kernel/include/logger.hrl").
-define(MAX_LINE_LEN, 4096).
open_pipe(Path, Cmd) ->
State = #{path => Path, cmd => Cmd},
Pid = spawn(?MODULE, init, [State]),
Pid.
init(State) ->
#{path := Path, cmd := Cmd} = State,
FullFn = filename:join(Path, Cmd),
Settings = [{line, ?MAX_LINE_LEN}, use_stdio, stderr_to_stdout, hide, binary, exit_status],
Port = erlang:open_port({spawn_executable, FullFn}, Settings),
State2 = State#{port => Port, data => #{}},
loop(State2).
send(Pid, Data) -> Pid ! {self(), send, Data}.
close(Pid) -> Pid ! {self(), send, close}.
status(Pid) -> Pid ! {self(), status}.
get_eol() -> <<"\n">>.
loop(State) ->
receive
{_Pid, send, close} ->
?LOG(notice, "got cmd: Close", []),
Port = maps:get(port, State),
port_close(Port),
exit(normal);
{Pid, send, Data} ->
?LOG(notice, "Send Data ...", []),
Port = maps:get(port, State),
port_command(Port, Data),
port_command(Port, get_eol()),
State2 = State#{status => data_sent, client => Pid},
loop(State2);
{Pid, status} ->
Port = maps:get(port, State),
?LOG(notice, "Status: Port: ~p State: ~p", [Port, State]),
Pid ! {status, Port, State},
loop(State);
% port messages.
{Port, {data, {noeol, Data}}} ->
?LOG(notice, "Port: ~p Data: ~p", [Port, Data]),
CurData = maps:get(cur_data, State, []),
State2 = State#{cur_data => [Data | CurData]},
loop(State2);
{Port, {data, {eol, Data}}} ->
?LOG(notice, "Port: ~p Data: ~p", [Port, Data]),
CurData = [Data | maps:get(cur_data, State, [])],
CurData2 = lists:reverse(CurData),
Reply = list_to_binary(CurData2),
Client = maps:get(client, State, undefined),
State2 = State#{cur_data => [], client => undefined},
case Client of
undefined -> ?LOG(error, "can not sent reply. Client: ~p Reply: ~p", [Client, Reply]),
loop(State2);
_ -> Client ! {reply, Reply},
loop(State2)
end;
{_Port, closed} ->
?LOG(warning, "Port: ~p closed", []),
exit(normal);
{'EXIT', Port, Reason} ->
?LOG(notice, "Port: ~p exit. Reason: ~p", [Port, Reason]),
exit(Reason);
_Other -> ?LOG(error, "unexpected message: ~p", [_Other]),
exit({error, {unexpected_message, _Other}})
end.
以上是代码的翻译结果。
英文:
I have been struggling with the similar problem.
Here the complete code of pipe module.
It allows sent text data to port and read all replies.
-module(apr_pipe).
-export([open_pipe/2,send/2,close/1]).
-export([loop/1,status/1,init/1]).
-include_lib("kernel/include/logger.hrl").
-define(MAX_LINE_LEN,4096).
open_pipe(Path,Cmd) ->
State = #{path => Path, cmd => Cmd},
Pid = spawn(?MODULE,init,[State]),
Pid.
init(State) ->
#{path := Path,cmd := Cmd} = State,
FullFn = filename:join(Path,Cmd),
Settings = [{line,?MAX_LINE_LEN},use_stdio,stderr_to_stdout,hide,binary,exit_status],
Port = erlang:open_port({spawn_executable,FullFn},Settings),
State2 = State#{port => Port, data => #{}},
loop(State2).
send(Pid,Data) -> Pid!{self(),send,Data}.
close(Pid) -> Pid!{self(),send,close}.
status(Pid) -> Pid!{self(),status}.
get_eol() -> <<"\n">>.
loop(State) ->
receive
{_Pid,send,close} ->
?LOG(notice,"got cmd: Close",[]),
Port = maps:get(port,State),
port_close(Port),
exit(normal);
{Pid,send,Data} ->
?LOG(notice,"Send Data ...",[]),
Port = maps:get(port,State),
port_command(Port,Data),
port_command(Port,get_eol()),
State2 = State#{status => data_sent, client => Pid},
loop(State2);
{Pid,status} ->
Port = maps:get(port,State),
?LOG(notice,"Status: Port: ~p State: ~p",[Port,State]),
Pid!{status,Port,State},
loop(State);
% port messages.
{Port, {data,{noeol,Data}}} ->
?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
CurData = maps:get(cur_data,State,[]),
State2 = State#{cur_data => [Data | CurData]},
loop(State2);
{Port, {data, {eol,Data}}} ->
?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
CurData = [Data | maps:get(cur_data,State,[])],
CurData2 = lists:reverse(CurData),
Reply = list_to_binary(CurData2),
Client = maps:get(client,State,undefined),
State2 = State#{cur_data => [], client => undefined},
case Client of
undefined -> ?LOG(error,"can not sent reply. Client: ~p Reply: ~p", [Client,Reply]),
loop(State2);
_ -> Client!{reply,Reply},
loop(State2)
end;
{_Port, closed} ->
?LOG(warning, "Port: ~p closed",[]),
exit(normal);
{'EXIT', Port, Reason} ->
?LOG(notice,"Port: ~p exit. Reason: ~p",[Port,Reason]),
exit(Reason);
_Other -> ?LOG(error,"unexpected message: ~p",[_Other]),
exit({error,{unexpected_message,_Other}})
end.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论