Is there a way to stop old observable and set new after event in Nest Gateways?

huangapple go评论57阅读模式
英文:

Is there a way to stop old observable and set new after event in Nest Gateways?

问题

我正在开发一个简单的消息服务器应用,使用WebSockets和NestJs gateways。服务器接收一个事件来标识一个具有名称的监听器,然后将WebSocket更新发送给该监听器。

目前,我正在使用主题/可观察的方法,服务器在listen-as事件之后返回一个新的可观察对象,该对象过滤了所有即将到来的消息:

export class ListenAsDto {
  name: string;
}

export interface PersonalisedMessage {
  from: string;
  title: string;
  content: string;
}

export class MessagesService {
  // ...
  listenAs(listener: ListenAsDto): Observable<WsResponse<PersonalisedMessage>> {
    return this.messageObservable.pipe(
      filter((item) => item.to === listener.name),
      map((item) => {
        const copy = { ...item };
        delete copy.to;
        return { event: INBOX_MESSAGE_NAME, data: copy };
      }),
    );
  }
}

但这只对第一次有效,在另一个请求后,客户端现在同时接收来自2个用户的更新。

是否有一种方法可以停止当前用户的先前可观察对象并开始新的可观察对象?

编辑:我正在使用ws平台。

英文:

I'm developing a simple message server app, using WebSockets and NestJs gateways. Server receives an event to indentify a listener with name, and then sends websocket updates to that listener.

Right now I'm using subject/observable approach, server after listen-as event returns a new observable, which filters all upcoming messages:

export class ListenAsDto {
  name: string;
}

export interface PersonalisedMessage {
  from: string;
  title: string;
  content: string;
}

export class MessagesService {
  // ...
  listenAs(listener: ListenAsDto): Observable&lt;WsResponse&lt;PersonalisedMessage&gt;&gt; {
    return this.messageObservable.pipe(
      filter((item) =&gt; item.to === listener.name),
      map((item) =&gt; {
        const copy = { ...item };
        delete copy.to;
        return { event: INBOX_MESSAGE_NAME, data: copy };
      }),
    );
  }
}

But this only works for the first time, after another request client now receives both updates from 2 users at the time.

Is there a way to stop previous observable for current user an start new?

EDIT: I'm using ws platform

答案1

得分: 0

使用本地的 WebSocket 实例找到了一个解决方案。如果是同一客户端发出的请求,它会传递相同的对象。因此,我们可以使用 WeakMap 手动跟踪所有订阅对象,并在获取相同的 WebSocket 客户端时取消订阅它们。

export class MessagesService {
  private subs = new WeakMap<WebSocket, Subscription>();

  listenAs(ip: WebSocket, dto: ListenAsDto) {
    if (this.subs.has(ip)) this.subs.get(ip).unsubscribe();
    // 'createObservable' 实际上是问题中的 'listenAs' 函数
    const obs = this.createObservable(dto);
    this.subs.set(
      ip,
      obs.subscribe({
        next(value) {
          ip.send(JSON.stringify(value));
        },
      }),
    );

    return dto;
  }
}
英文:

Found a solution by using a native WebSocket instance. It passes the same object if request was made by same client. So, using a WeakMap, we can manually track all subscriptions objects, and unsubscribe them, if we get the same WebSocket client

export class MessagesService {
  private subs = new WeakMap&lt;WebSocket, Subscription&gt;();

  listenAs(ip: WebSocket, dto: ListenAsDto) {
    if (this.subs.has(ip)) this.subs.get(ip).unsubscribe();
    // &#39;createObservable&#39; is basically question`s &#39;listenAs&#39; function
    const obs = this.createObservable(dto);
    this.subs.set(
      ip,
      obs.subscribe({
        next(value) {
          ip.send(JSON.stringify(value));
        },
      }),
    );

    return dto;
  }
}

huangapple
  • 本文由 发表于 2023年6月1日 22:59:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76383257.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定