并行下载在Python线程中不起作用。

huangapple go评论67阅读模式
英文:

Parallel downloading doesn't work with Python threading

问题

我正在使用 threading 模块构建一个并行下载库。

当我使用我的库时,它可以下载文件而不出现错误,但是视频文件的内容与通过浏览器下载的内容不一样

我使用 threading 进行并行下载,我认为问题出在 threading.Lockfile.seek 上,但我无法弄清楚如何解决问题。

这是我的代码:

import requests
import threading
from tqdm import tqdm

DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB

class DownloadPart:
    """One byte range of a remote file, fetched with an HTTP Range request.

    Args:
        url: Address of the file to download.
        byte_range: Inclusive ``(first, last)`` byte offsets of this part.
    """

    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

        # NOTE(review): this lock belongs to this instance only. Downloader
        # creates one DownloadPart per thread, so no two threads ever contend
        # for the same lock and the seek/write pair below is not serialized.
        self.lock = threading.Lock()

    def download(self, file, pbar=None):
        """Stream this part and write it into ``file`` at its own offset.

        Args:
            file: Seekable binary file object; Downloader passes the same
                object to every part.
            pbar: Progress bar whose ``update`` is called with the number of
                bytes written. NOTE(review): the default ``None`` is not
                handled; ``pbar.update`` is called unconditionally.
        """
        # NOTE(review): the response status is never checked, so the body of
        # an error response would be written into the file like real data.
        response = requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        )

        # Number of bytes of this part already written to the file.
        written = 0

        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
            if chunk:
                self.lock.acquire()

                # Position the shared file at this part's next unwritten byte.
                file.seek(self.byte_range[0] + written)
                length = file.write(chunk)
                file.flush()

                written += length
                pbar.update(length)

                self.lock.release()


class Downloader:
    """Download one URL by fetching several byte ranges in parallel threads.

    Args:
        url: Address of the file to download.
        parts: Number of byte ranges (one thread each) to split the file into.
    """

    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        """Return the Content-Length reported by a HEAD request, in bytes."""
        info = requests.head(self.url, allow_redirects=True)

        info.raise_for_status()

        size = info.headers.get("content-length", None)
        # NOTE(review): assert is stripped under ``python -O``.
        assert size
        return int(size)

    def download(self, filename):
        """Download the URL into ``filename`` using ``self.parts`` threads."""
        file_size = self._get_file_size()
        # file_size = 1024

        size_per_part = file_size // self.parts

        print(file_size, size_per_part)

        # One file object and one progress bar are shared by all threads.
        file = open(filename, "wb")

        pbar = tqdm(total=file_size)

        threads = []
        for index in range(self.parts):
            # The last part also takes the remainder left over by the
            # integer division above.
            if index + 1 == self.parts:
                byte_range = (size_per_part * index, file_size - 1)
            else:
                byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)

            thread = threading.Thread(
                target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
            )
            thread.start()
            threads.append(thread)

        # Wait for every part before closing the shared file.
        for thread in threads:
            thread.join()

        file.close()


# Example run: download the video with the default number of parts (10).
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")

如何解决我的库中的问题并获取相同的文件数据?感谢任何帮助。

英文:

I'm building a parallel download library using threading module.

When I use my library, it downloads the file without error, but the video file doesn't have the same content as if I downloaded it through the browser.

I use threading for parallel downloading and I think I have a problem with threading.Lock and file.seek, but I can't figure out how to fix the problem.

This is my code:

import requests
import threading
from tqdm import tqdm

# Size of each chunk read from the HTTP response stream.
DOWNLOAD_CHUNK_SIZE = 1 << 20  # 1 MiB


class DownloadPart:
    """One byte range of a remote file, fetched with an HTTP Range request.

    Args:
        url: Address of the file to download.
        byte_range: Inclusive ``(first, last)`` byte offsets of this part.
    """

    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

        # NOTE(review): this lock belongs to this instance only. Downloader
        # creates one DownloadPart per thread, so no two threads ever contend
        # for the same lock and the seek/write pair below is not serialized.
        self.lock = threading.Lock()

    def download(self, file, pbar=None):
        """Stream this part and write it into ``file`` at its own offset.

        Args:
            file: Seekable binary file object; Downloader passes the same
                object to every part.
            pbar: Progress bar whose ``update`` is called with the number of
                bytes written. NOTE(review): the default ``None`` is not
                handled; ``pbar.update`` is called unconditionally.
        """
        # NOTE(review): the response status is never checked, so the body of
        # an error response would be written into the file like real data.
        response = requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        )

        # Number of bytes of this part already written to the file.
        written = 0

        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
            if chunk:
                self.lock.acquire()

                # Position the shared file at this part's next unwritten byte.
                file.seek(self.byte_range[0] + written)
                length = file.write(chunk)
                file.flush()

                written += length
                pbar.update(length)

                self.lock.release()


