Simultaneous Writing and Reading of data using Array

huangapple go评论73阅读模式
英文:

Simultaneous Writing and Reading of data using Array

问题

我正在开发一个Chrome扩展,通过Socket.io使用WebSocket连接与远程API通信。API以字符串格式发送流数据给我。我的目标是将流数据保留在缓冲区中几秒钟,直到我从内容脚本接收到特定消息为止。一旦接收到消息"startStreaming",我想开始将数据流传递给内容脚本。

为了实现这一目标,我考虑使用一个缓冲数组来存储从服务器接收到的数据。例如:

let buffer = [];

socket.on('stream', (wordChunk) => {
    buffer.push(wordChunk);
});

if (msg.msg === 'startStreaming') {
    console.log('向选项卡发送响应');
    buffer.forEach(wordChunk => {
        port.postMessage({ msg: 'streamData', wordChunk });
    });
}

但我遇到的问题是确保以安全的方式同时进行数据写入和读取。由于数据可能会在从服务器连续传输的同时读取和处理缓冲区,我需要正确处理这种情况,以避免任何数据不一致。

我将非常感激有关如何有效处理此情况的任何建议或最佳实践,以确保按照先进先出(FIFO)顺序读取数据,避免竞态条件或数据损坏问题。

如果有其他方法可以替代数组,请给我建议。

场景
如果我们不将Chrome扩展放入背景中,可以这样考虑,我想在用户执行操作之前预取API响应。例如,当用户刚刚将鼠标悬停在按钮上时,我想从服务器开始获取数据并将其临时存储。当他点击按钮时立即显示响应,以使他感觉到速度非常快。因此,我们不知道用户何时点击按钮,只要他悬停或悬停几次按钮即可。

非常感谢您提前的帮助!

英文:

I am developing a Chrome extension that communicates with a remote API via a WebSocket connection using Socket.io. The API sends me streaming data in string format. My goal is to hold the streaming data in a buffer for a few seconds until I receive a specific message from the content script. Once the message "startStreaming" is received, I want to start streaming the data to the content script.

To achieve this, I am thinking to use a buffer array to store the incoming data from the server. E.g

let buffer = [];

socket.on('stream', (wordChunk) => {
    buffer.push(wordChunk);
});

if (msg.msg === 'startStreaming') {
    console.log('Send response back to Tab');
    buffer.forEach(wordChunk => {
        port.postMessage({ msg: 'streamData', wordChunk });
    });
}

But the problem I am facing is ensuring the simultaneous writing and reading of data in a safe manner. Since the data can be continuously streamed from the server while the buffer is being read and processed, I need to handle this situation properly to avoid any data inconsistencies.

I would greatly appreciate any suggestions or best practices on how to handle this scenario effectively, ensuring that the data is read in the FIFO (First-In, First-Out) order and avoiding any race conditions or data corruption issues.

Suggest me if there is any other way rather than array.

Scenario
If we don't take Chrome Extension to the context, think it in such a way that, I want to pre-fetch api response before a user's action. E.g I want to start fetching data from the server and store it temporarily when a user just hover over the button. And display the response instantly when he clicks on the button to make him feel that it is super fast. So we don't know when the user clicks on the button, as soon as he hovers or after few times of hovering the button.

Thank you in advance for your assistance!

答案1

得分: 1

JavaScript代码执行是单线程的。这意味着您的代码不会受到线程交错问题的影响,而socket.on监听器在执行forEach循环时无法运行。

您真正的问题是forEach循环将完成,然后socket.on监听器可能会在之后运行。因此,缓冲区可能会有新的条目,但缓冲区的内容不会被迭代并发送到端口。

以下是您可以如何编写代码的方式:

let buffer = [];
let streamingEnabled = false;

socket.on('stream', wordChunk => {
  buffer.push(wordChunk);
  if(streamingEnabled) sendBuffer(); 
});

function sendBuffer() {
  buffer.forEach(wordChunk => port.postMessage({ msg: 'streamData', wordChunk }));
  buffer.length = 0; // 清空缓冲区
}

if (msg.msg === 'startStreaming') {
  streamingEnabled = true;
  sendBuffer();
}
英文:

Javascript code execution is single-threaded. This means your code is safe from thread interleaving issues, and the socket.on listener cannot run while the forEach loop is executing.

Your real issue is that the forEach loop will complete, and then the socket.on listener may run afterwards. Thus, the buffer may get new entries, but the contents of the buffer will not be iterated over and sent to the port.

Here is how you could code it instead:

let buffer = [];
let streamingEnabled = false;

socket.on('stream', wordChunk => {
  buffer.push(wordChunk);
  if(streamingEnabled) sendBuffer(); 
});

function sendBuffer() {
  buffer.forEach(wordChunk => port.postMessage({ msg: 'streamData', wordChunk }));
  buffer.length = 0; //clear the buffer
}

if (msg.msg === 'startStreaming') {
  streamingEnabled = true;
  sendBuffer();
}

答案2

得分: 1

我认为这可能是使用RXJS的ReplaySubject的一个不错的选择。

我已经在下面给出了一个示例,展示了它在你的情况下可能如何运作,但实际上我不建议使用间隔这个东西。这仅仅是基于你的代码进行演示。

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject<any>()

socket.on('stream', wordChunk => {
  subject.next(wordChunk);
});

const interval = setInterval(() => {
  if(msg.msg === 'startStreaming') {
    subject.subscribe({
      next: (wordChunk) => port.postMessage({ msg: 'streamData', wordChunk })
    })
    clearInterval(interval);
  };
}, 1000);

我建立了一个sandbox来展示ReplaySubject的运作。

英文:

I think this might be a good candidate for a ReplaySubject from RXJS.

I've put an example below on how it could work in your situation, but in practice I wouldn't recommend the interval thing. It is only to demonstrate based on your code.

import { ReplaySubject } from &quot;rxjs&quot;;

const subject = new ReplaySubject&lt;any&gt;()

socket.on(&#39;stream&#39;, wordChunk =&gt; {
  subject.next(wordChunk);
});

const interval = setInterval(() =&gt; {
  if(msg.msg === &#39;startStreaming&#39;) {
    subject.subscribe({
      next: (wordChunk) =&gt; port.postMessage({ msg: &#39;streamData&#39;, wordChunk })
    })
    clearInterval(interval);
  };
}, 1000);

I've built a sandbox to show a ReplaySubject in action.

huangapple
  • 本文由 发表于 2023年6月8日 05:45:42
  • 转载请务必保留本文链接:https://go.coder-hub.com/76427294.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定