结束管道中的读取流,但仍要处理已读取的数据块。

huangapple go评论76阅读模式
英文:

NodeJS Streams - End a read stream in pipeline but still process the chunks already readed

问题

Sure, here's the translated text without the code:

我有以下问题:

我正在使用NodeJS流以管道方式读取一个大文件,进行一些转换,然后将其写入可写流。棘手的部分是,我希望能够在满足特定条件时停止读取文件,但仍然完成已读取的块的处理。

在以下示例中,我正在读取一个文本文件,我的转换流(secondStream)将文本转换为大写并发送到下一个流。但如果它找到文本,那意味着应该停止从文本文件中读取,我认为这意味着读取流应该停止读取块。

我尝试了几种解决方案,但不得不承认,我有点困惑。到目前为止,我得到了以下代码可以工作。但是,使用firstStream.destroy();会导致管道抛出错误

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close

我能够通过捕获并忽略管道上的错误来“避免”这个错误,但老实说,这对我来说听起来不安全或不正确。

是否有更好的方法来实现这一点?我是否遗漏了什么?

提前感谢您的帮助!

英文:

I have the following problem :

I am using NodeJS streams with the pipeline to read a big file, do some transforms and then write it to a writable stream. The tricky part is I wanted to be able to stop reading the file if an specific condition is met, but still finish processing the already read chunks.

In the following example I am reading a text file, my transform stream(secondStream) converts the text to uppercase and send to the next stream. But if it founds a text, that means it should stop reading from the text file, which I believe means that the readstream should stop reading the chunks.

I tried several solutions, but not gonna line that I am a little confused here. So far I got the following code to work. However, by using firstStream.destroy(); makes the pipeline to throw an error

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close

I was able to 'avoid' this error by catching and ignoring it on the pipeline, but to be honest this doesn't sound safe or correct to me.

  const { Transform, Writable, Readable } = require("node:stream");
  const { pipeline } = require("node:stream/promises");
  const fs = require("node:fs");

  let shouldStop = false;
  const firstStream = fs.createReadStream("./lg.txt");

  const secondStream = new Transform({
    transform(chunk, encoding, callback) {
      const foundText = chunk.toString().search("CHAPTER 9") !== -1;

      if (foundText) {
        shouldStop = true;
      }

      const transformed = chunk.toString().toUpperCase();
      callback(null, transformed);
    },
  });

  const lastStream = process.stdout;

  firstStream.on("data", () => {
    if (shouldStop) {
      console.log("should pause");
      firstStream.destroy();
    }
  });

  await pipeline(firstStream, secondStream, lastStream).catch(
    (err) => undefined
  ); // Feels wrong to me

Is there any better way to do it? Am I missing something?

Thank you in advance friends!

答案1

得分: 0

在你的转换流中,你可以在找到目标文本后只是“吃掉”或“跳过”任何数据。这样,你可以保留所有其他的 pipeline() 逻辑。而不是立即终止,它将只读取输入流的末尾,但会跳过目标文本之后的所有数据。这允许流正常完成。

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        if (shouldStop) {
            // 吃掉任何剩余的数据
            callback(null, "");
        } else {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
            if (foundText) {
                // 设置标志以吃掉剩余的数据
                shouldStop = true;
            }
            callback(null, text.toUpperCase());
        }
    },
});

pipeline() 函数还支持一个中止控制器,这是一种支持中止管道但仍然适当清理一切的方法。当你中止时,pipeline() 将以一个被拒绝的 promise 结束,但你可以检查拒绝是否因为你的中止而发生,如果是的话,你可以获取你的中止消息。

在你的代码中,可以这样实现:

const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

const firstStream = fs.createReadStream("./lg.txt");

const ac = new AbortController();
const signal = ac.signal;

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        const text = chunk.toString();
        const foundText = text.search("CHAPTER 9") !== -1;

        callback(null, text.toUpperCase());
        if (foundText) {
            ac.abort(new Error("reading terminated, match found"));
        }

    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
    console.log("\nall done without match");
}).catch((err) => {
    if (err.code === "ABORT_ERR") {
        console.log(`\n${signal.reason.message}`);
    } else {
        console.log(err);
    }
});

注意: 另一个话题是,你的代码容易受到搜索字符串跨越数据块边界而不被检测到的影响。避免这个问题的通常方法是保留每个数据块的最后 N 个字符,并在运行匹配搜索之前将它们添加到下一个数据块。其中 N 是你搜索字符串的长度减去 1。这可以确保你不会错过跨越数据块的搜索字符串。你将不得不调整你的输出以不包括添加的文本。由于这不是你在这里提出的问题的关键,我没有添加这个逻辑,留给你自己处理,但它对于可靠的匹配是必要的。

英文:

In your transform stream, you could just "eat" or "skip" any data that is after you found the target text. In this way, you can keep all the other pipeline() logic. Rather than terminating immediately, it will just read to the end of the input stream, but will skip all data after the target text. This allows the streams to complete normally.

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        if (shouldStop) {
            // eat any remaining data
            callback(null, "");
        } else {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
            if (foundText) {
                // set flag to eat remaining data
                shouldStop = true;
            }
            callback(null, text.toUpperCase());
        }
    },
});

The pipeline() function also supports an abort controller which is a supported means of aborting the pipeline while still cleaning everything up appropriately. When you abort, the pipeline() will end with a rejected promise, but you can check if the rejection was because of your abort or not and, if so, you can get your abort message.

In your code, that can be implemented like this:

const { Transform, Writable, Readable } = require("node:stream");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

const firstStream = fs.createReadStream("./lg.txt");

const ac = new AbortController();
const signal = ac.signal;

const secondStream = new Transform({
    transform(chunk, encoding, callback) {
        const text = chunk.toString();
        const foundText = text.search("CHAPTER 9") !== -1;

        callback(null, text.toUpperCase());
        if (foundText) {
            ac.abort(new Error("reading terminated, match found"));
        }

    },
});

const lastStream = process.stdout;

pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
    console.log("\nall done without match");
}).catch((err) => {
    if (err.code === "ABORT_ERR") {
        console.log(`\n${signal.reason.message}`);
    } else {
        console.log(err);
    }
});

Note: On another topic, your code is vulnerable to the search string falling across a chunk boundary and thus not being detected. The usual way of avoiding that issue is by preserving the last N characters of each chunk and prepending it to the next chunk before running your match search where N is the length of your search string - 1. This ensures you won't miss a search string that spans across chunks. You will have to adjust your output to not include the prepended text too. Since that wasn't the crux of your question here, I didn't add that logic and will leave that to you, but it is necessary for reliable matching.

huangapple
  • 本文由 发表于 2023年5月15日 02:24:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/76249053.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定