Python and Starlette: running a long async task

Question

I have a simple experiment in the code snippet shown below. My goal is to have the browser client (via a WebSocket) kick off a long-running task on the server, but the server should service WebSocket messages from the client while the long-running task is running. Here's the workflow ("OK" means this step is working as-is in the snippet, while "?" means this is what I'm trying to figure out)...

  • OK - Run the code
  • OK - Launch a browser at 127.0.0.1
  • OK - WebSocket connects
  • OK - Click "Send" and the browser client generates a random number, sends it to the server, and the server echoes back the number
  • OK - Click "Begin" and this invokes a long-running task on the server (5.0 seconds)
  • ? - During this 5sec (while the long-running task is running), I'd like to click "Send" and have the server immediately echo back the random number that was sent from the client while the long-running task continues to be concurrently executed in the event loop

For that last bullet point, it is not working that way: if you click "Send" while the long process is running, the long process finishes first and only then are the numbers echoed back. To me, this demonstrates that await simulate_long_process(websocket) is truly waiting for simulate_long_process() to complete -- makes sense. However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks, and therefore go back to the while True loop to service the next incoming messages. I was expecting this because simulate_long_process() is fully async (async def, await websocket.send_text(), and await asyncio.sleep()). The current behavior kind of makes sense, but it is not what I want. So my question is: how can I respond to incoming messages on the WebSocket while the long-running task is running? I am interested in two (or more) approaches:

  1. Spawning the long-running task in a different thread. For example, with asyncio.to_thread() or by stuffing a message into a separate queue that another thread is reading, which then executes the long-running task (e.g. like a producer/consumer queue). Furthermore, I can see how using those same queues, at the end of the long-running tasks, I could then send acknowledgment messages back to the Starlette/async thread and then back to the client over the WebSocket to tell them a task has completed.
  2. Somehow achieving this "purely async"? "Purely async" means mostly or entirely using features/methods from the asyncio package. This might delve into synchronous or blocking code, but here I'm thinking about things like: organizing my coroutines into a TaskGroup() object to get concurrent execution, using call_soon(), using run_in_executor(), etc. I'm really interested in hearing about this approach! But I'm skeptical since it may be convoluted. The spirit of this is mentioned here: https://stackoverflow.com/questions/35355849/long-running-tasks-with-async-server

I can certainly see the path to completion on approach (1). So I'm debating how "pure async" I try to go -- maybe Starlette (running in its own thread) is the only async portion of my entire app, and the rest of my (CPU-bound, blocking) app is on a different (synchronous) thread. Then, the Starlette async thread and the CPU-bound sync thread simply coordinate via a queue. This is where I'm headed but I'd like to hear some thoughts to see if a "pure async" approach could be reasonably implemented. Stated differently, if someone could refactor the code snippet below to work as intended (responding immediately to "Send" while the long-running task is running), using only or mostly methods from asyncio then that would be a good demonstration.

    from starlette.applications import Starlette
    from starlette.responses import HTMLResponse
    from starlette.routing import Route, WebSocketRoute
    import uvicorn
    import asyncio

    index_str = """<!DOCTYPE HTML>
    <html>
    <head>
        <script type="text/javascript">
        const websocket = new WebSocket("ws://127.0.0.1:80");
        window.addEventListener("DOMContentLoaded", () => {
            websocket.onmessage = ({ data }) => {
                console.log('Received: ' + data)
                document.body.innerHTML += data + "<br>";
            };
        });
        </script>
    </head>
    <body>
        WebSocket Async Experiment<br>
        <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
        <button onclick="websocket.send('begin')">Begin</button><br>
        <button onclick="websocket.send('close')">Close</button><br>
    </body>
    </html>
    """

    def homepage(request):
        return HTMLResponse(index_str)

    async def simulate_long_process(websocket):
        await websocket.send_text('Running long process...')
        await asyncio.sleep(5.0)

    async def websocket_endpoint(websocket):
        await websocket.accept()
        await websocket.send_text('Server connected')
        while True:
            msg = await websocket.receive_text()
            print(f'server received: {msg}')
            if msg == 'begin':
                # The handler is suspended here until the long process returns
                await simulate_long_process(websocket)
            elif msg == 'close':
                await websocket.send_text('Server closed')
                break
            else:
                await websocket.send_text(f'Server received {msg} from client')
        await websocket.close()
        print('Server closed')

    if __name__ == '__main__':
        routes = [
            Route('/', homepage),
            WebSocketRoute('/', websocket_endpoint)
        ]
        app = Starlette(debug=True, routes=routes)
        uvicorn.run(app, host='0.0.0.0', port=80)

Answer 1

Score: 1

First:

> However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks

That is exactly what await means: it means, "stop executing this coroutine (websocket_endpoint) while we wait for a result from simulate_long_process, and go service other coroutines".

As it happens, you don't have any concurrent coroutines running, so this just pauses things until simulate_long_process returns.
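
To make the difference concrete, here is a minimal sketch using only asyncio, with no Starlette involved. The names long_process, echo, sequential, and concurrent are made up for illustration, and the 5-second task is shortened to 0.2 seconds. It compares awaiting the long coroutine directly (as in the question) with scheduling it through asyncio.create_task:

```python
import asyncio


async def long_process(log):
    log.append("long: start")
    await asyncio.sleep(0.2)  # stands in for the 5-second task
    log.append("long: done")


async def echo(log, n):
    log.append(f"echo {n}")


async def sequential():
    """Mirrors the question: the handler awaits the long task directly."""
    log = []
    await long_process(log)  # this coroutine is suspended until it returns
    await echo(log, 1)
    return log


async def concurrent():
    """Schedules the long task so the calling coroutine keeps running."""
    log = []
    task = asyncio.create_task(long_process(log))
    await asyncio.sleep(0)  # yield once so the new task gets to start
    await echo(log, 1)  # runs while long_process is sleeping
    await task
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent())
print(sequential_log)
print(concurrent_log)
```

In the sequential version the echo is logged only after "long: done"; in the concurrent version it lands between "long: start" and "long: done".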

Second:

Even if you were to run simulate_long_process concurrently (e.g., by creating a task using asyncio.create_task and then checking whether it's complete), your while loop blocks waiting for text from the client. This means that the loop itself can't, for instance, send the client a message when simulate_long_process completes, because the client needs to send you something before the body of the while loop can execute.
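
One possible way around this, sketched below, is to let the background task send its own completion message instead of having the receive loop check on it. This is only an illustration: FakeWebSocket is a hypothetical in-memory stand-in (not a Starlette class) so the example runs without a server, "Long process finished" is an invented message, and the delay is shortened to 0.2 seconds.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical in-memory stand-in for a WebSocket, for illustration only."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []

    async def receive_text(self):
        return await self.incoming.get()

    async def send_text(self, text):
        self.sent.append(text)


async def simulate_long_process(ws):
    await ws.send_text("Running long process...")
    await asyncio.sleep(0.2)  # shortened from 5.0 s for the demo
    # The task reports its own completion; the receive loop does not poll.
    await ws.send_text("Long process finished")


async def handler(ws):
    task = None
    while True:
        msg = await ws.receive_text()
        if msg == "begin":
            if task is None or task.done():
                task = asyncio.create_task(simulate_long_process(ws))
        elif msg == "close":
            break
        else:
            await ws.send_text(f"Server received {msg} from client")
    if task is not None:
        await task  # let the background work finish before returning


async def demo():
    ws = FakeWebSocket()
    handler_task = asyncio.create_task(handler(ws))
    await ws.incoming.put("begin")
    await asyncio.sleep(0.05)
    await ws.incoming.put("7")  # arrives while the long process is running
    await asyncio.sleep(0.3)
    await ws.incoming.put("close")
    await handler_task
    return ws.sent


sent = asyncio.run(demo())
print(sent)
```

The echo for "7" is recorded between the start and finish messages of the long process, which is the behavior the question asks for.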


I haven't worked with Starlette before, so this may not be the most canonical solution, but here's an implementation that uses a WebSocketEndpoint to implement the desired behavior:

    from starlette.applications import Starlette
    from starlette.responses import HTMLResponse
    from starlette.routing import Route, WebSocketRoute
    from starlette.endpoints import WebSocketEndpoint
    import uvicorn
    import asyncio

    SERVER_PORT = 8000

    index_str = """<!DOCTYPE HTML>
    <html>
    <head>
        <script type="text/javascript">
        const websocket = new WebSocket("ws://127.0.0.1:%s");
        window.addEventListener("DOMContentLoaded", () => {
            websocket.onmessage = ({ data }) => {
                console.log('Received: ' + data)
                document.body.innerHTML += data + "<br>";
            };
        });
        </script>
    </head>
    <body>
        WebSocket Async Experiment<br>
        <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
        <button onclick="websocket.send('begin')">Begin</button><br>
        <button onclick="websocket.send('close')">Close</button><br>
    </body>
    </html>
    """ % SERVER_PORT

    def homepage(request):
        return HTMLResponse(index_str)

    class Consumer(WebSocketEndpoint):
        encoding = 'text'
        task = None

        async def on_connect(self, ws):
            await ws.accept()

        async def on_receive(self, ws, data):
            match data:  # match/case requires Python 3.10+
                case 'begin':
                    if self.task is not None:
                        await ws.send_text('background task is already running')
                        return

                    await ws.send_text('start background task')
                    # Schedule the long task without awaiting it, so
                    # on_receive returns and later messages are handled
                    self.task = asyncio.create_task(self.simulate_long_task(ws))
                case 'close':
                    await ws.send_text('closing connection')
                    await ws.close()
                case _:
                    await ws.send_text(f'Server received {data} from client')

        async def simulate_long_task(self, ws):
            await ws.send_text('start long process')
            await asyncio.sleep(5)
            await ws.send_text('finish long process')
            self.task = None

        async def on_disconnect(self, ws, close_code):
            # Cancel a still-running task so it does not try to send
            # on a connection that has already closed
            if self.task is not None:
                self.task.cancel()

    if __name__ == '__main__':
        routes = [
            Route('/', homepage),
            WebSocketRoute('/', Consumer)
        ]
        app = Starlette(debug=True, routes=routes)
        uvicorn.run(app, host='0.0.0.0', port=SERVER_PORT)

(Note that this by default uses port 8000 instead of port 80 because I already have something running on port 80 locally.)
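
The asyncio.create_task approach only helps when the long task actually awaits something. If the real work is blocking, as the question's approach (1) anticipates, a rough sketch with asyncio.to_thread (Python 3.9+) might look like the following. blocking_work and ticker are hypothetical names, and time.sleep merely stands in for real blocking work:

```python
import asyncio
import time


def blocking_work(seconds):
    """Hypothetical blocking function; time.sleep stands in for real work."""
    time.sleep(seconds)
    return "work finished"


async def ticker(ticks, stop):
    # Records a timestamp roughly every 10 ms while the loop is free to run.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    stop = asyncio.Event()
    ticker_task = asyncio.create_task(ticker(ticks, stop))
    # Run the blocking call in a worker thread; the event loop stays free.
    result = await asyncio.to_thread(blocking_work, 0.2)
    stop.set()
    await ticker_task
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

The ticker keeps recording while the worker thread is busy, which shows the loop was not blocked. Keep in mind that pure-Python CPU-bound code in a thread still competes for the GIL, so a process pool passed to loop.run_in_executor may be worth considering for heavy computation.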

huangapple
  • Posted on 2023-02-14 05:27:42
  • Source: https://go.coder-hub.com/75441343.html