英文:
Python Asyncio: how to read from odbc and write to file at the same time?
问题
我正在尝试使用Python和asyncio从ODBC加载数据并同时写入文件,但目前还没有太多成功的运气。
我从ODBC获取了200,000行的批次数据,我希望在其中一个批次完成读取时开始写入文件系统。我希望写入操作在不阻塞下一批次读取的情况下开始,但到目前为止,写入操作是在最后才同时进行的,而不是并发进行的。
下面是您提供的代码片段中的相关部分:
async def writeToCsv_v5(list_tables_source):
# ...
for table in list_tables_source:
# ...
reader = read_arrow_batches_from_odbc(
# ...
)
i = 0
list_task = []
for batch in reader:
# ...
list_task.append(asyncio.create_task(write_to_parquet_v1(df, i)))
# ...
print("finish read")
await asyncio.gather(*list_task)
async def write_to_parquet_v1(df: pd.DataFrame, i: int):
# ...
df.to_parquet(path + table + "_" + str(i) + ".parquet", engine='fastparquet')
# ...
根据您的描述,写入操作只有在所有读取操作完成后才开始。您尝试了使用await asyncio.sleep(0)
的 hack,但读取和写入操作现在都是同步的。
如果您希望实现并发读取和写入,您可以尝试以下更改:
- 将
await asyncio.sleep(0)
删除,以允许并发执行。 - 使用
await asyncio.gather(*list_tasks)
将读取和写入操作都包装在asyncio.gather
中,以便它们可以并发执行。
修改后的代码如下所示:
async def writeToCsv_v6(list_tables_source):
# ...
for table in list_tables_source:
# ...
reader = read_arrow_batches_from_odbc(
# ...
)
i = 0
list_tasks = []
for batch in reader:
# ...
task = asyncio.create_task(write_to_parquet_v2(df, i))
list_tasks.append(task)
print("Time to read " + str(i) + " batch is: " + str(read_stop - read_start))
i += 1
print("finish read")
await asyncio.gather(*list_tasks)
这应该允许读取和写入操作并发执行,而不会阻塞彼此。希望这有助于解决您的问题。
英文:
I am trying to load data from odbc and write to file at the same time using python and ayncio without much luck yet.
I have batchs of 200 000
rows coming from the odbc and I'd like to start writing to the filesystem as soon as one of the batch finishes reading. I'd like the write to start writting without blocking the read of the next batch but so far the writting happens at the very end instead of conccurently while reading.
async def writeToCsv_v5(list_tables_source):
# start=time.time()
connection_string='Driver=Progress OpenEdge 12.2 Driver;HOST=bla;DB=bla;PORT=9000;UID=bla;PASSWORD=bla'
for table in list_tables_source:
list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table] )
reader = read_arrow_batches_from_odbc(
query='Select '+list_columns+' from PUB."'+table+'"',
connection_string=connection_string,
batch_size=200000,
max_text_size=500,
)
i=0
list_task=[]
for batch in reader:
read_start=time.time()
# Process arrow batches
df = batch.to_pandas()
read_stop=time.time()
list_task.append(asyncio.create_task(write_to_parquet_v1(df,i)))
print("Time to read "+str(1)+"batch is :"+str(read_stop-read_start))
i+=1
print("finish read")
await asyncio.gather(*list_task)
async def write_to_parquet_v1(df: pd.DataFrame,i: int):
print("Start taks : "+str(i))
df=df.applymap(str)
write_start=time.time()
df.to_parquet(path+table+"_"+str(i)+".parquet",engine='fastparquet')
write_stop=time.time()
print("Time to write to file "+str(1)+"batch is :"+str(write_stop-write_start))
asyncio.run(writeToCsv_v5(list_tables_columns))
So the outcome is the following :
Time to read 1batch is :0.1856553554534912
Time to read 1batch is :0.173567533493042
Time to read 1batch is :0.1639997959136963
Time to read 1batch is :0.1889955997467041
Time to read 1batch is :0.14117860794067383
Time to read 1batch is :0.14899969100952148
Time to read 1batch is :0.14300036430358887
Time to read 1batch is :0.1419994831085205
Time to read 1batch is :0.14099979400634766
Time to read 1batch is :0.1399984359741211
Time to read 1batch is :0.1340029239654541
Finish reading:
Start taks : 0
Time to write to file 1batch is :0.440692663192749
Start taks : 1
Time to write to file 1batch is :0.23961353302001953
Start taks : 2
Time to write to file 1batch is :0.2606239318847656
Start taks : 3
Time to write to file 1batch is :0.2226719856262207
Start taks : 4
Time to write to file 1batch is :0.21300196647644043
Start taks : 5
Time to write to file 1batch is :0.22265386581420898
Start taks : 6
Time to write to file 1batch is :0.22600030899047852
Start taks : 7
Time to write to file 1batch is :0.28110337257385254
Start taks : 8
Time to write to file 1batch is :0.21700191497802734
Start taks : 9
Time to write to file 1batch is :0.24160361289978027
Start taks : 10
Time to write to file 1batch is :0.26999974250793457
It show the write part only starts when all the read are finnished.
Edit : I used a hack await asyncio.sleep(0)
so the write process start immediatly my new code is now
async def writeToCsv_v6(list_tables_source):
# start=time.time()
connection_string='secret'
for table in list_tables_source:
list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table] )
#rows = cursor.execute('Select '+list_columns+' from PUB."'+table+'"')
reader = read_arrow_batches_from_odbc(
query='Select '+list_columns+' from PUB."'+table+'"',
connection_string=connection_string,
batch_size=200000,
max_text_size=500,
)
i=0
list_tasks=[]
for batch in reader:
read_start=time.time()
# Process arrow batches
df = batch.to_pandas()
read_stop=time.time()
task=asyncio.create_task(write_to_parquet_v2(df,i))
await asyncio.sleep(0)
list_tasks.append(task)
print("Time to read "+str(i)+"batch is :"+str(read_stop-read_start))
i+=1
# test
# if i>10:
# break
print("finish read")
await asyncio.gather(*list_tasks)
But now both the read and the write process are synchronous. Which doesn't help me much.
答案1
得分: 4
异步程序不能像你期望的那样同时进行两件事情 - 它不是多线程。
它只是提供了方便的方式让一个程序在等待另一个完成时做一件事情 - 你必须始终告诉代码何时暂停以等待某些事情发生 - 这是程序的其他部分可以运行的时候。
也就是说,创建一个任务并不足以让代码运行起来,它只是安排代码要运行 - 实际上是调用asyncio.gather
才会让异步循环遍历所有已安排的任务(并且聚合本身将在作为参数传递的任务完成时返回,但同时它将遍历所有现有的异步任务:无论是作为它的参数还是不是参数的)。
所以,在每个批次之后将代码写入文件只是将gather
调用移到迭代批次的for循环内部的问题。这将每200,000行写入一次。或者,如你发现的,将一些await asyncio.sleep(0)
散布在代码中间也会使异步代码遍历一次所有已创建的任务。但是仅靠这个不能使事情同时运行:你的read_arrow_batches_from_odbc
和write_to_parquet_v2
都是同步函数,这意味着:当它们在等待I/O时,它们不会将执行委托给异步事件循环 - 它们将简单地运行直到完成:在写入磁盘之前,可以完成一个批次的完全收集,然后ODBC驱动程序才会获取下一个批次。
正如你所看到的,你也可以完全从代码中删除异步 - 这是完成工作的更简单方法 - 否则,它可以被改为以多线程方式工作,而不需要太多更改。但是要使它在异步中正确地并行运行,需要你的ODBC驱动程序和parquet写入程序的异步版本。为了使它“看起来像异步”(也许大多数异步Python代码都是这样做的),可以在其他线程中运行这些任务,只使用异步来协调获取数据的调用。因此,在这里可以做一个“假”的异步 - 但可能结果的代码会比只在主线程中协调获取数据的单线程更复杂。
(你需要编写一个作为包装器的异步生成器,以实现队列通信代码来获取输入数据,例如。包装器只是一个不需要的额外层)
我将简单地将你的代码重写为多线程。 "大"的区别在于每个功能必须写成一个不同的函数,这样Python会在不同的线程中运行每个函数。通过从底层开始编写异步代码,使用工作的异步库等等,可以使获取和写入数据在单个函数中完成,就像你所做的那样:但这并不一定更可读,大多数时候你仍然需要辅助函数。
from threading import Thread
from queue import Queue
_sentinel = object()
def writeToCsv_v6(list_tables_source, data_queue):
# start=time.time()
connection_string='secret'
for table in list_tables_source:
list_columns=','.join(f'"{item}"' for item in list_tables_source[table] )
#rows = cursor.execute('Select '+list_columns+' from PUB."'+table+'"')
reader = read_arrow_batches_from_odbc(
query='Select '+list_columns+' from PUB."'+table+'"',
connection_string=connection_string,
batch_size=200000,
max_text_size=500,
)
i=0
for batch in reader:
read_start=time.time()
# Process arrow batches
df = batch.to_pandas()
read_stop=time.time()
data_queue.put((df, i))
print("Time to read "+str(i)+" batch is :"+str(read_stop-read_start))
i+=1
# test
# if i>10:
# break
data_queue.put(_sentinel)
print("finish read")
#await asyncio.gather(*list_tasks)
def writer_loop(data_queue):
while True:
data = data_queue.get()
if data is _sentinel:
break
df, i = data
print(f"Writing batch {i} to disk")
write_to_parquet_v2(df,i)
def coordinate(list_tables_source):
data_queue = Queue()
reader = Thread(target=writeToCsv_v6, args=(list_tables_source, data_queue))
writer = Thread(target=writer_loop, args=(data_queue,))
reader.start()
writer.start()
reader.join()
writer.join()
print("all done")
(作为一个附带说明:Python 允许任意数量的缩进来作为缩进块 - 但是现在一般来说,4个空格是相当标准的 - 这对于与你共同协作的人来说更加友好)
英文:
Async program does not properly do "two things at the same time" like you are expecting - it is not multi-threading.
It just offers convenient ways of one program doing one thing wile it waits for another to finish - and you have always to tell when the code should pause to "awaiting" for something to happen - that is when other parts of the program can run.
That said, creating a task is not enough to put code in motion, it just schedules the code to be run - it is the call to asyncio.gather
which will actually make the asyncio loop through all schedule tasks (and gather itself will return when the tasks it got as a parameter are done, but meanwhile it will cycle through all existing asyncio tasks: those which are a parameter to it and those who are not.
So, having the code write down to a file after each batch is just a matter of moving that gather
call to inside the for-loop that iterates through batches. That will write each 200.000 rows. Or,as you found out, sprinkling some await asyncio.sleep(0)
in the middle of the code will also make the asyncio code loop through all created tasks once. But that alone can't make things run at the same time: both your read_arrow_batches_from_odbc
and write_to_parquet_v2
are synchronous functions which means: they won't delegate the execution to the asyncio event loop when they are waiting for I/O - they will simply run to completion: one batch fully gathered before writing to disk of that batch can take place, and only then the ODBC driver will fetch the next batch.
As can be seen, you could as well drop asyncio altogether from that code - that is the simpler way to get your job done - otherwise, it can be changed to work in a multi-threaded way, without many changes. But making it work properly in parallel with asyncio would require async versions of your ODBC driver, and for the parquet writer. What can be done to make it "look like asyncio" (and which a lot, maybe the majority of asyncio Python code does), is to run these tasks in other threads, and just use asyncio to coordinate the calls to fetch data. So a "fake" asyncio would be doable there - but possibly the resulting code would be more complex than just having one thread for getting data in, one to get data out, and coordinate things from the main thread.
(You'd need to write an asyncio generator to work as a wrapper to queue communication code to get the input data, for example. The wrapper is just an extra layer that is not needed)
I will re-write your code as simply multi-threaded instead. The "big" difference is each capability has to be written in a different function so that Python runs each function in a different thread. With asyncio code, from the botton-up, working asyncio libs, and so on, it is possible to make the fetching and writing datain a single function like you did: but that is not necessarily even mode readable, and most of time you will need helper functions anyway.
from threading import Thread
from queue import Queue
_sentinel = object()
def writeToCsv_v6(list_tables_source, data_queue):
# start=time.time()
connection_string='secret'
for table in list_tables_source:
list_columns=' ,'.join(f'"{item}"' for item in list_tables_source[table] )
#rows = cursor.execute('Select '+list_columns+' from PUB."'+table+'"')
reader = read_arrow_batches_from_odbc(
query='Select '+list_columns+' from PUB."'+table+'"',
connection_string=connection_string,
batch_size=200000,
max_text_size=500,
)
i=0
for batch in reader:
read_start=time.time()
# Process arrow batches
df = batch.to_pandas()
read_stop=time.time()
data_queue.put((df, i))
print("Time to read "+str(i)+"batch is :"+str(read_stop-read_start))
i+=1
# test
# if i>10:
# break
queue.put(_sentinel)
print("finish read")
#await asyncio.gather(*list_tasks)
def writer_loop(data_queue):
while True:
data = queue.get()
if data is _sentinel:
break
df, i = data
print(f"Writing batch {i} to disk")
write_to_parquet_v2(df,i)
def coordinate(list_tables_source):
data_queue = Queue()
reader = Thread(target=writeToCsv_v6, args=(list_tables_source, data_queue))
writer = Thread(target=loop, args=(data_queue,))
reader.start()
writer.start()
reader.join()
writer.join()
print("all done")
(as a side note: Python does allow any amount of indentation to count as an indented block - but 4 spaces is pretty standard these days - it is nicer to people that will collaborate with you in your code)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论