class Downloader:
    """Download one URL by fetching several byte ranges in parallel threads.

    Args:
        url: Address of the file to download.
        parts: Number of byte ranges (one thread each) to split the file into.
    """

    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        """Return the Content-Length reported by a HEAD request, in bytes."""
        info = requests.head(self.url, allow_redirects=True)

        info.raise_for_status()

        size = info.headers.get("content-length", None)
        # NOTE(review): assert is stripped under ``python -O``.
        assert size
        return int(size)

    def download(self, filename):
        """Download the URL into ``filename`` using ``self.parts`` threads."""
        file_size = self._get_file_size()
        # file_size = 1024

        size_per_part = file_size // self.parts

        print(file_size, size_per_part)

        # One file object and one progress bar are shared by all threads.
        file = open(filename, "wb")

        pbar = tqdm(total=file_size)

        threads = []
        for index in range(self.parts):
            # The last part also takes the remainder left over by the
            # integer division above.
            if index + 1 == self.parts:
                byte_range = (size_per_part * index, file_size - 1)
            else:
                byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)

            thread = threading.Thread(
                target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
            )
            thread.start()
            threads.append(thread)

        # Wait for every part before closing the shared file.
        for thread in threads:
            thread.join()

        file.close()


# Example run: download the video with the default number of parts (10).
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")

How can I solve the problem with my library and get the same data in the file? Thank you for any help.

答案1

得分: 0

我的代码中存在两个问题:

  1. 我在这里找到了第一个问题的解决方案:https://stackoverflow.com/a/25165183/14900791

     > Lock() 函数创建了一个全新的锁 - 只有调用该函数的线程才能使用。这就是为什么它不起作用,因为每个线程都在锁定一个完全不同的锁。

  2. Mixdrop (mxdcontent.net) 只允许同一IP同时下载两个视频,所以代码只适用于两个部分,其他部分返回状态码 509(我没有检查状态码,所以没有收到错误)。
import requests
import threading
from tqdm import tqdm

# Size of each chunk read from the HTTP response stream.
DOWNLOAD_CHUNK_SIZE = 1024 * 1024  # 1 MiB

# Single module-level lock instance, used by DownloadPart.download.
lock = threading.Lock()

class DownloadPart:
    """One byte range of a remote file, fetched with an HTTP Range request.

    Args:
        url: Address of the file to download.
        byte_range: Inclusive ``(first, last)`` byte offsets of this part.
    """

    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

    def download(self, file, pbar=None):
        """Stream this part and write it into ``file`` at its own offset.

        Args:
            file: Seekable binary file object shared with the other parts.
            pbar: Optional progress bar; when given, its ``update`` method is
                called with the number of bytes written after every chunk.

        Raises:
            requests.HTTPError: If the server answers with an error status
                (for example 509 when it limits parallel connections).
        """
        with requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        ) as response:
            # Fail loudly on an error status; otherwise the body of the error
            # response would be written into the file as if it were real data.
            response.raise_for_status()

            # Absolute file position of this part's next unwritten byte.
            offset = self.byte_range[0]

            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if not chunk:
                    continue

                # All parts share one file object and therefore one file
                # position, so seek + write must not interleave with another
                # thread. ``with`` releases the lock even if the write fails.
                with lock:
                    file.seek(offset)
                    length = file.write(chunk)
                    file.flush()

                    if pbar is not None:
                        pbar.update(length)

                offset += length

class Downloader:
    """Download one URL by fetching several byte ranges in parallel threads.

    Args:
        url: Address of the file to download.
        parts: Requested number of byte ranges (one thread each). Values
            below 1 are treated as 1, and the count is capped at the file
            size so that no range is ever empty.
    """

    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        """Return the Content-Length reported by a HEAD request, in bytes.

        Raises:
            requests.HTTPError: If the HEAD request returns an error status.
            ValueError: If the response carries no Content-Length header.
        """
        info = requests.head(self.url, allow_redirects=True)
        info.raise_for_status()

        size = info.headers.get("content-length")
        if not size:
            # Explicit raise instead of ``assert``, which is stripped when
            # Python runs with -O.
            raise ValueError("server did not report a Content-Length")
        return int(size)

    def _byte_ranges(self, file_size):
        """Split ``file_size`` bytes into inclusive ``(first, last)`` ranges."""
        if file_size <= 0:
            return []

        # Never create more parts than there are bytes, and always at least one.
        parts = max(1, min(self.parts, file_size))
        size_per_part = file_size // parts

        ranges = []
        for index in range(parts):
            first = size_per_part * index
            if index + 1 == parts:
                # The last part also takes the remainder of the division.
                last = file_size - 1
            else:
                last = first + size_per_part - 1
            ranges.append((first, last))
        return ranges

    def download(self, filename):
        """Download the URL into ``filename`` using parallel range requests.

        Raises:
            Exception: The first error raised inside any worker thread is
                re-raised here after all threads have finished, so a failed
                part is not mistaken for a complete download.
        """
        file_size = self._get_file_size()
        ranges = self._byte_ranges(file_size)
        size_per_part = file_size // len(ranges) if ranges else 0

        print(file_size, size_per_part)

        # Exceptions do not cross thread boundaries on their own; collect
        # them here and re-raise in the calling thread.
        errors = []

        def run_part(part, file, pbar):
            try:
                part.download(file, pbar=pbar)
            except Exception as exc:
                errors.append(exc)

        # The context managers close the file and the progress bar even when
        # starting or joining a thread fails.
        with open(filename, "wb") as file, tqdm(total=file_size) as pbar:
            threads = [
                threading.Thread(
                    target=run_part,
                    args=(DownloadPart(self.url, byte_range), file, pbar),
                )
                for byte_range in ranges
            ]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()

        if errors:
            raise errors[0]

