英文:
Ran the function during project execution using rocketry
问题
我有一个类似这样的代码,我希望在运行项目时执行"do_something"函数。它的工作是准备数据库等等。
我不知道如何做。如果你能指导我,我会很感激。我已经阅读了Rocketry文档,但找不到我需要的内容。
from pyrogram import Client
from rocketry import Rocketry
import asyncio
api = Client("api")
cli = Client("cli")
rocktry = Rocketry()
async def do_something():
await api.send_message(123456, "loading ....")
# 代码
await api.send_message(123456, "bot is ready!")
async def main():
await asyncio.gather(api.start(), cli.start(), rocketry.serve())
api.run(main())
我想到的思路是将一个变量的值设置为false,然后在我的代码中检查变量的值是否为false,执行这个动作,然后将该变量的值设置为true。
英文:
I have a code similar to this, and I want the function "do_something" to be executed when I run the project. Its job is to prepare the database, etc.
I don't know how do it. I would it if you could guide me. I have read the rocketry documentation, but I couldn't find what I need.
from pyrogram import Client
from rocketry import Rocketry
import asyncio
api = Client("api")
cli = Client("cli")
rocktry = Rocketry()
async def do_something():
await api.send_message(123456, "loading ....")
# code
await api.send_message(123456, "bot is ready!")
async def main():
await asyncio.gather(api.start(), cli.start(), rocketry.serve())
api.run(main())
The idea that came to my mind is to set the value of a variable to false and then check in my code if the variable's value is false, perform this action, and afterwards set the value of that variable to true.
答案1
得分: 1
async def main():
await asyncio.gather(api.start(), cli.start(), rocktry.serve())
await do_something()
api.loop.run_until_complete(main())
这将在客户端启动和Rocketry服务器启动后运行do_something()
函数。
英文:
async def main():
await asyncio.gather(api.start(), cli.start(), rocktry.serve())
await do_something()
api.loop.run_until_complete(main())
This will run the do_something()
function after the clients have started and the Rocketry server has started.
答案2
得分: 1
以下是已翻译的代码部分:
我将我的代码更改为以下内容,并且它正常工作:
from pyrogram import Client
from rocketry import Rocketry
import asyncio
api = Client("api")
cli = Client("cli")
rocktry = Rocketry()
async def do_something():
while not api_app.is_connected:
await asyncio.sleep(1)
await api.send_message(123456, "loading ....")
# code
await api.send_message(123456, "bot is ready!")
async def main():
await asyncio.gather(
api_app.start(),
do_something(),
cli_app.start(),
skd_app.serve()
)
api.run(main())
我不知道它的性能如何,但它确实是我正在寻找的东西。
请注意,我已经翻译了代码部分,其他内容未翻译。
英文:
I change my code to this , and it's worked correctly:
from pyrogram import Client
from rocketry import Rocketry
import asyncio
api = Client("api")
cli = Client("cli")
rocktry = Rocketry()
async def do_something():
while not api_app.is_connected:
await asyncio.sleep(1)
await api.send_message(123456, "loading ....")
# code
await api.send_message(123456, "bot is ready!")
async def main():
await asyncio.gather(
api_app.start(),
do_something(),
cli_app.start(),
skd_app.serve()
)
api.run(main())
I don't know if it has good performance or not, but it's exactly what I was looking for.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论