How to add concurrency to my asyncio-based file downloader script without overloading the server

Question

Below is my code to download a file as fast as possible using asyncio:

import asyncio
import os.path
import shutil
import aiofiles

import aiohttp
import lxml.html as htmlparser
import cssselect
import regex, json
from tempfile import TemporaryDirectory

domain = "https://doma.com/"
url = 'https://doma.com/ust/xxxx'
CONTENT_ID = regex.compile(r"/ust/([^?#&/]+)")

def parts_generator(size, start=0, part_size=5 * 1024 ** 2):
    while size - start > part_size:
        yield start, start + part_size
        start += part_size
    yield start, size
    
async def main():
    async def download(url, headers, save_path):
        async with session.get(url, headers=headers) as request:
            file = await aiofiles.open(save_path, 'wb')
            await file.write(await request.content.read())

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as first:
            cs = await first.text()
            csrf_token = htmlparser.fromstring(cs).cssselect("meta[name='csrf-token']")[0].get("content")
            content_id = CONTENT_ID.search(url).group(1)
            Headers = {"x-requested-with": "XMLHttpRequest", "x-csrf-token": csrf_token}
            async with session.post(domain + "api/get&user=xxx&pass=yyy", headers=Headers, json={"id": content_id}) as resp:
                res = json.loads(await resp.text())
                re = res['result']['Original']['file']
            async with session.get(re) as request:
                size = request.content_length
                tasks = []
                file_parts = []
                filename = 'File.mp4'
                tmp_dir = TemporaryDirectory(prefix=filename, dir=os.path.abspath('.'))
                for number, sizes in enumerate(parts_generator(size)):
                    part_file_name = os.path.join(tmp_dir.name, f'{filename}.part{number}')
                    file_parts.append(part_file_name)
                    tasks.append(await download(re, {'Range': f'bytes={sizes[0]}-{sizes[1]}'}, part_file_name))
                await asyncio.gather(*tasks)
                with open(filename, 'wb') as wfd:
                    for f in file_parts:
                        with open(f, 'rb') as fd:
                            shutil.copyfileobj(fd, wfd)

asyncio.run(main())

I am trying to implement multiple server connections with multi-chunk, multi-threaded downloads, just like IDM and aria2.

Is using asyncio better than a thread pool or multiprocessing?

My script still doesn't perform concurrent downloads.

I need help adding concurrency and handling cases where the server responds with an empty payload when it receives too many requests, using a `while True` loop to sleep and retry in such cases.

With IDM, this 700 MB MP4 can be downloaded within a few minutes, and IDM can achieve a speed of 3 mbps for this video download on my network.

Can someone help tweak my Python script to achieve the same speed and fail-safe downloading as IDM? I also want to be able to play the file while it is still downloading.

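Answer 1

Score: 4

The issue is that your code does not perform real asynchronous/concurrent processing: it runs and awaits each coroutine sequentially inside a for loop.
The problematic block is: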

for number, sizes in enumerate(parts_generator(size)):
    ...
    tasks.append(await download(re, {'Range': f'bytes={sizes[0]}-{sizes[1]}'}, part_file_name))
    with open(filename, 'wb') as wfd:
        for f in file_parts:
            ...

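Another issue (in the block above) is that on each iteration of the outer for loop, all accumulated file_parts are repeatedly and redundantly copied into the same filename. That copy should be moved outside the loop and run just once.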

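To perform the downloads concurrently, you can use asyncio.gather. Note that the coroutines are appended to tasks without await, so none of them is run to completion before the next one is created. The optimized block would look like this: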

...
for number, sizes in enumerate(parts_generator(size)):
    part_file_name = os.path.join(tmp_dir.name, f'{filename}.part{number}')
    file_parts.append(part_file_name)
    tasks.append(download(re, {'Range': f'bytes={sizes[0]}-{sizes[1]}'}, part_file_name))
    
await asyncio.gather(*tasks)
        
with open(filename, 'wb') as wfd:
    for f in file_parts:
        with open(f, 'rb') as fd:
            shutil.copyfileobj(fd, wfd)  
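
To see the difference between awaiting inside the loop and gathering afterwards without touching any server, here is a minimal sketch that stands in for the real download with asyncio.sleep. The names fake_download, sequential, concurrent and timed are hypothetical and exist only for this illustration.

```python
import asyncio
import time


async def fake_download(part, delay=0.1):
    # Stand-in for a network request; sleeping hands control back to the event loop.
    await asyncio.sleep(delay)
    return part


async def sequential(n):
    # Awaiting inside the loop finishes each coroutine before the next one starts.
    return [await fake_download(i) for i in range(n)]


async def concurrent(n):
    # Creating all coroutines first and awaiting them together lets them overlap.
    return await asyncio.gather(*(fake_download(i) for i in range(n)))


def timed(coro_fn, n):
    # Run one of the two variants and report its result and wall-clock time.
    started = time.perf_counter()
    result = asyncio.run(coro_fn(n))
    return result, time.perf_counter() - started


if __name__ == "__main__":
    for fn in (sequential, concurrent):
        result, elapsed = timed(fn, 5)
        print(f"{fn.__name__}: {result} in {elapsed:.2f}s")
```

With five simulated parts, the sequential variant should take roughly five times the single delay, while the gathered variant should take roughly one delay, and both return the parts in the same order.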

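Answer 2

Score: 1

Bear in mind that whatever you do, you will still be limited by your own and the server's network speed and processing power, so don't expect torrent-like speeds when downloading a 1 GB file from a single server. Even so, there is certainly a way to improve download performance for single-server downloads: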

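Calculate how many parts you have to fetch simultaneously, query the server for each separate part, keep the parts in memory or on disk, and then combine all of the parts into the original file.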
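
The part calculation can be sketched on its own, without any network code. Two details are easy to get wrong: the end position in an HTTP Range header is inclusive, and the last part is usually shorter than the others. The helpers byte_ranges and range_header below are hypothetical names for this illustration, not part of aiohttp.

```python
def byte_ranges(size, part_size):
    """Yield (start, end) pairs covering `size` bytes, with `end` inclusive."""
    if size <= 0 or part_size <= 0:
        raise ValueError("size and part_size must be positive")
    for start in range(0, size, part_size):
        # Clamp the last part to the file size; subtract 1 because the
        # end position of an HTTP byte range is inclusive.
        yield start, min(start + part_size, size) - 1


def range_header(start, end):
    # Build the request header for one part.
    return {"Range": f"bytes={start}-{end}"}


if __name__ == "__main__":
    for start, end in byte_ranges(25, 10):
        print(range_header(start, end), "->", end - start + 1, "bytes")
```

Because the ranges neither overlap nor leave a gap, the lengths of all parts add up to the file size, which is a cheap sanity check before combining the parts.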

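You might find yourself getting timeout errors and/or 429 errors. Both would be caused by the server being unable to process hundreds of downloads simultaneously. For 429 errors you can decrease the number of parallel downloads, and for timeouts you can retry after a short sleep to give the server some time. Neither is implemented in the example below.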

import asyncio
import aiohttp
import time

DOWNLOAD_URL = "https://speed.hetzner.de/1GB.bin"
FILENAME = "1GB.bin"
PART_SIZE = 1024 * 1024 * 10  # 10 MB

async def file_size(session, url):
    async with session.get(url) as resp:
        return int(resp.headers["Content-Length"])

async def download_part(session, url, part, start, end):

    print(f"Downloading part {part} from {start} to {end}...")

    # Download the part of the file to memory.
    headers = {"Range": f"bytes={start}-{end}"}
    async with session.get(url, headers=headers) as resp:
        resp = await resp.read()

    return resp

async def combine_parts(parts):

    print("Combining parts...")

    with open(FILENAME, "wb") as f:
        for i in parts:
            f.write(i)

async def main():

    print("Starting download...")
    start_time = time.time()

    async with aiohttp.ClientSession() as session:

        # Retrieves the file size by making a GET request
        # and extracting the Content-Length header value
        # before downloading the file.
        size = await file_size(session, DOWNLOAD_URL)
        print(f"File size: {size} bytes.")

        # With the known file size, we can calculate the number
        # of parts to download in parallel based on how big each
        # part should be.
        num_parts = size // PART_SIZE
        print(f"Number of parts: {num_parts}.")

        # We create a list of tasks to download each part.
        tasks = []
        for part in range(num_parts):
            start, end = part * PART_SIZE, (part + 1) * PART_SIZE
            tasks.append(download_part(session, DOWNLOAD_URL, part, start, end))

        # We use the gather function to download all parts in parallel.
        parts = await asyncio.gather(*tasks)
        print("All parts downloaded.")

        # We can now concatenate all parts to get the full file.
        await combine_parts(parts)
        print("File combined.")

    end_time = time.time()
    print(f"File downloaded in {end_time - start_time} seconds.")

asyncio.run(main())

Starting download...
File size: 1048576000 bytes.
Number of parts: 100.
Downloading part 0 from 0 to 10485760...
Downloading part 1 from 10485760 to 20971520...
Downloading part 2 from 20971520 to 31457280...
Downloading part 3 from 31457280 to 41943040...
Downloading part 4 from 41943040 to 52428800...
Downloading part 5 from 52428800 to 62914560...
Downloading part 6 from 62914560 to 73400320...
Downloading part 7 from 73400320 to 83886080...
Downloading part 8 from 83886080 to 94371840...
Downloading part 9 from 94371840 to 104857600...
Downloading part 10 from 104857600 to 115343360...
Downloading part 11 from 115343360 to 125829120...
Downloading part 12 from 125829120 to 136314880...
Downloading part 13 from 136314880 to 146800640...
Downloading part 14 from 146800640 to 157286400...
Downloading part 15 from 157286400 to 167772160...
Downloading part 16 from 167772160 to 178257920...
Downloading part 17 from 178257920 to 188743680...
Downloading part 18 from 188743680 to 199229440...
Downloading part 19 from 199229440 to 209715200...
Downloading part 20 from 209715200 to 220200960...
Downloading part 21 from 220200960 to 230686720...
Downloading part 22 from 230686720 to 241172480...
Downloading part 23 from 241172480 to 251658240...
Downloading part 24 from 251658240 to 262144000...
Downloading part 25 from 262144000 to 272629760...
Downloading part 26 from 272629760 to 283115520...
Downloading part 27 from 283115520 to 293601280...
Downloading part 28 from 293601280 to 304087040...
Downloading part 29 from 304087040 to 314572800...
Downloading part 30 from 314572800 to 325058560...
Downloading part 31 from 325058560 to 335544320...
Downloading part 32 from 335544320 to 346030080...
Downloading part 33 from 346030080 to 356515840...
Downloading part 34 from 356515840 to 367001600...
Downloading part 35 from 367001600 to 377487360...
Downloading part 36 from 377487360 to 387973120...
Downloading part 37 from 387973120 to 398458880...
Downloading part 38 from 398458880 to 408944640...
Downloading part 39 from 408944640 to 419430400...
Downloading part 40 from 419430400 to 429916160...
Downloading part 41 from 429916160 to 440401920...
Downloading part 42 from 440401920 to 450887680...
Downloading part 43 from 450887680 to 461373440...
Downloading part 44 from 461373440 to 471859200...
Downloading part 45 from 471859200 to 482344960...
Downloading part 46 from 482344960 to 492830720...
Downloading part 47 from 492830720 to 503316480...
Downloading part 48 from 503316480 to 513802240...
Downloading part 49 from 513802240 to 524288000...
Downloading part 50 from 524288000 to 534773760...
Downloading part 51 from 534773760 to 545259520...
Downloading part 52 from 545259520 to 555745280...
Downloading part 53 from 555745280 to 566231040...
Downloading part 54 from 566231040 to 576716800...
Downloading part 55 from 576716800 to 587202560...
Downloading part 56 from 587202560 to 597688320...
Downloading part 57 from 597688320 to 608174080...
Downloading part 58 from 608174080 to 618659840...
Downloading part 59 from 618659840 to 629145600...
Downloading part 60 from 629145600 to 639631360...
Downloading part 61 from 639631360 to 650117120...
Downloading part 62 from 650117120 to 660602880...
Downloading part 63 from 660602880 to 671088640...
Downloading part 64 from 671088640 to 681574400...
Downloading part 65 from 681574400 to 692060160...
Downloading part 66 from 692060160 to 702545920...
Downloading part 67 from 702545920 to 713031680...
Downloading part 68 from 713031680 to 723517440...
Downloading part 69 from 723517440 to 734003200...
Downloading part 70 from 734003200 to 744488960...
Downloading part 71 from 744488960 to 754974720...
Downloading part 72 from 754974720 to 765460480...
Downloading part 73 from 765460480 to 775946240...
Downloading part 74 from 775946240 to 786432000...
Downloading part 75 from 786432000 to 796917760...
Downloading part 76 from 796917760 to 807403520...
Downloading part 77 from 807403520 to 817889280...
Downloading part 78 from 817889280 to 828375040...
Downloading part 79 from 828375040 to 838860800...
Downloading part 80 from 838860800 to 849346560...
Downloading part 81 from 849346560 to 859832320...
Downloading part 82 from 859832320 to 870318080...
Downloading part 83 from 870318080 to 880803840...
Downloading part 84 from 880803840 to 891289600...
Downloading part 85 from 891289600 to 901775360...
Downloading part 86 from 901775360 to 912261120...
Downloading part 87 from 912261120 to 922746880...
Downloading part 88 from 922746880 to 933232640...
Downloading part 89 from 933232640 to 943718400...
Downloading part 90 from 943718400 to 954204160...
Downloading part 91 from 954204160 to 964689920...
Downloading part 92 from 964689920 to 975175680...
Downloading part 93 from 975175680 to 985661440...
Downloading part 94 from 985661440 to 996147200...
Downloading part 95 from 996147200 to 1006632960...
Downloading part 96 from 1006632960 to 1017118720...
Downloading part 97 from 1017118720 to 1027604480...
Downloading part 98 from 1027604480 to 1038090240...
Downloading part 99 from 1038090240 to 1048576000...
All parts downloaded.
Combining parts...
File combined.
File downloaded in 66.21038007736206 seconds.

That finished in 66 seconds. Compare this with downloading the file in one shot:

import asyncio
import aiohttp
import time

FILENAME = "1GB.bin"
DOWNLOAD_URL = "https://speed.hetzner.de/1GB.bin"

async def main():

    print("Starting download.")
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        async with session.get(DOWNLOAD_URL) as response:

            print(f"File size: {response.content_length} bytes.")
            print("Number of parts: 1")

            data = await response.read()

    print("Finished download.")
    with open(FILENAME, "wb") as f:
        f.write(data)

    end_time = time.time()
    print(f"File downloaded in {end_time - start_time} seconds.")

asyncio.run(main())

Starting download.
File size: 1048576000 bytes.
Number of parts: 1
Finished download.
File downloaded in 295.3884241580963 seconds.

That is a 4.5x performance improvement. Do note that this does not necessarily mean that more workers downloading chunks is always better (100 workers downloading 10 MB each). Downloading larger chunks per worker may be more effective (10 workers downloading 100 MB each), as the destination server can better serve fewer simultaneous clients. This is something you may need to experiment with.
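
One way to experiment with the number of workers, and to add the retry-with-sleep behaviour mentioned earlier, is to cap concurrency with asyncio.Semaphore and wrap each request in a retry loop. The following is a minimal sketch under stated assumptions, not a drop-in implementation: retry and bounded_gather are hypothetical helpers, the default limits are arbitrary, and a real script would catch only the specific network or HTTP errors it expects.

```python
import asyncio
import random


async def retry(make_coro, attempts=5, base_delay=0.5):
    """Call make_coro() until it succeeds, sleeping longer after each failure."""
    for attempt in range(attempts):
        try:
            return await make_coro()
        except Exception:  # assumption: narrow this to network/HTTP errors in real code
            if attempt == attempts - 1:
                raise
            # Exponential backoff with a little jitter before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))


async def bounded_gather(factories, limit=8, **retry_kwargs):
    """Run coroutine factories concurrently, with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        # The semaphore stays held during backoff, so a struggling server
        # sees fewer simultaneous requests while retries are pending.
        async with semaphore:
            return await retry(factory, **retry_kwargs)

    # gather returns results in the same order as the factories.
    return await asyncio.gather(*(run(factory) for factory in factories))
```

Each factory is a zero-argument callable that returns a fresh coroutine, because a coroutine object cannot be awaited twice. With the example above, a factory could be built as functools.partial(download_part, session, DOWNLOAD_URL, part, start, end). An empty payload can be treated as a failure by raising an exception inside the factory, which then triggers the sleep and retry.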

huangapple
  • Published on March 12, 2023, 18:49:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75712595.html