利用异步和多线程来快速编写Python代码。

huangapple go评论80阅读模式
英文:

Taking advantage of both async and multithreading to make rapid code in Python

问题

我想创建一个程序,能够使用自定义超时时间对每个主机进行ping(icmp),扫描端口,获取HTTP。程序需要尽可能地利用最大的CPU资源以提高效率。在第一眼看来,使用主线程中的异步操作似乎不错,但实际上并不符合高负载要求。我希望在不到60秒的时间内扫描100,000个主机,其中0.05%的主机是DOWN(等待超时)。

我想知道如何创建多个线程,并在这些线程中异步扫描一批主机。也欢迎任何其他解决这种问题的方法。

async def ping_host(host, timeout=5, retry=1):
    for i in range(retry):
        try:
            delay = await aioping.ping(host, timeout=timeout)
            return True, host, delay
        except (OSError, asyncio.TimeoutError, socket.gaierror):
            pass
    return False, host

def ping_hosts(hosts):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
        loop = asyncio.get_event_loop()
        tasks = []
        for host in hosts:
            task = loop.run_in_executor(pool, ping_host, host)
            tasks.append(task)
        for index, task in enumerate(asyncio.as_completed(tasks)):
            host = hosts[index]
            try:
                result = loop.run_until_complete(task)
            except Exception as e:
                print(f"Ex: {e}")
                result = False
            results[host] = result
    return results
Ex: This event loop is already running
...
google.com: DOWN
False(结果)
...
英文:

I want to create a program that can ping (icmp), scan ports, get HTTP for each host with custom timeout. Program needs to utilize maximum CPU as possible to make it efficient. Using async in main thread is good at first glance but actually doesn't corresponds with high load requirements. I expect to scan 100,000 hosts in less than 60 seconds with 0.0005 ratio (0.05%) of hosts that are DOWN (waiting for the timeout).

I'm wondering how to create many threads and run asynchronous scanning of a batch of hosts in them. Open to any kind of other solutions to this kind of problem.

async def ping_host(host, timeout=5, retry=1):
    for i in range(retry):
        try:
            delay = await aioping.ping(host, timeout=timeout)
            return True, host, delay
        except (OSError, asyncio.TimeoutError, socket.gaierror):
            pass
    return False, host


def ping_hosts(hosts):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
        loop = asyncio.get_event_loop()
        tasks = []
        for host in hosts:
            task = loop.run_in_executor(pool, ping_host, host)
            tasks.append(task)
        for index, task in enumerate(asyncio.as_completed(tasks)):
            host = hosts[index]
            try:
                result = loop.run_until_complete(task)
            except Exception as e:
                print(f"Ex: {e}")
                result = False
            results[host] = result
    return results
Ex: This event loop is already running
...
google.com: DOWN
False (RESULT)
...

答案1

得分: 1

可以考虑使用队列来完成这个任务。我刚刚测试了一下,可以在大约2秒钟内ping通100个主机。队列被设置为最多可以容纳50个任务,但是一旦一个任务完成,新的任务就会被添加进来,所以速度非常快。你可以将它设置为100个任务而没有问题。它应该能够在很短的时间内处理成千上万个主机。

代码:

from asyncio import run, Queue, create_task
from aioping import ping
from time import perf_counter

async def ping_host(host, taskQueue, resultsQueue, timeout, retry):
    try:
        delay = await ping(host, timeout)
        await resultsQueue.put([True, host, delay])
    except TimeoutError:
        if retry:
            try:
                delay = await ping(host, timeout)
                await resultsQueue.put([True, host, delay])
            except: await resultsQueue.put([False, host, f"{host} 超时(重试)。"])
        else: await resultsQueue.put([False, host, f"{host} 超时。"])
    taskQueue.task_done()
    taskQueue.get_nowait()

async def ping_hosts(hosts, timeout=2, retry=False):
    # hosts = [["host1.com"], ["host2.com"], [...]]
    results: list = []
    taskQueue: Queue = Queue(maxsize=50)
    resultsQueue: Queue = Queue()
    for host in hosts:
        await taskQueue.put(create_task(ping_host(host[0], taskQueue, resultsQueue, timeout, retry)))
    await taskQueue.join()
    while not resultsQueue.empty():
        results.append(await resultsQueue.get())
        resultsQueue.task_done()
    await resultsQueue.join()
    return(results)

