英文:
How to provide Rails websocket to an existing client (any generic message)?
问题
经过尝试,我还没有完全理解Rails中的魔法值,以便提供WebSocket接口。客户端代码已知可以工作,因为已经在NodeJS和FastAPI接口上进行了测试。据我理解,Rails服务器正确处理WS事件的方式是通过最后代码片段中实现的send_msg
方法,但该方法应该如何调用呢?似乎为了调用send_msg
方法,我需要修改客户端代码以使用Rails提供的JS库(如这里),这是不可能的。
正如标题所说,问题是如何创建一个简单(但通用?)的WS消息接收器/广播器。
WebSocket接口应该如何工作:
- 在/clock上有一个ws端点
- 客户端可以连接到/clock
- 当客户端向/clock发送一个数据为'requestTime'的WS消息时,API会将服务器系统时间广播给所有连接的客户端
- 无法更改客户端代码
客户端尝试连接和请求时间(NodeJS):
(连接X个客户端并广播Y次时间)
以下是NodeJS代码示例:
// 请注意,这部分代码是客户端NodeJS代码示例,它连接到服务器并请求时间。
// 请查看原始消息以获取完整的代码示例。
以下是NodeJS中一个可行的WebSocket响应示例,基本上这是需要在Rails中复制的内容:
// 这是NodeJS中一个可行的WebSocket响应器示例,基本上这是需要在Rails中复制的内容。
我目前实现了以下内容:
在routes.rb
中添加以下内容,假设它将所有WS事件定向到路径/clock,即ClocksChannel
:
# 在routes.rb中添加以下内容,将所有WS事件定向到路径/clock,即ClocksChannel
card_controller.rb
的主要内容如下:
# card_controller.rb的主要内容如下:
实现了ClocksChannel
,假设它订阅和取消订阅客户端。至于如何调用send_msg
,我不清楚它应该如何被调用:
# 实现了ClocksChannel,假设它订阅和取消订阅客户端。至于如何调用send_msg,我不清楚它应该如何被调用。
当服务器按照给定的设置接收到连接时,Rails库内部将输出以下内容:
# 当服务器按照给定的设置接收到连接时,Rails库内部将输出以下内容
注意:这些代码示例是NodeJS和Rails之间WebSocket通信的关键部分。如果你有特定的问题或需要更多的指导,可以提出具体的问题,我会尽力帮助你。
英文:
After trying different things in an attempt to get this working, I haven't managed to yet wrap my head around Rails' magical values in order to provide a websocket interface. Client code is known to work, as it has been already tested with NodeJS and FastAPI interfaces. To my understanding the correct way for the Rails server to read/respond to WS events is through the implemented 'send_msg' method in the last code snippet, but how should the method be called? It seems that in order to call the send_msg method I would have to modify the client code to use a JS library (as in here) provided by Rails, which is not possible.
As the title says, the question would be how to create a simple (but generic?) WS message receiver/broadcaster?
How the websocket interface should work
- Have a ws: endpoint at /clock
- Client can connect to /clock
- When a client sends a WS message with the data 'requestTime' to /clock, the API broadcasts server system time to all connected clients
- Client code cannot be altered
How client attempts to connect and request time (NodeJS)
(connect X clients and broadcast time Y times)
import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";
// client that only counts the received messages
function wsFunction(requestCount) {
return resolve => {
let limit = requestCount;
// construct all promises
const client = new ws("ws://localhost:3000/clock");
client.on('message', data => {
if(--limit == 0) {
client.close();
}
});
client.on('close', () => {
if(limit > 0) {
console.log("Socket closed prematurely... ", limit);
}
});
client.on('open', () => {
resolve(client); // client connected
});
const close = () => {
if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
client.close();
}
}
}
}
/**
*
* @param {*} limit
* @returns operation time for limit messages, or -1 if connection is cut
*/
function attemptClockFetches(clientCount, retrieveCount) {
const clients = [];
for(let i = 0; i < clientCount - 1; ++i) {
clients.push(async () => new Promise(wsFunction(retrieveCount)));
}
// client that initiates the broadcast
const promise = new Promise(async resolve => {
const startTime = performance.now();
const sockets = await async.parallel(clients); // connect all clients
// create updater client
const client = new ws("ws://localhost:3000/clock");
// now update until limit is reached
client.on('close', () => {
if(retrieveCount > 0) {
console.log("Parent socket closed prematurely...");
}
});
client.on('message', () => {
if(--retrieveCount > 0) {
client.send("requestTime");
} else {
client.close();
const endTime = performance.now();
// close all sockets
for(let s of sockets) {
s.close();
}
resolve(endTime - startTime);
}
});
client.on('open', () => {
client.send("requestTime");
});
});
return promise;
}
async function doStressTest() {
await attemptClockFetches(10, 10);
}
const i = setInterval(() => {
// prevent node from killing process
}, 1000);
doStressTest().then(() => {
clearInterval(i);
});
A snippet of a working NodeJS WebSocket responder, essentially this is what needs to be replicated in Rails
const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });
wsServer.on('connection', socket => {
socket.on('error', err => {
console.error(err);
});
socket.on('message', data => {
if(data.toString() === "requestTime") {
// broadcast time on requestTime event to all clients
wsServer.clients.forEach(client => {
if(client.readyState === ws.OPEN) {
client.send((new Date()).getMilliseconds());
}
});
}
});
});
What I currently have implemented
I've added this to routes.rb, assuming that it directs all WS events to path /clock which is ClocksChannel
Rails.application.routes.draw do
get '/users/:userId/cards', to: 'card#index'
# get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL
mount ActionCable.server => '/clock'
end
Contents of the main card_controller.rb
class CardController < ApplicationController
def index
# do some index things, not part of WS
end
# def clock
# render "Hello World"
# end
end
Implemented this channel, assuming that it subscribes and unsubscribes the clients. As for calling send_msg, I don't have a clear understanding as to how it should be called
require "time"
class ClocksChannel < ApplicationCable::Channel
def subscribed
# stream_from "some_channel"
end
def unsubscribed
# Any cleanup needed when channel is unsubscribed
end
def send_msg(data)
if data == "requestTime"
ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
end
end
end
When the server receives a connection with the given setup, the following output is given from within the Rails libraries:
Started GET "/clock" for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/json/decoding.rb:23:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:168:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:89:in `dispatch_websocket_message'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/server/worker.rb:59:in `block in invoke'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
...
...
答案1
得分: 0
看起来这在ApplicationCable中无法实现,因为它要求数据格式为JSON。为了解决这个问题,我进行了以下更改:
删除问题上方显示的所有先前代码,因为它不再需要/有效
在此之后添加gem 'faye-websocket'
,然后运行bundle install
使用Faye创建控制器:
class ClockController < ApplicationController
@@clients = []
def connect
if Faye::WebSocket.websocket?(request.env)
ws = Faye::WebSocket.new(request.env)
ws.on :open do |event|
# 当websocket连接打开时执行的代码
@@clients << ws
end
ws.on :message do |event|
if event.data == 'requestTime'
now = Time.now
@@clients.each do |client|
client.send(now.to_s)
end
end
end
ws.on :close do |event|
# 当websocket连接关闭时执行的代码
@@clients.delete(ws)
ws = nil
end
# 返回异步Rack响应
return ws.rack_response
else
# 返回常规HTTP响应
render text: '不是websocket请求'
end
end
end
在routes.rb中为控制器添加路由:
get '/clock', to: 'clock#connect', via: :all
这个解决方案可能不是所有情况下都完美,但足够好地完成了任务。
英文:
It seems this is not doable with ApplicationCable, as it requires the data to be in JSON format. In order to fix this I did the following changes:
Remove all previous code displayed in the question above, as it's no longer necessary/valid
add gem 'faye-websocket'
after which run bundle install
Create the controller using Faye:
class ClockController < ApplicationController
@@clients = []
def connect
if Faye::WebSocket.websocket?(request.env)
ws = Faye::WebSocket.new(request.env)
ws.on :open do |event|
# Code to execute when the websocket connection is opened
@@clients << ws
end
ws.on :message do |event|
if event.data == 'requestTime'
now = Time.now
@@clients.each do |client|
client.send(now.to_s)
end
end
end
ws.on :close do |event|
# Code to execute when the websocket connection is closed
@@clients.delete(ws)
ws = nil
end
# Return async Rack response
return ws.rack_response
else
# Return regular HTTP response
render text: 'Not a websocket request'
end
end
end
Add the route to the controller in routes.rb
get '/clock', to: 'clock#connect', via: :all
This solution may not be perfect in all cases, but it gets the job done well enough
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论