英文:
Is there a way to stop old observable and set new after event in Nest Gateways?
问题
我正在开发一个简单的消息服务器应用,使用WebSockets和NestJs gateways。服务器接收一个事件来标识一个具有名称的监听器,然后将WebSocket更新发送给该监听器。
目前,我正在使用主题/可观察的方法,服务器在listen-as
事件之后返回一个新的可观察对象,该对象过滤了所有即将到来的消息:
export class ListenAsDto {
name: string;
}
export interface PersonalisedMessage {
from: string;
title: string;
content: string;
}
export class MessagesService {
// ...
listenAs(listener: ListenAsDto): Observable<WsResponse<PersonalisedMessage>> {
return this.messageObservable.pipe(
filter((item) => item.to === listener.name),
map((item) => {
const copy = { ...item };
delete copy.to;
return { event: INBOX_MESSAGE_NAME, data: copy };
}),
);
}
}
但这只对第一次有效,在另一个请求后,客户端现在同时接收来自2个用户的更新。
是否有一种方法可以停止当前用户的先前可观察对象并开始新的可观察对象?
编辑:我正在使用ws
平台。
英文:
I'm developing a simple message server app, using WebSockets and NestJs gateways. Server receives an event to indentify a listener with name, and then sends websocket updates to that listener.
Right now I'm using subject/observable approach, server after listen-as
event returns a new observable, which filters all upcoming messages:
export class ListenAsDto {
name: string;
}
export interface PersonalisedMessage {
from: string;
title: string;
content: string;
}
export class MessagesService {
// ...
listenAs(listener: ListenAsDto): Observable<WsResponse<PersonalisedMessage>> {
return this.messageObservable.pipe(
filter((item) => item.to === listener.name),
map((item) => {
const copy = { ...item };
delete copy.to;
return { event: INBOX_MESSAGE_NAME, data: copy };
}),
);
}
}
But this only works for the first time, after another request client now receives both updates from 2 users at the time.
Is there a way to stop previous observable for current user an start new?
EDIT: I'm using ws
platform
答案1
得分: 0
使用本地的 WebSocket
实例找到了一个解决方案。如果是同一客户端发出的请求,它会传递相同的对象。因此,我们可以使用 WeakMap
手动跟踪所有订阅对象,并在获取相同的 WebSocket
客户端时取消订阅它们。
export class MessagesService {
private subs = new WeakMap<WebSocket, Subscription>();
listenAs(ip: WebSocket, dto: ListenAsDto) {
if (this.subs.has(ip)) this.subs.get(ip).unsubscribe();
// 'createObservable' 实际上是问题中的 'listenAs' 函数
const obs = this.createObservable(dto);
this.subs.set(
ip,
obs.subscribe({
next(value) {
ip.send(JSON.stringify(value));
},
}),
);
return dto;
}
}
英文:
Found a solution by using a native WebSocket
instance. It passes the same object if request was made by same client. So, using a WeakMap
, we can manually track all subscriptions objects, and unsubscribe them, if we get the same WebSocket
client
export class MessagesService {
private subs = new WeakMap<WebSocket, Subscription>();
listenAs(ip: WebSocket, dto: ListenAsDto) {
if (this.subs.has(ip)) this.subs.get(ip).unsubscribe();
// 'createObservable' is basically question`s 'listenAs' function
const obs = this.createObservable(dto);
this.subs.set(
ip,
obs.subscribe({
next(value) {
ip.send(JSON.stringify(value));
},
}),
);
return dto;
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论