async def main():
    script_start: float = perf_counter()
    hosts: list = [["1.1.1.1"], ["81.83.12.253"], ["46.246.29.69"],["77.68.88.76"],["195.238.40.45"],["8.8.8.8"], ["1.0.0.1"], ["198.101.242.72"], ["23.253.163.53"], ["205.204.88.60"], ["91.239.100.100"], ["89.233.43.71"],["google.com"], ["bing.com"], ["yahoo.com"], ["test.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["test.com"],["1.1.1.1"], ["81.83.12.253"], ["46.246.29.69"],["77.68.88.76"],["195.238.40.45"],["8.8.8.8"], ["1.0.0.1"], ["198.101.242.72"], ["23.253.163.53"], ["205.204.88.60"], ["91.239.100.100"], ["89.233.43.71"],["google.com"], ["bing.com"], ["yahoo.com"], ["test.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["google.com"], ["bing.com"], ["yahoo.com"],["test.com"]]
    test: list = await ping_hosts(hosts)
    print(test)
    print(f"脚本已完成。运行耗时 {round(perf_counter()-script_start, 2)} 秒。")

if __name__ == "__main__":
    run(main())

测试64个主机,结果:

[[True, '1.1.1.1', 0.009948200000000018], [True, '46.246.29.69', 0.0228458], [True, 'bing.com', 0.015366999999999992], [True, '1.0.0.1', 0.013153100000000001], [True, 'bing.com', 0.020199400000000006], [True, '89.233.43.71', 0.02210000000000001], [True, 'bing.com', 0.019370899999999996], [True, '77.68.88.76', 0.035211400000000004], [True, 'bing.com', 0.018472500000000003], [True, '81.83.12.253', 0.03657640000000001], [True, '91.239.100.100', 0.023512000000000005], [True, 'bing.com', 0.01855379999999998], [True, '8.8.8.8', 0.028554300000000005], [True, 'google.com', 0.0303359], [True, 'bing.com', 0.015900700000000018], [True, 'google.com', 0.026591299999999984], [True, '1.1.1.1', 0.017966500000000024], [True, '1.0.0.1', 0.018603599999999998], [True, 'google.com', 0.028806400000000038], [True, '91.239.100.100', 0.0188546], [True, 'bing.com', 0.017099299999999984], [True, 'google.com', 0.028467900000000018], [True, '195.238.40.45', 0.03879199999999999], [True, 'google.com', 0.027985999999999983], [True, '89.233.43.71', 0.021615100000000026], [True, 'bing.com', 0.021515199999999984], [True, '46.246.29.69', 0.027923399999999987], [True, '8.8.8.8', 0.02808860000000002], [True, 'google.com', 0.03274550000000001], [True, 'google.com', 0.027735399999999966], [True, 'bing.com', 0.021062299999999978], [True, 'google.com', 0.026245300000000027], [True, 'google.com', 0.027394100000000005], [True, 'google.com', 0.024639000000000022], [True, 'bing.com', 

<details>
<summary>英文:</summary>

May I suggest using Queues for this task. I just tested and it can ping 100 hosts using around 2 seconds. The Queue is set to can hold 50 jobs maximum, but as soon as a job completes a new is added, so it&#39;ll be very fast. You could set it 100 without issues. It should be able to handle many thousands of hosts in no time.

Code:

    from asyncio import run, Queue, create_task
    from aioping import ping
    from time import perf_counter
    
    async def ping_host(host, taskQueue, resultsQueue, timeout, retry):
        try:
            delay = await ping(host, timeout)
            await resultsQueue.put([True, host, delay])
        except TimeoutError:
            if retry:
                try:
                    delay = await ping(host, timeout)
                    await resultsQueue.put([True, host, delay])
                except: await resultsQueue.put([False, host, f&quot;{host} timed out (retried).&quot;])
            else: await resultsQueue.put([False, host, f&quot;{host} timed out.&quot;])
        taskQueue.task_done()
        taskQueue.get_nowait()
    
    async def ping_hosts(hosts, timeout=2, retry=False):
        # hosts = [[&quot;host1.com&quot;], [&quot;host2.com&quot;], [...]]
        results: list = []
        taskQueue: Queue = Queue(maxsize=50)
        resultsQueue: Queue = Queue()
        for host in hosts:
            await taskQueue.put(create_task(ping_host(host[0], taskQueue, resultsQueue, timeout, retry)))
        await taskQueue.join()
        while not resultsQueue.empty():
            results.append(await resultsQueue.get())
            resultsQueue.task_done()
        await resultsQueue.join()
        return(results)
    
    async def main():
        script_start: float = perf_counter()
        hosts: list = [[&quot;1.1.1.1&quot;], [&quot;81.83.12.253&quot;], [&quot;46.246.29.69&quot;],[&quot;77.68.88.76&quot;],[&quot;195.238.40.45&quot;],[&quot;8.8.8.8&quot;], [&quot;1.0.0.1&quot;], [&quot;198.101.242.72&quot;], [&quot;23.253.163.53&quot;], [&quot;205.204.88.60&quot;], [&quot;91.239.100.100&quot;], [&quot;89.233.43.71&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;], [&quot;test.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;], [&quot;test.com&quot;], [&quot;1.1.1.1&quot;], [&quot;81.83.12.253&quot;], [&quot;46.246.29.69&quot;],[&quot;77.68.88.76&quot;],[&quot;195.238.40.45&quot;],[&quot;8.8.8.8&quot;], [&quot;1.0.0.1&quot;], [&quot;198.101.242.72&quot;], [&quot;23.253.163.53&quot;], [&quot;205.204.88.60&quot;], [&quot;91.239.100.100&quot;], [&quot;89.233.43.71&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;], [&quot;test.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;],[&quot;google.com&quot;], [&quot;bing.com&quot;], [&quot;yahoo.com&quot;], [&quot;test.com&quot;]]
        test: list = await ping_hosts(hosts)
        print(test)
        print(f&quot;Script has finished. It took {round(perf_counter()-script_start, 2)} seconds to run.&quot;)
    
    if __name__ == &quot;__main__&quot;:
        run(main())

Testing 64 hosts, results:

    [[True, &#39;1.1.1.1&#39;, 0.009948200000000018], [True, &#39;46.246.29.69&#39;, 0.0228458], [True, &#39;bing.com&#39;, 0.015366999999999992], [True, &#39;1.0.0.1&#39;, 0.013153100000000001], [True, &#39;bing.com&#39;, 0.020199400000000006], [True, &#39;89.233.43.71&#39;, 0.02210000000000001], [True, &#39;bing.com&#39;, 0.019370899999999996], [True, &#39;77.68.88.76&#39;, 0.035211400000000004], [True, &#39;bing.com&#39;, 0.018472500000000003], [True, &#39;81.83.12.253&#39;, 0.03657640000000001], [True, &#39;91.239.100.100&#39;, 0.023512000000000005], [True, &#39;bing.com&#39;, 0.01855379999999998], [True, &#39;8.8.8.8&#39;, 0.028554300000000005], [True, &#39;google.com&#39;, 0.0303359], [True, &#39;bing.com&#39;, 0.015900700000000018], [True, &#39;google.com&#39;, 0.026591299999999984], [True, &#39;1.1.1.1&#39;, 0.017966500000000024], [True, &#39;1.0.0.1&#39;, 0.018603599999999998], [True, &#39;google.com&#39;, 0.028806400000000038], [True, &#39;91.239.100.100&#39;, 0.0188546], [True, &#39;bing.com&#39;, 0.017099299999999984], [True, &#39;google.com&#39;, 0.028467900000000018], [True, &#39;195.238.40.45&#39;, 0.03879199999999999], [True, &#39;google.com&#39;, 0.027985999999999983], [True, &#39;89.233.43.71&#39;, 0.021615100000000026], [True, &#39;bing.com&#39;, 0.021515199999999984], [True, &#39;46.246.29.69&#39;, 0.027923399999999987], [True, &#39;8.8.8.8&#39;, 0.02808860000000002], [True, &#39;google.com&#39;, 0.03274550000000001], [True, &#39;google.com&#39;, 0.027735399999999966], [True, &#39;bing.com&#39;, 0.021062299999999978], [True, &#39;google.com&#39;, 0.026245300000000027], [True, &#39;google.com&#39;, 0.027394100000000005], [True, &#39;google.com&#39;, 0.024639000000000022], [True, &#39;bing.com&#39;, 0.011746499999999993], [True, &#39;bing.com&#39;, 0.011770199999999953], [True, &#39;bing.com&#39;, 0.01149420000000001], [True, &#39;81.83.12.253&#39;, 0.03624450000000001], [True, &#39;195.238.40.45&#39;, 0.03807920000000001], [True, &#39;77.68.88.76&#39;, 0.040769900000000026], [True, &#39;google.com&#39;, 0.02052520000000002], [True, &#39;google.com&#39;, 0.021225800000000017], [True, &#39;yahoo.com&#39;, 0.12673379999999998], [True, &#39;yahoo.com&#39;, 0.13697290000000004], [True, &#39;yahoo.com&#39;, 0.1341727], [True, &#39;yahoo.com&#39;, 0.1289647], [True, &#39;yahoo.com&#39;, 0.11841639999999998], [True, &#39;yahoo.com&#39;, 0.12921200000000005], [True, &#39;yahoo.com&#39;, 0.17543620000000001], [True, &#39;yahoo.com&#39;, 0.17722649999999998], [True, &#39;yahoo.com&#39;, 0.17867729999999998], [True, &#39;yahoo.com&#39;, 0.18185169999999998], [True, &#39;yahoo.com&#39;, 0.1791689], [True, &#39;yahoo.com&#39;, 
    0.17632149999999996], [False, &#39;23.253.163.53&#39;, &#39;23.253.163.53 timed out.&#39;], [False, &#39;205.204.88.60&#39;, &#39;205.204.88.60 timed out.&#39;], [False, &#39;test.com&#39;, &#39;test.com timed out.&#39;], [False, &#39;198.101.242.72&#39;, &#39;198.101.242.72 timed out.&#39;], [False, &#39;test.com&#39;, &#39;test.com timed out.&#39;], [False, &#39;205.204.88.60&#39;, &#39;205.204.88.60 timed out.&#39;], [False, &#39;test.com&#39;, &#39;test.com timed out.&#39;], [False, &#39;23.253.163.53&#39;, &#39;23.253.163.53 timed out.&#39;], [False, &#39;198.101.242.72&#39;, &#39;198.101.242.72 timed out.&#39;], [False, &#39;test.com&#39;, &#39;test.com timed out.&#39;]]
    Script has finished. It took 2.09 seconds to run.

</details>



huangapple
  • 本文由 发表于 2023年3月9日 17:12:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/75682507.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定