Python and Starlette: running a long async task
Question
I have a simple experiment in the code snippet shown below. My goal is to have the browser client (via a WebSocket) kick off a long-running task on the server, but the server should service WebSocket messages from the client while the long-running task is running. Here's the workflow ("OK" means this step is working as-is in the snippet, while "?" means this is what I'm trying to figure out)...
- OK - Run the code
- OK - Launch a browser at 127.0.0.1
- OK - WebSocket connects
- OK - Click "Send" and the browser client generates a random number, sends it to the server, and the server echoes back the number
- OK - Click "Begin" and this invokes a long-running task on the server (5.0 seconds)
- ? - During this 5sec (while the long-running task is running), I'd like to click "Send" and have the server immediately echo back the random number that was sent from the client while the long-running task continues to be concurrently executed in the event loop
For that last bullet point, it is not working that way: rather, if you click "Send" while the long process is running, the long process finishes and then the numbers are echoed back. To me, this demonstrates that await simulate_long_process(websocket) is truly waiting for simulate_long_process() to complete -- makes sense. However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks and therefore go back to the while True loop to service the next incoming messages. I was expecting this because simulate_long_process() is fully async (async def, await websocket.send_text(), and await asyncio.sleep()). The current behavior kinda makes sense but is not what I want. So my question is, how can I achieve my goal of responding to incoming messages on the WebSocket while the long-running task is running? I am interested in two (or more) approaches:
- Spawning the long-running task in a different thread. For example, with asyncio.to_thread() or by stuffing a message into a separate queue that another thread is reading, which then executes the long-running task (e.g. like a producer/consumer queue). Furthermore, I can see how using those same queues, at the end of the long-running tasks, I could then send acknowledgment messages back to the Starlette/async thread and then back to the client over the WebSocket to tell them a task has completed.
- Somehow achieving this "purely async"? "Purely async" means mostly or entirely using features/methods from the asyncio package. This might delve into synchronous or blocking code, but here I'm thinking about things like: organizing my coroutines into a TaskGroup() object to get concurrent execution, using call_soon(), using run_in_executor(), etc. I'm really interested in hearing about this approach! But I'm skeptical since it may be convoluted. The spirit of this is mentioned here: https://stackoverflow.com/questions/35355849/long-running-tasks-with-async-server
I can certainly see the path to completion on approach (1). So I'm debating how "pure async" I try to go -- maybe Starlette (running in its own thread) is the only async portion of my entire app, and the rest of my (CPU-bound, blocking) app is on a different (synchronous) thread. Then, the Starlette async thread and the CPU-bound sync thread simply coordinate via a queue. This is where I'm headed but I'd like to hear some thoughts to see if a "pure async" approach could be reasonably implemented. Stated differently, if someone could refactor the code snippet below to work as intended (responding immediately to "Send" while the long-running task is running), using only or mostly methods from asyncio, then that would be a good demonstration.
from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute
import uvicorn
import asyncio

index_str = """<!DOCTYPE HTML>
<html>
<head>
    <script type="text/javascript">
    const websocket = new WebSocket("ws://127.0.0.1:80");
    window.addEventListener("DOMContentLoaded", () => {
        websocket.onmessage = ({ data }) => {
            console.log('Received: ' + data)
            document.body.innerHTML += data + "<br>";
        };
    });
    </script>
</head>
<body>
    WebSocket Async Experiment<br>
    <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
    <button onclick="websocket.send('begin')">Begin</button><br>
    <button onclick="websocket.send('close')">Close</button><br>
</body>
</html>
"""

def homepage(request):
    return HTMLResponse(index_str)

async def simulate_long_process(websocket):
    await websocket.send_text('Running long process...')
    await asyncio.sleep(5.0)

async def websocket_endpoint(websocket):
    await websocket.accept()
    await websocket.send_text('Server connected')
    while True:
        msg = await websocket.receive_text()
        print(f'server received: {msg}')
        if msg == 'begin':
            # Awaiting here suspends this handler until the long process returns
            await simulate_long_process(websocket)
        elif msg == 'close':
            await websocket.send_text('Server closed')
            break
        else:
            await websocket.send_text(f'Server received {msg} from client')
    await websocket.close()
    print('Server closed')

if __name__ == '__main__':
    routes = [
        Route('/', homepage),
        WebSocketRoute('/', websocket_endpoint)
    ]
    app = Starlette(debug=True, routes=routes)
    uvicorn.run(app, host='0.0.0.0', port=80)
Answer 1
Score: 1
First:
> However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks
That is exactly what await means: it means, "stop executing this coroutine (websocket_endpoint) while we wait for a result from simulate_long_process, and go service other coroutines".
As it happens, you don't have any concurrent coroutines running, so this just pauses things until simulate_long_process returns.
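The difference can be seen without Starlette at all. The following is a minimal sketch using only asyncio; the names long_process, awaited_inline and scheduled_as_task are made up for illustration, and the 0.2-second sleep stands in for the 5-second job:

```python
import asyncio


async def long_process(log, name):
    log.append(f"{name} start")
    await asyncio.sleep(0.2)  # stands in for the 5-second job
    log.append(f"{name} done")


async def awaited_inline():
    """Await the coroutine directly: the caller cannot proceed until it returns."""
    log = []
    await long_process(log, "job")
    log.append("echo")  # only reached after the job has finished
    return log


async def scheduled_as_task():
    """Wrap the coroutine in a task: the caller keeps running alongside it."""
    log = []
    task = asyncio.create_task(long_process(log, "job"))
    await asyncio.sleep(0)  # yield once so the new task gets a chance to start
    log.append("echo")  # reached while the job is still sleeping
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(awaited_inline()))
    print(asyncio.run(scheduled_as_task()))
```

In the first function the "echo" entry is logged after the job completes; in the second it is logged in between the job's start and finish, which is the behavior the question is after.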
Second:
Even if you were to run simulate_long_process concurrently (e.g., by creating a task using asyncio.create_task and then checking if it's complete), your while loop blocks waiting for text from the client. This means that you can't, for instance, send the client a message when simulate_long_process completes, because the client needs to send you something before the body of the while loop can execute.
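One way around that limitation is to let the background task send its own completion message instead of having the receive loop poll for it. Here is a rough sketch of the idea with no Starlette involved: FakeSocket is a hypothetical stand-in for a WebSocket (an asyncio.Queue for incoming text, a list for outgoing text), and endpoint mirrors the shape of the question's handler.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket, for illustration only."""

    def __init__(self):
        self.inbox = asyncio.Queue()  # messages "from the client"
        self.sent = []                # messages "to the client"

    async def receive_text(self):
        return await self.inbox.get()

    async def send_text(self, text):
        self.sent.append(text)


async def long_process(ws):
    await ws.send_text("long process started")
    await asyncio.sleep(0.2)  # stands in for the 5-second job
    await ws.send_text("long process finished")  # the task reports completion itself


async def endpoint(ws):
    task = None
    while True:
        msg = await ws.receive_text()
        if msg == "begin":
            if task is None or task.done():
                # Schedule the job and go straight back to receiving
                task = asyncio.create_task(long_process(ws))
        elif msg == "close":
            break
        else:
            await ws.send_text(f"echo {msg}")
    if task is not None:
        await task  # let a running job finish before the handler exits


async def demo():
    ws = FakeSocket()
    server = asyncio.create_task(endpoint(ws))
    await ws.inbox.put("begin")
    await asyncio.sleep(0.05)
    await ws.inbox.put("7")  # arrives while the job is still running
    await asyncio.sleep(0.3)
    await ws.inbox.put("close")
    await server
    return ws.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The echo for "7" is sent while the job is still sleeping, and the "finished" message arrives later without the client having to send anything first.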
I haven't worked with Starlette before, so this may not be the most canonical solution, but here's an implementation that uses a WebSocketEndpoint to implement the desired behavior:
from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute
from starlette.endpoints import WebSocketEndpoint
import uvicorn
import asyncio

SERVER_PORT = 8000

index_str = """<!DOCTYPE HTML>
<html>
<head>
    <script type="text/javascript">
    const websocket = new WebSocket("ws://127.0.0.1:%s");
    window.addEventListener("DOMContentLoaded", () => {
        websocket.onmessage = ({ data }) => {
            console.log('Received: ' + data)
            document.body.innerHTML += data + "<br>";
        };
    });
    </script>
</head>
<body>
    WebSocket Async Experiment<br>
    <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
    <button onclick="websocket.send('begin')">Begin</button><br>
    <button onclick="websocket.send('close')">Close</button><br>
</body>
</html>
""" % (SERVER_PORT)

def homepage(request):
    return HTMLResponse(index_str)

class Consumer(WebSocketEndpoint):
    encoding = 'text'
    task = None  # the currently running background task, if any

    async def on_connect(self, ws):
        await ws.accept()

    async def on_receive(self, ws, data):
        # match/case requires Python 3.10 or newer
        match data:
            case 'begin':
                if self.task is not None:
                    await ws.send_text('background task is already running')
                    return
                await ws.send_text('start background task')
                # Schedule the long task and return immediately,
                # so later messages are handled while it runs
                self.task = asyncio.create_task(self.simulate_long_task(ws))
            case 'close':
                await ws.send_text('closing connection')
                await ws.close()
            case _:
                await ws.send_text(f'Server received {data} from client')

    async def simulate_long_task(self, ws):
        await ws.send_text('start long process')
        await asyncio.sleep(5)
        await ws.send_text('finish long process')
        self.task = None

    async def on_disconnect(self, ws, close_code):
        # Cancel a still-running task so it doesn't try to send on a closed socket
        if self.task is not None:
            self.task.cancel()

if __name__ == '__main__':
    routes = [
        Route('/', homepage),
        WebSocketRoute('/', Consumer)
    ]
    app = Starlette(debug=True, routes=routes)
    uvicorn.run(app, host='0.0.0.0', port=SERVER_PORT)
(Note that this uses port 8000 by default instead of port 80 because I already have something running on port 80 locally.)
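The approach above works because asyncio.sleep yields to the event loop. If the real long-running work is synchronous (a blocking call, or CPU-bound code), a task alone won't help, since the work never yields. For that case the question's first idea, asyncio.to_thread() (Python 3.9+), can be sketched as follows; blocking_job and heartbeat are hypothetical names, and time.sleep stands in for the blocking work:

```python
import asyncio
import time


def blocking_job(seconds):
    """Synchronous work that never yields to the event loop."""
    time.sleep(seconds)
    return "job finished"


async def heartbeat(log, interval, count):
    """Stands in for the WebSocket handler that must stay responsive."""
    for i in range(count):
        await asyncio.sleep(interval)
        log.append(f"tick {i}")


async def demo():
    log = []
    ticker = asyncio.create_task(heartbeat(log, 0.05, 3))
    # Run the blocking function in a worker thread; the event loop stays free
    result = await asyncio.to_thread(blocking_job, 0.3)
    log.append(result)
    await ticker
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The heartbeat keeps ticking while the blocking job runs in its worker thread. Note that for heavily CPU-bound Python code the threads still share the GIL, so a process pool via run_in_executor() may be a better fit there.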