How to pass async generator through MessageChannel?

Question

I have the following code, which does what I want:

function remoteGenerator(port) {
  // Creates a promise together with its resolve/reject handlers.
  const createPromise = () => {
    let handlers;
    return {
      promise: new Promise(
        (resolve, reject) => (handlers = { resolve, reject })
      ),
      get handlers() {
        return handlers;
      },
    };
  };
  // Builds a minimal async iterator whose next/return both go through `run`.
  const createIterator = (run) => {
    const iterator = {
      next: run,
      return: (arg) => run(arg, 'return'),
      [Symbol.asyncIterator]: () => iterator,
    };
    return iterator;
  };

  let done = false;
  let { promise, handlers } = createPromise();
  const step = createIterator((arg, name = 'next') => {
    const original = promise;
    if (done) return original;
    // Forward the call to the generator on the other end of the channel.
    port.postMessage({ name, arg });
    // Chain a fresh promise for the next step once the current one settles.
    promise = promise.then(() => {
      if (done) return original;
      const next = createPromise();
      handlers = next.handlers;
      return next.promise;
    });
    return original;
  });

  // Settle the pending promise with the result reported by the remote side.
  port.onmessage = (evt) => {
    done = evt.data.done;
    handlers[evt.data.handler]({ done: evt.data.done, value: evt.data.value });
  };
  return step;
}

// usage
async function* startCounterAsync(delay = 1000) {
  let i = 0;
  while (i < 10) {
    yield i++;
    await new Promise((r) => setTimeout(r, delay));
  }
}

const startRemoteGenerator = (result) => {
  const mc = new MessageChannel();
  mc.port1.onmessage = async (evt) => {
    let nextResult;
    try {
      // Call next()/return() on the real generator, as requested by the other end.
      nextResult = await result[evt.data.name](evt.data.arg);
      mc.port1.postMessage({
        name: nextResult.done ? 'return' : 'next',
        handler: 'resolve',
        value: nextResult.value,
        done: nextResult.done,
      });
    } catch (err) {
      mc.port1.postMessage({
        name: 'return',
        handler: 'reject',
        value: err,
        done: true,
      });
    }
    // Close the port once the generator has finished or thrown.
    if (!nextResult || nextResult.done) mc.port1.close();
  };
  return remoteGenerator(mc.port2);
};

for await (let value of startRemoteGenerator(startCounterAsync())) {
  console.log(value);
}

The remoteGenerator function receives one of the ports of a MessageChannel and works with a generator or async generator on the other end of the channel, providing an opaque interface to execution that may happen in a different context.

I'm looking for a way to refactor remoteGenerator so that it is an async generator itself.

So far, the main blocker for me is that there is no way to know in advance whether the generator on the other end will return or yield a value.

To get the arg to pass to the remote generator, I have to yield on my end, and that yield should in turn deliver the value yielded by the other end. There seems to be no way to cancel an ongoing yield or replace it with a return so that the result has { done: true } set.

The solution I've found so far is to wrap the function in an async generator function:

async function* remoteGeneratorWrapper(port) {
  const it = remoteGenerator(port);
  yield* it;
  return (await it.return()).value;
}

But I'd like to simplify the solution so that it doesn't need the intermediate async iterator.
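As a side note on the wrapper above: in standard JavaScript, a yield* expression evaluates to the value the delegated iterator returns once it is done. The sketch below illustrates only that general behaviour with two hypothetical generators (inner and outer, plus a drain helper, none of which are part of the code above); whether the custom iterator from remoteGenerator reports its final value in the same way is an assumption that would need checking.

```javascript
// Hypothetical generators, used only to illustrate the value of `yield*`.
async function* inner() {
  yield 1;
  yield 2;
  return 'inner result';
}

async function* outer() {
  // `yield*` forwards every yielded value and evaluates to inner's return value.
  return yield* inner();
}

// Drives an async iterator by hand so the final return value is visible
// (for await...of would discard it).
async function drain(it) {
  const seen = [];
  let step = await it.next();
  while (!step.done) {
    seen.push(step.value);
    step = await it.next();
  }
  return { seen, returned: step.value };
}

const drained = drain(outer());
drained.then((result) => console.log(result));
```

Driving the iterator by hand is deliberate here, since a for await loop only exposes the yielded values and not the value carried by the final { done: true } result.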

Answer 1

Score: 1

Thanks to Bergi for the insight about try..finally, although it isn't strictly needed here.

The following looks like a replacement for remoteGeneratorWrapper that takes advantage of async generators without needing a custom async iterator:

async function* wrapRemoteGenerator(port) {
  try {
    // Start the remote generator; the first next() cannot carry an argument anyway.
    port.postMessage({ name: 'next', arg: void 0 });
    while (true) {
      // Wait for the remote side to report the result of the last call.
      const res = await new Promise((resolve, reject) => {
        const handlers = { resolve, reject };
        port.onmessage = (evt) => {
          handlers[evt.data.handler](evt.data);
        };
      });
      if (res.done) {
        return res.value;
      }
      // Hand the value to our consumer and forward their next(arg) to the remote side.
      const arg = yield res.value;
      port.postMessage({ name: 'next', arg });
    }
  } finally {
    port.close();
  }
}

I initially struggled with this because I forgot that a generator function suspends immediately after it is called, and that the arg of the very first next(arg) call cannot be received by the generator. Instead, that call starts the generator body and runs it until it either yields or returns.
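Both points can be checked with a small experiment. The sketch below uses a hypothetical synchronous generator named echo (not part of the code above) to keep it short; async generators start and receive arguments in the same way.

```javascript
const events = [];

// Hypothetical generator that records what it receives through next(arg).
function* echo() {
  events.push('body started');
  while (true) {
    // `arg` is whatever the *following* next(arg) call passes in.
    const arg = yield;
    events.push(`received ${arg}`);
  }
}

const it = echo();
events.push('generator created'); // the body has not run yet

it.next('dropped'); // starts the body; this argument is never observable
it.next('a');       // becomes the result of the first yield
it.next('b');

console.log(events);
// ['generator created', 'body started', 'received a', 'received b']
```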

Using this fact, we can issue the remote next() call right away, without waiting for an actual arg to be passed. Because the generator body does not start running until next() is called, the message is posted to the remote side only when someone calls next() on our side.
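For completeness, here is a minimal end-to-end sketch of how this could be wired up. It repeats wrapRemoteGenerator from above so that it runs on its own; serveGenerator and counter are hypothetical names made up for this example, and it assumes a global MessageChannel (browsers or a recent Node.js).

```javascript
// Same function as in the answer, repeated so the sketch is self-contained.
async function* wrapRemoteGenerator(port) {
  try {
    port.postMessage({ name: 'next', arg: void 0 });
    while (true) {
      const res = await new Promise((resolve, reject) => {
        const handlers = { resolve, reject };
        port.onmessage = (evt) => handlers[evt.data.handler](evt.data);
      });
      if (res.done) return res.value;
      const arg = yield res.value;
      port.postMessage({ name: 'next', arg });
    }
  } finally {
    port.close();
  }
}

// Hypothetical "remote" side: drives a local generator on request.
function serveGenerator(port, gen) {
  port.onmessage = async (evt) => {
    try {
      const { value, done } = await gen[evt.data.name](evt.data.arg);
      port.postMessage({ handler: 'resolve', value, done });
    } catch (err) {
      port.postMessage({ handler: 'reject', value: err, done: true });
    }
  };
}

// Hypothetical generator living on the remote side.
async function* counter() {
  yield 0;
  yield 1;
  yield 2;
  return 'finished';
}

const { port1, port2 } = new MessageChannel();
serveGenerator(port1, counter());

// Consume the remote generator as an ordinary async iterable.
async function collect() {
  const values = [];
  for await (const value of wrapRemoteGenerator(port2)) {
    values.push(value);
  }
  return values;
}

const collected = collect();
collected.then((values) => console.log(values));
```

In this sketch only the consuming side closes its port (in the finally block), which is a simplification: the serving side relies on that to shut the channel down.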

huangapple
  • Posted on August 4, 2023, 00:36:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76830047.html