Python and Starlette: running a long async task


Question

I have a simple experiment in the code snippet shown below. My goal is to have the browser client (via a WebSocket) kick off a long-running task on the server, but the server should service WebSocket messages from the client while the long-running task is running. Here's the workflow ("OK" means this step is working as-is in the snippet, while "?" means this is what I'm trying to figure out)...

  • OK - Run the code
  • OK - Launch a browser at 127.0.0.1
  • OK - WebSocket connects
  • OK - Click "Send" and the browser client generates a random number, sends it to the server, and the server echoes back the number
  • OK - Click "Begin" and this invokes a long-running task on the server (5.0 seconds)
  • ? - During these 5 seconds (while the long-running task is running), I'd like to click "Send" and have the server immediately echo back the random number sent from the client, while the long-running task continues to execute concurrently in the event loop

For that last bullet point, it is not working that way: if you click "Send" while the long process is running, the long process finishes first and only then are the numbers echoed back. To me, this demonstrates that await simulate_long_process(websocket) really does wait for simulate_long_process() to complete, which makes sense. However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks and therefore go back to the while True loop to service the next incoming messages. I was expecting this because simulate_long_process() is fully async (async def, await websocket.send_text(), and await asyncio.sleep()). The current behavior kind of makes sense, but it's not what I want. So my question is: how can I respond to incoming messages on the WebSocket while the long-running task is running? I am interested in two (or more) approaches:

  1. Spawning the long-running task in a different thread. For example, with asyncio.to_thread() or by stuffing a message into a separate queue that another thread is reading, which then executes the long-running task (e.g. like a producer/consumer queue). Furthermore, I can see how using those same queues, at the end of the long-running tasks, I could then send acknowledgment messages back to the Starlette/async thread and then back to the client over the WebSocket to tell them a task has completed.
  2. Somehow achieving this "purely async"? "Purely async" means mostly or entirely using features/methods from the asyncio package. This might delve into synchronous or blocking code, but here I'm thinking about things like: organizing my coroutines into a TaskGroup() object to get concurrent execution, using call_soon(), using run_in_executor(), etc. I'm really interested in hearing about this approach! But I'm skeptical since it may be convoluted. The spirit of this is mentioned here: https://stackoverflow.com/questions/35355849/long-running-tasks-with-async-server

I can certainly see the path to completion on approach (1). So I'm debating how "pure async" to go: maybe Starlette (running in its own thread) is the only async portion of my entire app, and the rest of my (CPU-bound, blocking) app runs on a different (synchronous) thread. The Starlette async thread and the CPU-bound sync thread would then simply coordinate via a queue. This is where I'm headed, but I'd like to hear some thoughts on whether a "pure async" approach could reasonably be implemented. Stated differently, if someone could refactor the code snippet below to work as intended (responding immediately to "Send" while the long-running task is running) using only or mostly methods from asyncio, that would be a good demonstration.
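A minimal sketch of the thread-plus-queue architecture described in the previous paragraph, assuming one dedicated synchronous worker thread. Jobs go from the async side to the thread through a thread-safe queue.Queue; results come back through an asyncio.Queue, filled via loop.call_soon_threadsafe because asyncio.Queue itself is not thread-safe. The function names, the 0.3-second time.sleep job, and the hard-coded "client messages" are hypothetical placeholders for the real WebSocket traffic.

```python
import asyncio
import queue
import threading
import time

def sync_worker(jobs, loop, results):
    # Runs on its own (non-async) thread. queue.Queue.get() blocks this
    # thread only, never the event loop.
    while True:
        job = jobs.get()
        if job is None:   # sentinel: shut down
            break
        time.sleep(job)   # hypothetical CPU-bound / blocking work
        # asyncio.Queue is not thread-safe, so hand the result to the loop
        # thread with call_soon_threadsafe instead of calling put_nowait here.
        loop.call_soon_threadsafe(results.put_nowait, f'task done ({job}s)')

async def main():
    loop = asyncio.get_running_loop()
    jobs = queue.Queue()       # async side -> sync thread
    results = asyncio.Queue()  # sync thread -> async side
    thread = threading.Thread(target=sync_worker, args=(jobs, loop, results), daemon=True)
    thread.start()

    jobs.put(0.3)  # e.g. the client clicked "Begin"
    sent = []
    # Meanwhile the event loop stays free; these stand in for echoing "Send" clicks.
    for n in (4, 7):
        await asyncio.sleep(0.05)
        sent.append(f'Server received {n} from client')
    sent.append(await results.get())  # acknowledgment from the worker thread
    jobs.put(None)
    await asyncio.to_thread(thread.join)  # join without blocking the loop
    return sent

result = asyncio.run(main())
print(result)
```

In the Starlette version, the acknowledgment pulled from results would be forwarded with websocket.send_text so the browser learns the task has completed.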

from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute
import uvicorn
import asyncio

index_str = """<!DOCTYPE HTML>
<html>
<head>
    <script type="text/javascript">
    const websocket = new WebSocket("ws://127.0.0.1:80");
    window.addEventListener("DOMContentLoaded", () => {
        websocket.onmessage = ({ data }) => {
            console.log('Received: ' + data)
            document.body.innerHTML += data + "<br>";
        };
    });
    </script>
</head>
<body>
    WebSocket Async Experiment<br>
    <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
    <button onclick="websocket.send('begin')">Begin</button><br>
    <button onclick="websocket.send('close')">Close</button><br>
</body>
</html>
"""

def homepage(request):
    return HTMLResponse(index_str)

async def simulate_long_process(websocket):
    await websocket.send_text('Running long process...')
    await asyncio.sleep(5.0)

async def websocket_endpoint(websocket):
    await websocket.accept()
    await websocket.send_text('Server connected')
    while True:
        msg = await websocket.receive_text()
        print(f'server received: {msg}')
        if msg == 'begin':
            await simulate_long_process(websocket)
        elif msg == 'close':
            await websocket.send_text('Server closed')
            break
        else:
            await websocket.send_text(f'Server received {msg} from client')
    await websocket.close()
    print('Server closed')

if __name__ == '__main__':
    routes = [
        Route('/', homepage),
        WebSocketRoute('/', websocket_endpoint)
    ]

    app = Starlette(debug=True, routes=routes)
    uvicorn.run(app, host='0.0.0.0', port=80)

Answer 1

Score: 1


First:

> However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks

That is exactly what await means: it means, "stop executing this coroutine (websocket_endpoint) while we wait for a result from simulate_long_process, and go service other coroutines".

As it happens, you don't have any concurrent coroutines running, so this just pauses things until simulate_long_process returns.
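To see the difference described above, here is a minimal, Starlette-free sketch in plain asyncio that contrasts awaiting a coroutine directly with scheduling it via asyncio.create_task. The function names are hypothetical, and a 0.1-second sleep stands in for the 5-second job.

```python
import asyncio

async def long_process(log):
    log.append('long process started')
    await asyncio.sleep(0.1)  # stands in for the 5-second job
    log.append('long process finished')

async def echo(log, n):
    log.append(f'echoed {n}')

async def sequential(log):
    # Awaiting the coroutine directly suspends *this* coroutine until
    # long_process returns; with no other task scheduled, nothing else runs.
    await long_process(log)
    await echo(log, 7)

async def concurrent(log):
    # create_task schedules long_process as a separate task and returns at once.
    task = asyncio.create_task(long_process(log))
    await asyncio.sleep(0)  # yield once so the new task can start
    await echo(log, 7)      # runs while long_process is sleeping
    await task              # wait for the background task before returning

seq_log, con_log = [], []
asyncio.run(sequential(seq_log))
asyncio.run(concurrent(con_log))
print(seq_log)  # ['long process started', 'long process finished', 'echoed 7']
print(con_log)  # ['long process started', 'echoed 7', 'long process finished']
```

Both versions use await; only the second one gives the event loop another task to switch to while long_process is sleeping.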

Second:

Even if you were to run simulate_long_process concurrently (e.g., by creating a task using asyncio.create_task and then checking whether it's complete), your while loop blocks waiting for text from the client. This means that you can't, for instance, have the loop send the client a message when simulate_long_process completes, because the client needs to send you something before the body of the while loop can execute.
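One way around this, sketched under assumptions: keep the question's function-style endpoint, but start the job with asyncio.create_task and let the task send its own completion message, so the loop never needs to notice that it finished. The seconds parameter and the FakeWebSocket class are hypothetical, added only so the sketch runs without a browser or Starlette; the four methods it fakes (accept, receive_text, send_text, close) do exist on Starlette's WebSocket.

```python
import asyncio

async def simulate_long_process(websocket, seconds=5.0):
    await websocket.send_text('Running long process...')
    await asyncio.sleep(seconds)
    # The task reports completion itself: the receive loop is parked in
    # receive_text() and cannot notice on its own that the task finished.
    await websocket.send_text('Long process finished')

async def websocket_endpoint(websocket, seconds=5.0):
    await websocket.accept()
    await websocket.send_text('Server connected')
    background = set()  # strong references so running tasks aren't garbage-collected
    try:
        while True:
            msg = await websocket.receive_text()
            if msg == 'begin':
                # Schedule the job and go straight back to receive_text().
                task = asyncio.create_task(simulate_long_process(websocket, seconds))
                background.add(task)
                task.add_done_callback(background.discard)
            elif msg == 'close':
                await websocket.send_text('Server closed')
                break
            else:
                await websocket.send_text(f'Server received {msg} from client')
    finally:
        for task in list(background):  # don't leave jobs writing to a closed socket
            task.cancel()
    await websocket.close()

class FakeWebSocket:
    """Hypothetical test double exposing the four WebSocket methods used above."""
    def __init__(self):
        self.incoming = asyncio.Queue()
        self.sent = []
    async def accept(self):
        pass
    async def receive_text(self):
        return await self.incoming.get()
    async def send_text(self, text):
        self.sent.append(text)
    async def close(self):
        self.sent.append('<closed>')

async def demo():
    ws = FakeWebSocket()
    server = asyncio.create_task(websocket_endpoint(ws, seconds=0.3))
    await ws.incoming.put('begin')
    await asyncio.sleep(0.05)   # the long process is now sleeping
    await ws.incoming.put('3')  # echoed right away, not after the job
    await asyncio.sleep(0.5)    # give the long process time to finish
    await ws.incoming.put('close')
    await server
    return ws.sent

sent = asyncio.run(demo())
print(sent)
```

The echo for "3" arrives between "Running long process..." and "Long process finished", i.e. while the job is still running.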


I haven't worked with Starlette before, so this may not be the most canonical solution, but here's an implementation that uses a WebSocketEndpoint to implement the desired behavior:

from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute
from starlette.endpoints import WebSocketEndpoint
import uvicorn
import asyncio

SERVER_PORT = 8000

index_str = """<!DOCTYPE HTML>
<html>
<head>
    <script type="text/javascript">
    const websocket = new WebSocket("ws://127.0.0.1:%s");
    window.addEventListener("DOMContentLoaded", () => {
        websocket.onmessage = ({ data }) => {
            console.log('Received: ' + data)
            document.body.innerHTML += data + "<br>";
        };
    });
    </script>
</head>
<body>
    WebSocket Async Experiment<br>
    <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
    <button onclick="websocket.send('begin')">Begin</button><br>
    <button onclick="websocket.send('close')">Close</button><br>
</body>
</html>
""" % SERVER_PORT

def homepage(request):
    return HTMLResponse(index_str)

class Consumer(WebSocketEndpoint):
    # Starlette creates one Consumer instance per connection and calls
    # on_receive once for each incoming message.
    encoding = 'text'
    task = None

    async def on_connect(self, ws):
        await ws.accept()

    async def on_receive(self, ws, data):
        match data:  # match/case needs Python 3.10+
            case 'begin':
                if self.task is not None:
                    await ws.send_text('background task is already running')
                    return
                await ws.send_text('start background task')
                # create_task returns immediately, so on_receive finishes and
                # Starlette can deliver the next message while the task runs.
                self.task = asyncio.create_task(self.simulate_long_task(ws))
            case 'close':
                await ws.send_text('closing connection')
                await ws.close()
            case _:
                await ws.send_text(f'Server received {data} from client')

    async def simulate_long_task(self, ws):
        try:
            await ws.send_text('start long process')
            await asyncio.sleep(5)
            await ws.send_text('finish long process')
        finally:
            # Clear the slot even if the task is cancelled.
            self.task = None

    async def on_disconnect(self, ws, close_code):
        # Cancel a still-running task so it does not send on a closed socket.
        if self.task is not None:
            self.task.cancel()

if __name__ == '__main__':
    routes = [
        Route('/', homepage),
        WebSocketRoute('/', Consumer)
    ]

    app = Starlette(debug=True, routes=routes)
    uvicorn.run(app, host='0.0.0.0', port=SERVER_PORT)

(Note that this by default uses port 8000 instead of port 80 because I already have something running on port 80 locally.)

huangapple
  • Posted on 14 February 2023 at 05:27:42
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75441343.html