# Example run: download the video with the default number of parts (10).
# The query string uses plain "&" separators; an HTML-escaped "&amp;" would
# send different parameter names to the server.
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")
英文:

There were two problems with my code:

  1. I found a solution to the first problem here. https://stackoverflow.com/a/25165183/14900791:
    > The Lock() function creates an entirely new lock - one that only the
    > thread calling the function can use. That's why it doesn't work,
    > because each thread is locking an entirely different lock.

  2. Mixdrop (mxdcontent.net) only allows two simultaneous video downloads from the same IP address, so the code only works for two parts; the other parts got status code 509 (I didn't check the status code, so I didn't get an error).

import requests
import threading
from tqdm import tqdm

# Size of each chunk read from the HTTP response stream.
DOWNLOAD_CHUNK_SIZE = 1 << 20  # 1 MiB

# Single module-level lock instance, used by DownloadPart.download.
lock = threading.Lock()

class DownloadPart:
    """One byte range of a remote file, fetched with an HTTP Range request.

    Args:
        url: Address of the file to download.
        byte_range: Inclusive ``(first, last)`` byte offsets of this part.
    """

    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

    def download(self, file, pbar=None):
        """Stream this part and write it into ``file`` at its own offset.

        Args:
            file: Seekable binary file object shared with the other parts.
            pbar: Optional progress bar; when given, its ``update`` method is
                called with the number of bytes written after every chunk.

        Raises:
            requests.HTTPError: If the server answers with an error status
                (for example 509 when it limits parallel connections).
        """
        with requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        ) as response:
            # Fail loudly on an error status; otherwise the body of the error
            # response would be written into the file as if it were real data.
            response.raise_for_status()

            # Absolute file position of this part's next unwritten byte.
            offset = self.byte_range[0]

            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if not chunk:
                    continue

                # All parts share one file object and therefore one file
                # position, so seek + write must not interleave with another
                # thread. ``with`` releases the lock even if the write fails.
                with lock:
                    file.seek(offset)
                    length = file.write(chunk)
                    file.flush()

                    if pbar is not None:
                        pbar.update(length)

                offset += length


class Downloader:
    """Download one URL by fetching several byte ranges in parallel threads.

    Args:
        url: Address of the file to download.
        parts: Requested number of byte ranges (one thread each). Values
            below 1 are treated as 1, and the count is capped at the file
            size so that no range is ever empty.
    """

    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        """Return the Content-Length reported by a HEAD request, in bytes.

        Raises:
            requests.HTTPError: If the HEAD request returns an error status.
            ValueError: If the response carries no Content-Length header.
        """
        info = requests.head(self.url, allow_redirects=True)
        info.raise_for_status()

        size = info.headers.get("content-length")
        if not size:
            # Explicit raise instead of ``assert``, which is stripped when
            # Python runs with -O.
            raise ValueError("server did not report a Content-Length")
        return int(size)

    def _byte_ranges(self, file_size):
        """Split ``file_size`` bytes into inclusive ``(first, last)`` ranges."""
        if file_size <= 0:
            return []

        # Never create more parts than there are bytes, and always at least one.
        parts = max(1, min(self.parts, file_size))
        size_per_part = file_size // parts

        ranges = []
        for index in range(parts):
            first = size_per_part * index
            if index + 1 == parts:
                # The last part also takes the remainder of the division.
                last = file_size - 1
            else:
                last = first + size_per_part - 1
            ranges.append((first, last))
        return ranges

    def download(self, filename):
        """Download the URL into ``filename`` using parallel range requests.

        Raises:
            Exception: The first error raised inside any worker thread is
                re-raised here after all threads have finished, so a failed
                part is not mistaken for a complete download.
        """
        file_size = self._get_file_size()
        ranges = self._byte_ranges(file_size)
        size_per_part = file_size // len(ranges) if ranges else 0

        print(file_size, size_per_part)

        # Exceptions do not cross thread boundaries on their own; collect
        # them here and re-raise in the calling thread.
        errors = []

        def run_part(part, file, pbar):
            try:
                part.download(file, pbar=pbar)
            except Exception as exc:
                errors.append(exc)

        # The context managers close the file and the progress bar even when
        # starting or joining a thread fails.
        with open(filename, "wb") as file, tqdm(total=file_size) as pbar:
            threads = [
                threading.Thread(
                    target=run_part,
                    args=(DownloadPart(self.url, byte_range), file, pbar),
                )
                for byte_range in ranges
            ]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()

        if errors:
            raise errors[0]


# Example run: download the video with the default number of parts (10).
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")

huangapple
  • 本文由 发表于 2023年2月16日 03:25:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/75464574.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定