英文:
How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?
问题
以下是已翻译的内容:
跟进这个问题: https://stackoverflow.com/questions/63351478/how-to-read-parquet-files-from-azure-blobs-into-pandas-dataframe
我想通过使用 asyncio 来并行下载多个文件,但我卡在如何使用 Python 3.11 的 TaskGroup 功能来启动我的任务并等待其完成。我如何检索已下载流的列表?
我的代码到目前为止:
import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
from itertools import product
class BlobStorageAsync:
    """Async helper around azure.storage.blob.aio.ContainerClient for
    listing blobs and downloading several of them concurrently."""

    def __init__(self, connection_string, container_name, logging_enable):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            # This client will log detailed information about its HTTP
            # sessions, at DEBUG level, when logging_enable is True.
            logging_enable=logging_enable
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        """Return the list of blobs whose names start with *name_starts_with*."""
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        """Download one blob and return its content as an in-memory binary stream."""
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall()  # data returned as a bytes-like object
        # Return the data as bytes wrapped in an in-memory binary stream.
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        """Download every blob in *blobs_list* concurrently.

        Bug fix: the original discarded the value of ``asyncio.gather``,
        so callers could never retrieve the downloaded streams. The list
        of BytesIO streams (in input order) is now returned.
        """
        tasks = [asyncio.create_task(self.download_blob_async(blob_name))
                 for blob_name in blobs_list]
        return await asyncio.gather(*tasks)
现在我卡住了,因为我无法创建和运行任务,并等待结果,即这不起作用:
async def main():
    """Entry-point coroutine: download the blobs and print the first parquet file.

    Bug fix: ``asyncio.run()`` must not be called from inside a coroutine;
    instead the coroutine awaits the download directly, and
    ``asyncio.run(main())`` drives it from the synchronous __main__ guard.
    """
    blobs_list = ...          # TODO: fill in the blob names to download
    connection_string = ...   # TODO: fill in the storage connection string
    container_name = ...      # TODO: fill in the container name
    logging_enable = False    # NOTE(review): was undefined in the original snippet
    BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
    result = await BSA.download_blobs_async(blobs_list)
    # Process the result: read the first stream and print it, for instance.
    df = pd.read_parquet(result[0])
    print(df)

if __name__ == '__main__':
    try:
        # main() is a coroutine: calling it bare only creates a coroutine
        # object; asyncio.run() is required to actually execute it.
        asyncio.run(main())
    except Exception as ex:
        print(ex)
英文:
Following this question: https://stackoverflow.com/questions/63351478/how-to-read-parquet-files-from-azure-blobs-into-pandas-dataframe
I wanted to add concurrency by downloading multiple files "in parallel" using asyncio.
I'm stuck on how I can use the TaskGroup feature of Python 3.11 to start my tasks and wait for them to be completed. How can I retrieve a list of the downloaded streams?
My code so far:
import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
from itertools import product
class BlobStorageAsync:
    """Async helper around azure.storage.blob.aio.ContainerClient for
    listing blobs and downloading several of them concurrently."""

    def __init__(self, connection_string, container_name, logging_enable):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            # This client will log detailed information about its HTTP
            # sessions, at DEBUG level, when logging_enable is True.
            logging_enable=logging_enable
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        """Return the list of blobs whose names start with *name_starts_with*."""
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        """Download one blob and return its content as an in-memory binary stream."""
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall()  # data returned as bytes-like object
        # Return data as bytes wrapped in an in-memory binary stream.
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        """Download every blob in *blobs_list* concurrently.

        Bug fix: the original discarded the value of ``asyncio.gather``,
        so callers could never retrieve the downloaded streams. The list
        of BytesIO streams (in input order) is now returned.
        """
        tasks = [asyncio.create_task(self.download_blob_async(blob_name))
                 for blob_name in blobs_list]
        return await asyncio.gather(*tasks)
Now I am stuck because I am not able to create and run the tasks, and wait for the results, i.e. this does not work:
async def main():
    """Entry-point coroutine: download the blobs and print the first parquet file.

    Bug fix: ``asyncio.run()`` must not be called from inside a coroutine;
    instead the coroutine awaits the download directly, and
    ``asyncio.run(main())`` drives it from the synchronous __main__ guard.
    """
    blobs_list = ...          # TODO: fill in the blob names to download
    connection_string = ...   # TODO: fill in the storage connection string
    container_name = ...      # TODO: fill in the container name
    logging_enable = False    # NOTE(review): was undefined in the original snippet
    BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
    result = await BSA.download_blobs_async(blobs_list)
    # Process the result: read the first stream and print it, for instance.
    df = pd.read_parquet(result[0])
    print(df)

