英文:
Parallel downloading don't work in python threading
问题
我正在使用 threading
模块构建一个并行下载库。
当我使用我的库时,它可以下载文件而不出现错误,但是视频文件的内容与通过浏览器下载的内容不一样。
我使用 threading
进行并行下载,我认为问题出在 threading.Lock
和 file.seek
上,但我无法弄清楚如何解决问题。
这是我的代码:
import requests
import threading
from tqdm import tqdm
DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB
class DownloadPart:
def __init__(self, url, byte_range) -> None:
self.url = url
self.byte_range = byte_range
self.lock = threading.Lock()
def download(self, file, pbar=None):
response = requests.get(
self.url,
headers={"Range": "bytes={}-{}".format(*self.byte_range)},
allow_redirects=True,
stream=True,
)
written = 0
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
if chunk:
self.lock.acquire()
file.seek(self.byte_range[0] + written)
length = file.write(chunk)
file.flush()
written += length
pbar.update(length)
self.lock.release()
class Downloader:
def __init__(self, url, parts=10):
self.url = url
self.parts = parts
def _get_file_size(self) -> int:
info = requests.head(self.url, allow_redirects=True)
info.raise_for_status()
size = info.headers.get("content-length", None)
assert size
return int(size)
def download(self, filename):
file_size = self._get_file_size()
# file_size = 1024
size_per_part = file_size // self.parts
print(file_size, size_per_part)
file = open(filename, "wb")
pbar = tqdm(total=file_size)
threads = []
for index in range(self.parts):
# fix last part have more bytes
if index + 1 == self.parts:
byte_range = (size_per_part * index, file_size - 1)
else:
byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)
thread = threading.Thread(
target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
file.close()
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"
d = Downloader(URL)
d.download("video.mp4")
如何解决我的库中的问题并获取相同的文件数据?感谢任何帮助。
英文:
I'm building a parallel download library using threading
module.
When I use my library, it downloads the file without error, but the video file doesn't have the same content as if I downloaded it through the browser.
I use threading
for parallel downloading and I think I have a problem with threading.Lock
and file.seek
, but I can't figure out how to fix the problem.
This is my code:
import requests
import threading
from tqdm import tqdm
DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB
class DownloadPart:
def __init__(self, url, byte_range) -> None:
self.url = url
self.byte_range = byte_range
self.lock = threading.Lock()
def download(self, file, pbar=None):
response = requests.get(
self.url,
headers={"Range": "bytes={}-{}".format(*self.byte_range)},
allow_redirects=True,
stream=True,
)
written = 0
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
if chunk:
self.lock.acquire()
file.seek(self.byte_range[0] + written)
length = file.write(chunk)
file.flush()
written += length
pbar.update(length)
self.lock.release()
class Downloader:
def __init__(self, url, parts=10):
self.url = url
self.parts = parts
def _get_file_size(self) -> int:
info = requests.head(self.url, allow_redirects=True)
info.raise_for_status()
size = info.headers.get("content-length", None)
assert size
return int(size)
def download(self, filename):
file_size = self._get_file_size()
# file_size = 1024
size_per_part = file_size // self.parts
print(file_size, size_per_part)
file = open(filename, "wb")
pbar = tqdm(total=file_size)
threads = []
for index in range(self.parts):
# fix last part have more bytes
if index + 1 == self.parts:
byte_range = (size_per_part * index, file_size - 1)
else:
byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)
thread = threading.Thread(
target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
file.close()
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"
d = Downloader(URL)
d.download("video.mp4")
How can I solve the problem with my library and get the same data in the file? Thank you for any help.
答案1
得分: 0
-
我在我的代码中发现了两个问题:
-
我在这里找到了第一个问题的解决方案。https://stackoverflow.com/a/25165183/14900791:
Lock()
函数创建了一个全新的锁 - 只有调用该函数的线程才能使用。这就是为什么它不起作用,因为每个线程都在锁定一个完全不同的锁。
- Mixdrop (
mxdcontent.net
) 只允许在同一IP上有两个视频,所以代码只适用于两个部分,其他部分返回状态码509
(我没有检查状态码,所以没有收到错误)。
import requests
import threading
from tqdm import tqdm
DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB
# 全局锁实例
lock = threading.Lock()
class DownloadPart:
def __init__(self, url, byte_range) -> None:
self.url = url
self.byte_range = byte_range
def download(self, file, pbar=None):
response = requests.get(
self.url,
headers={"Range": "bytes={}-{}".format(*self.byte_range)},
allow_redirects=True,
stream=True,
)
written = 0
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
if chunk:
lock.acquire()
file.seek(self.byte_range[0] + written)
length = file.write(chunk)
file.flush()
written += length
pbar.update(length)
lock.release()
class Downloader:
def __init__(self, url, parts=10):
self.url = url
self.parts = parts
def _get_file_size(self) -> int:
info = requests.head(self.url, allow_redirects=True)
info.raise_for_status()
size = info.headers.get("content-length", None)
assert size
return int(size)
def download(self, filename):
file_size = self._get_file_size()
# file_size = 1024
size_per_part = file_size // self.parts
print(file_size, size_per_part)
file = open(filename, "wb")
pbar = tqdm(total=file_size)
threads = []
for index in range(self.parts):
# 修复最后一部分有更多的字节
if index + 1 == self.parts:
byte_range = (size_per_part * index, file_size - 1)
else:
byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)
thread = threading.Thread(
target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
file.close()
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"
d = Downloader(URL)
d.download("video.mp4")
英文:
There were two problems with my code:
-
I found a solution to the first problem here. https://stackoverflow.com/a/25165183/14900791:
> TheLock()
function creates an entirely new lock - one that only the
> thread calling the function can use. That's why it doesn't work,
> because each thread is locking an entirely different lock. -
Mixdrop (
mxdcontent.net
) only allows two videos in the same ip, so the code only works for two parts, the others got status code509
(I didn't checked the status code so I didn't get an error).
import requests
import threading
from tqdm import tqdm
DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB
# global lock instance
lock = threading.Lock()
class DownloadPart:
def __init__(self, url, byte_range) -> None:
self.url = url
self.byte_range = byte_range
def download(self, file, pbar=None):
response = requests.get(
self.url,
headers={"Range": "bytes={}-{}".format(*self.byte_range)},
allow_redirects=True,
stream=True,
)
written = 0
for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
if chunk:
lock.acquire()
file.seek(self.byte_range[0] + written)
length = file.write(chunk)
file.flush()
written += length
pbar.update(length)
lock.release()
class Downloader:
def __init__(self, url, parts=10):
self.url = url
self.parts = parts
def _get_file_size(self) -> int:
info = requests.head(self.url, allow_redirects=True)
info.raise_for_status()
size = info.headers.get("content-length", None)
assert size
return int(size)
def download(self, filename):
file_size = self._get_file_size()
# file_size = 1024
size_per_part = file_size // self.parts
print(file_size, size_per_part)
file = open(filename, "wb")
pbar = tqdm(total=file_size)
threads = []
for index in range(self.parts):
# fix last part have more bytes
if index + 1 == self.parts:
byte_range = (size_per_part * index, file_size - 1)
else:
byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)
thread = threading.Thread(
target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
file.close()
URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"
d = Downloader(URL)
d.download("video.mp4")
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论