python-telegram-bot v20: running asynchronously

Question

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

import time


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    await update.message.reply_text("start")
    time.sleep(10)  # a process that's going to take some time
    await update.message.reply_text("finish")


app = ApplicationBuilder().token("TOKEN HERE").build()

app.add_handler(CommandHandler("start", start))

app.run_polling()

This is the simplest example of the problem I'm currently facing in a project.

The bot has to run a process that takes some time to finish.

But the bot stops responding to other users while that process is running.

I tried everything.

I tried older versions of python-telegram-bot.

I tried using threads (which won't work with async functions) and asyncio (I'm not very familiar with this kind of thing, but for some reason the bot still did not respond to other users).

I even tried creating two functions inside the "start" function (one async and one not) and then running the async function in a thread started from the regular function.

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

import time
import threading
import asyncio


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

    async def thread1(upd: Update, ctx: ContextTypes.DEFAULT_TYPE):
        await update.message.reply_text('start')
        time.sleep(10)
        await update.message.reply_text('finish')

    def thread_the_thread(upd: Update, ctx: ContextTypes.DEFAULT_TYPE):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(thread1(upd, ctx))
        loop.close()

    t = threading.Thread(target=thread_the_thread, args=(update, context))
    t.start()


app = ApplicationBuilder().token("TOKEN HERE").build()

app.add_handler(CommandHandler("start", start))

app.run_polling()

But when I used the bot with two different users, I got this error:

telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('<asyncio.locks.Event object at 0x0000022361E0A920 [unset]> is bound to a different event loop')

Answer 1

Score: 1

Documentation

For some reason, the documentation on asynchronous operation in PTB (python-telegram-bot) is really hard to find with Google. Hint: search for the keyword 'concurrency'.

The official docs answer your question in their section on concurrency.

Blocking vs Async

Before getting to PTB itself, it's worth mentioning that you are using the blocking time.sleep() function, which stalls the whole event loop while it waits. To make the wait asynchronous, use the appropriate function instead: asyncio.sleep().
You can refer to the existing discussions of time.sleep() versus asyncio.sleep() for details.
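
The difference can be measured without PTB. The following is a minimal sketch using only the standard library, assuming Python 3.9+ for asyncio.to_thread(); the names DELAY, blocking_handler, async_handler, threaded_handler, run_two and measure are made up for this illustration. It runs two "handlers" at once and times them. The third variant matters when the long-running step is real blocking work (CPU-bound or a synchronous library call) that cannot simply be swapped for asyncio.sleep().

```python
import asyncio
import time

DELAY = 0.3  # seconds; stands in for the 10-second job in the question


async def blocking_handler() -> None:
    # time.sleep() never yields to the event loop, so nothing else can run.
    time.sleep(DELAY)


async def async_handler() -> None:
    # asyncio.sleep() suspends this coroutine and lets other tasks run.
    await asyncio.sleep(DELAY)


async def threaded_handler() -> None:
    # For work that cannot be awaited, run it in a worker thread
    # so the event loop stays free (asyncio.to_thread needs Python 3.9+).
    await asyncio.to_thread(time.sleep, DELAY)


async def run_two(handler) -> float:
    """Run two copies of the handler concurrently and return the elapsed time."""
    started = time.perf_counter()
    await asyncio.gather(handler(), handler())
    return time.perf_counter() - started


def measure(handler) -> float:
    return asyncio.run(run_two(handler))


if __name__ == "__main__":
    for h in (blocking_handler, async_handler, threaded_handler):
        print(f"{h.__name__}: {measure(h):.2f}s")
```

The blocking variant should take roughly twice DELAY because the two calls run one after the other, while the other two should take roughly DELAY. The same applies inside a PTB handler: the concurrency options below only help if the handler itself does not block the event loop.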

PTB concurrency

As for PTB concurrency, there are two ways to make the bot handle updates concurrently:

1. block=False

Pass the block=False option when adding the handler:

app.add_handler(CommandHandler("start", start, block=False))

Be careful with this option (a quote from the PTB docs):

> However, by using block=False in a handler, you can no longer rely on handlers in different groups being called one after the other. Depending on your use case, this can be an issue. Hence, PTB comes with a second option.
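
The ordering caveat in that quote can be illustrated without PTB. The sketch below is a simplified model and not PTB's actual dispatcher: handler, process_update and the group names are hypothetical, and plain asyncio stands in for the library. In blocking mode each handler is awaited before the next group runs; in non-blocking mode each handler is scheduled as a task, so a fast handler in a later group can finish before a slow one in an earlier group.

```python
import asyncio


async def handler(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")


async def process_update(block: bool) -> list:
    """Run one slow handler (group0) and one fast handler (group1) for an update."""
    log = []
    pending = []
    for name, delay in (("group0", 0.05), ("group1", 0.01)):
        coro = handler(name, delay, log)
        if block:
            # Blocking mode: wait for this handler before moving on.
            await coro
        else:
            # Non-blocking mode: schedule the handler and move on immediately.
            pending.append(asyncio.create_task(coro))
    if pending:
        await asyncio.gather(*pending)
    return log


if __name__ == "__main__":
    print(asyncio.run(process_update(block=True)))
    # ['group0 start', 'group0 end', 'group1 start', 'group1 end']
    print(asyncio.run(process_update(block=False)))
    # ['group0 start', 'group1 start', 'group1 end', 'group0 end']
```

If a handler in a later group depends on something a handler in an earlier group does, the second ordering is the kind of surprise the docs warn about.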

2. concurrent_updates

Activate the concurrent_updates option when building the Application instance:

app = ApplicationBuilder().token('TOKEN HERE').concurrent_updates(True).build()
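
Conceptually, this option lets several updates be processed at the same time, up to some limit. The sketch below models that idea with a plain asyncio.Semaphore; it is an assumption-laden illustration, not how PTB implements it, and process_updates, n_updates and limit are names invented here. As far as I know, concurrent_updates also accepts an integer to set the limit instead of True, but check the docs for your PTB version.

```python
import asyncio


async def process_updates(n_updates: int, limit: int) -> int:
    """Process n_updates with at most `limit` running at once; return the observed peak."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def process(update_id: int) -> None:
        nonlocal running, peak
        async with sem:  # wait here if `limit` updates are already in progress
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stands in for the handler's awaited work
            running -= 1

    await asyncio.gather(*(process(i) for i in range(n_updates)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(process_updates(10, limit=1)))  # sequential processing: 1
    print(asyncio.run(process_updates(10, limit=3)))  # up to three at a time: 3
```

With a limit of 1 the updates are handled one by one, which is what the bot in the question effectively does; a higher limit lets a slow /start for one user overlap with requests from others.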

Example

Here is a piece of my code that runs the way you expect (I'm using Loguru for logging; you could replace it with print() for simplicity):

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import asyncio
import random
import sys
from loguru import logger

COUNTER = 0
TOKEN = "TOKEN HERE"
logger.add(sys.stdout, level="TRACE", format="<green>{time}</green> | <blue>{function}</blue> | {message}", serialize=False)


async def logic(num: int) -> int:
    """Some logic to run"""
    delay = random.randint(5, 10)
    logger.trace(f"Logic-{num} to be delayed for {delay} seconds")
    await asyncio.sleep(delay)  # non-blocking wait; other updates can be handled meanwhile
    return delay


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    msg = 'started'
    logger.trace(msg)

    await context.bot.send_message(
        chat_id=update.message.chat_id,
        text=msg
    )

    # Claim a job number before awaiting, so concurrent handlers get distinct numbers.
    global COUNTER
    num = COUNTER
    COUNTER += 1
    result = await logic(num)
    msg = f"Finished after {result} seconds of working"
    await context.bot.send_message(
        chat_id=update.message.chat_id,
        text=msg
    )
    logger.trace(msg)


def main():
    # option 1: block=False
    # app = ApplicationBuilder().token(TOKEN).build()
    # app.add_handler(CommandHandler("start", start, block=False))

    # option 2: set concurrent_updates
    app = ApplicationBuilder().token(TOKEN).concurrent_updates(True).build()
    app.add_handler(CommandHandler("start", start))

    app.run_polling()


if __name__ == '__main__':
    main()

Hope that helps.

huangapple
  • Posted on July 6, 2023, 13:32:17
  • Please keep this link when reposting: https://go.coder-hub.com/76625766.html