if __name__ == '__main__':
    try:
        # main() is a coroutine: calling it bare only creates a coroutine
        # object; asyncio.run() is required to actually execute it.
        asyncio.run(main())
    except Exception as ex:
        print(ex)
答案1
得分: 0
以下是您要求的代码部分的中文翻译:
# 如何使用asyncio同时将Azure Blobs中的Parquet文件读取到Pandas DataFrame中?
您可以使用以下代码使用asyncio同时将Azure Blob中的Parquet文件读取到Pandas DataFrame中。
**代码:**
```python
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
class BlobStorageAsync:
    """Thin async wrapper over ContainerClient: list blobs and fetch
    several of them concurrently as in-memory binary streams."""

    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )

    async def list_blobs_in_container_async(self, name_starts_with):
        """Collect every blob whose name begins with *name_starts_with*."""
        return [item async for item in
                self.container_client.list_blobs(name_starts_with=name_starts_with)]

    async def download_blob_async(self, blob_name):
        """Fetch a single blob and hand back its bytes wrapped in BytesIO."""
        client = self.container_client.get_blob_client(blob=blob_name)
        async with client:
            downloader = await client.download_blob()
            payload = await downloader.readall()  # bytes-like object
        return BytesIO(payload)

    async def download_blobs_async(self, blobs_list):
        """Run one download task per blob name and return the list of streams."""
        pending = [asyncio.create_task(self.download_blob_async(name))
                   for name in blobs_list]
        # gather preserves input order, so streams line up with blobs_list
        return await asyncio.gather(*pending)
async def main():
    """Download two parquet blobs concurrently and print each as a DataFrame."""
    blobs_list = ["pqt_file4", "pqt_file5"]
    connection_string = ""
    container_name = "test1"
    BSA = BlobStorageAsync(connection_string, container_name)
    try:
        results = await BSA.download_blobs_async(blobs_list)
        for stream in results:
            print(pd.read_parquet(stream, engine="pyarrow"))
    finally:
        # Always release the client's underlying HTTP session.
        await BSA.container_client.close()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)
输出:
id name
0 1 Kala
1 2 Arulmozhi
2 6 Rajaraja
id name
0 1 Aditha
1 2 Arulmozhi
2 3 Kundavai
3 6 Rajaraja
请注意,代码中的某些字符串(如连接字符串和容器名称)需要根据您的Azure Blob存储配置进行设置。
<details>
<summary>英文:</summary>
> How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?
You can use the below code to read parquet files from Azure blobs into Pandas DataFrame concurrently with asyncio.
**Code:**
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
class BlobStorageAsync:
    """Thin async wrapper over ContainerClient: list blobs and fetch
    several of them concurrently as in-memory binary streams."""

    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )

    async def list_blobs_in_container_async(self, name_starts_with):
        """Collect every blob whose name begins with *name_starts_with*."""
        return [item async for item in
                self.container_client.list_blobs(name_starts_with=name_starts_with)]

    async def download_blob_async(self, blob_name):
        """Fetch a single blob and hand back its bytes wrapped in BytesIO."""
        client = self.container_client.get_blob_client(blob=blob_name)
        async with client:
            downloader = await client.download_blob()
            payload = await downloader.readall()  # bytes-like object
        return BytesIO(payload)

    async def download_blobs_async(self, blobs_list):
        """Run one download task per blob name and return the list of streams."""
        pending = [asyncio.create_task(self.download_blob_async(name))
                   for name in blobs_list]
        # gather preserves input order, so streams line up with blobs_list
        return await asyncio.gather(*pending)
async def main():
    """Download two parquet blobs concurrently and print each as a DataFrame."""
    blobs_list = ["pqt_file4", "pqt_file5"]
    connection_string = ""
    container_name = "test1"
    BSA = BlobStorageAsync(connection_string, container_name)
    try:
        results = await BSA.download_blobs_async(blobs_list)
        for stream in results:
            print(pd.read_parquet(stream, engine="pyarrow"))
    finally:
        # Always release the client's underlying HTTP session.
        await BSA.container_client.close()

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except Exception as ex:
        print(ex)
**Output:**
id name
0 1 Kala
1 2 Arulmozhi
2 6 Rajaraja
id name
0 1 Aditha
1 2 Arulmozhi
2 3 Kundavai
3 6 Rajaraja
![enter image description here](https://i.imgur.com/WeYzt9k.png)
</details>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论