How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?

Question

Following up on this question: https://stackoverflow.com/questions/63351478/how-to-read-parquet-files-from-azure-blobs-into-pandas-dataframe

I want to add concurrency by downloading multiple files "in parallel" with asyncio. I'm stuck on how to use Python 3.11's TaskGroup feature to start my tasks and wait for them to complete. How can I retrieve the list of downloaded streams?

My code so far:

```python
import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
from itertools import product

class BlobStorageAsync:
    def __init__(self, connection_string, container_name, logging_enable):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            # this client will log detailed information about its HTTP sessions, at DEBUG level
            logging_enable=logging_enable
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall()  # data is returned as a bytes-like object
        # return the data as an in-memory binary stream
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        await asyncio.gather(*tasks)
```

Now I am stuck because I cannot create and run the tasks and wait for the results, i.e. this does not work:

```python
async def main():
    blobs_list = ...
    connection_string = ...
    container_name = ...
    BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
    result = asyncio.run(BSA.download_blobs_async(blobs_list))
    # process the result: read the first stream and print it, for instance
    df = pd.read_parquet(result[0])
    print(df)

if __name__ == '__main__':
    try:
        main()
    except Exception as ex:
        print(ex)
```

Answer 1

Score: 0

> How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?

You can use the code below to read Parquet files from Azure Blobs into a Pandas DataFrame concurrently with asyncio.

**Code:**

```python
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO

class BlobStorageAsync:
    def __init__(self, connection_string, container_name):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall()  # data is returned as a bytes-like object
        # return the data as an in-memory binary stream
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
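        # asyncio.gather runs the downloads concurrently and returns their results in submission order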
        results = await asyncio.gather(*tasks)
        # return the list of downloaded streams
        return results

async def main():
    blobs_list=["pqt_file4","pqt_file5"]
    connection_string =""
    container_name = "test1"
    BSA = BlobStorageAsync(connection_string, container_name)
    try:
        results = await BSA.download_blobs_async(blobs_list)
        for stream in results:
            df = pd.read_parquet(stream, engine="pyarrow")
            print(df)
    finally:
        await BSA.container_client.close()

if __name__ == '__main__':
    try:
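        # unlike the question's version, asyncio.run() is called once here at top level,
        # outside of any coroutine, which is what actually starts the event loop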
        asyncio.run(main())
    except Exception as ex:
        print(ex)

```

**Output:**

```
   id       name
0   1       Kala
1   2  Arulmozhi
2   6   Rajaraja
   id       name
0   1     Aditha
1   2  Arulmozhi
2   3   Kundavai
3   6   Rajaraja
```
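
If a single combined DataFrame is wanted instead of one per blob, the downloaded streams could also be concatenated. A minimal sketch, assuming all the Parquet files share a compatible schema and that `results` holds the streams returned by `download_blobs_async`:

```python
# combine the downloaded streams into a single DataFrame
frames = [pd.read_parquet(stream, engine="pyarrow") for stream in results]
combined_df = pd.concat(frames, ignore_index=True)
print(combined_df)
```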

Note that some values in the code (such as the connection string and the container name) need to be set according to your own Azure Blob Storage configuration.
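
The question also asks about Python 3.11's `asyncio.TaskGroup`. As an alternative to `asyncio.gather`, `download_blobs_async` could be written with a TaskGroup, which additionally cancels the remaining downloads if any one of them fails. A minimal sketch, reusing the class above and assuming Python 3.11+:

```python
    async def download_blobs_async(self, blobs_list):
        # TaskGroup (Python 3.11+) waits for every task when the async with block exits,
        # and cancels the remaining downloads if any of them raises
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.download_blob_async(name)) for name in blobs_list]
        # all tasks are done here; collect the BytesIO streams in submission order
        return [task.result() for task in tasks]
```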
