英文:
Fastapi standalone app not working with websocket listenning
问题
以下是您提供的代码的中文翻译部分:
我想构建一个小型的REST API服务器,返回监听Binance交易所WebSocket的最后100笔交易。
我使用uvicorn启动应用程序,但返回的JSON是'{}'。
如果我使用普通的Python命令执行程序,WebSocket的监听工作正常。
我想要一个独立的服务器,既能响应REST请求,又能监听Binance的WebSocket。
这是我的代码:
```python
from fastapi import FastAPI
import websocket
import json
import time
import uvicorn
app = FastAPI()
last_trade = []
def on_message(ws, message):
global last_trade
data = json.loads(message)
if 'e' in data and data['e'] == 'trade':
last_trade += data
if len(last_trade) > LASTS:
last_trade.pop(0)
def on_error(ws, error):
print(error)
def on_close(ws):
print("连接已关闭")
reconnect()
def on_open(ws):
ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": ["btctusd@trade"],
"id": 1
}))
def reconnect():
delay = 5 # 重新连接延迟(秒)
print(f"在{delay}秒后重新连接...")
time.sleep(delay)
connect_to_binance_websocket()
def connect_to_binance_websocket():
websocket_url = "wss://stream.binance.com:9443/ws"
ws = websocket.WebSocketApp(websocket_url,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever()
@app.get("/")
async def root():
return json.dumps(last_trade)
LASTS = 100
if __name__ == '__main__':
connect_to_binance_websocket()
uvicorn.run(app, host="0.0.0.0", port=8000)
希望这有所帮助!
英文:
I want to build a tiny REST api server returning the last 100 trades listened on Binance exchange websocket.
I start the app with uvicorn but the json returned is '{}'.
The listening of the websocket works if i execute the program with normal python command
I want to have a standalone server that both respond to REST requests and listen to Binance's websocket.
This is my code:
import websocket
import json
import time
import uvicorn
app = FastAPI()
last_trade = []
def on_message(ws, message):
global last_trade
data = json.loads(message)
if 'e' in data and data['e'] == 'trade':
last_trades += data
if len(last_trade) > LASTS:
last_trade.pop(0)
# print(data)
def on_error(ws, error):
print(error)
def on_close(ws):
print("Connexion fermée")
reconnect()
def on_open(ws):
ws.send(json.dumps({
"method": "SUBSCRIBE",
"params": ["btctusd@trade"],
"id": 1
}))
def reconnect():
delay = 5 # Délai de reconnexion en secondes
print(f"Reconnexion dans {delay} secondes...")
time.sleep(delay)
connect_to_binance_websocket()
def connect_to_binance_websocket():
websocket_url = "wss://stream.binance.com:9443/ws"
ws = websocket.WebSocketApp(websocket_url,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever()
@app.get("/")
async def root():
return json.dumps(last_trade)
LASTS=100
if __name__ == '__main__':
connect_to_binance_websocket()
uvicorn.run(app, host="0.0.0.0", port=8000)
Thanks
</details>
# 答案1
**得分**: 1
你可以使用`on_event`装饰器与`startup`一起,用于在uvicorn启动后调用需要的函数。
```python
@app.on_event("startup")
def connect_to_binance_websocket():
英文:
you can use on_event
decorator with startup
on the function that needs to be called after the uvicorn starts
@app.on_event("startup")
def connect_to_binance_websocket():
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论