In Python, is there a good solution for asynchronously writing to a NetCDF file?
Question
My goal is to accomplish the following:
- Do a number of calculations on multiple threads
- Wait until these calculations are done, then start writing the results to a NetCDF output file on one thread
- Begin the next iteration without waiting for the NetCDF writes to finish
Here is a simplified version of how my NetCDF output class is set up:
import netCDF4

# magnitude_from_uv, direction_from_uv, and dir_met_to_and_from_math are
# defined elsewhere; dimension creation is omitted in this simplified version.
class NetcdfOutput:
    def __init__(self, filename, lon, lat):
        self.__filename = filename
        self.__lon = lon
        self.__lat = lat
        self.__nc = netCDF4.Dataset(self.__filename + ".nc", "w")
        self.__group_main = self.__nc.createGroup("Main")
        self.__group_main_var_spd = self.__group_main.createVariable("spd", "f4", ("time", "latitude", "longitude"),
                                                                     zlib=True, complevel=2,
                                                                     fill_value=netCDF4.default_fillvals["f4"])
        self.__group_main_var_dir = self.__group_main.createVariable("dir", "f4", ("time", "latitude", "longitude"),
                                                                     zlib=True, complevel=2,
                                                                     fill_value=netCDF4.default_fillvals["f4"])

    def append(self, idx, uvel, vvel):
        self.__group_main_var_spd[idx, :, :] = magnitude_from_uv(uvel, vvel)
        self.__group_main_var_dir[idx, :, :] = dir_met_to_and_from_math(direction_from_uv(uvel, vvel))
So, imagine we have an instance of this class called wind. To write to the NetCDF file, I just need to call wind.append. I want to do this in a way that doesn't block the next iteration of calculations from starting, though.
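For concreteness, the intended usage might look like this (the filename, the lon/lat arrays, and the uvel/vvel values are placeholders for data produced elsewhere in the real program):

wind = NetcdfOutput("output", lon, lat)  # hypothetical filename
wind.append(0, uvel, vvel)               # the call that should not block the next iteration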
The NetCDF writes take ~1 second and the calculations take ~2.5-3 seconds, so there is no risk of multiple concurrent writes with the high-level approach I described above. Reducing runtime is my top priority, and Python 3.7 is my target version.
I was originally planning to use multiprocessing.Pool for this, and pool.map does work fine for my computations, but when I attempt to handle the writes via pool.map_async it fails because NetcdfOutput includes a netCDF4.Dataset object (self.__nc) and Datasets are not picklable.
As a workaround, I could rework the class to take a filename instead of a netCDF4.Dataset object and open and close the NetCDF file on every loop iteration, but when speed is the goal, opening and closing the same file hundreds of times just doesn't feel like the right answer. I admittedly haven't benchmarked this, though.
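For reference, the pickling limitation can be reproduced in isolation; this is a minimal sketch, with scratch.nc as a hypothetical throwaway file:

import pickle
import netCDF4

nc = netCDF4.Dataset("scratch.nc", "w")  # hypothetical throwaway file
pickle.dumps(nc)  # raises an error -- Dataset objects are not picklable,
                  # which is why handing NetcdfOutput to the pool fails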
Answer 1
Score: 3
The threading module ended up being the path of least resistance. My call to write to the NetCDF file looks like this:
threading.Thread(target=wind.append, args=(arg1, arg2, ...)).start()
There was no need to change around my NetcdfOutput class or anything else in my code. I just imported the module and added this line.
It only takes ~0.0008 seconds to run this command, which is fabulous compared to the original write time (~1 second). The time it takes to launch the thread is all that really matters here, since wind.append will always finish before the calculations for the next iteration do. So even if there are performance downsides to using threading (no earthly idea), they don't affect me in this case.
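A minimal sketch of how this pattern might fit into the main loop; compute_uv and num_steps are hypothetical stand-ins for the real calculation code, and the join calls are an extra safety guard, not something the original answer required:

import threading

# compute_uv and num_steps are hypothetical stand-ins for the real program;
# wind is the NetcdfOutput instance from the question.
writer = None
for idx in range(num_steps):
    uvel, vvel = compute_uv(idx)     # ~2.5-3 s of multi-threaded calculations
    if writer is not None:
        writer.join()                # safety guard: previous write must be done
    writer = threading.Thread(target=wind.append, args=(idx, uvel, vvel))
    writer.start()                   # returns in well under a millisecond
if writer is not None:
    writer.join()                    # let the final write finish before closing the file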
Answer 2
Score: 0
You don't need to pass the netCDF4.Dataset between processes, since you only need the process pool for the heavy computation, not the file writing; this way you avoid the pickling issue.
Here is a simple example with a heavy computation of 1 second and a file-writing operation of 0.5 seconds. The heavy computations are fed to a process pool, and the (serial) file writes are done in the main thread (though they could be offloaded to a separate thread), concurrently with the heavy computations:
from concurrent.futures import ProcessPoolExecutor, Future
from time import sleep, perf_counter

import netCDF4

N = 10

def heavy_computation(i: int) -> int:
    sleep(1.0)
    return i

def long_write(value: int, variable):
    sleep(0.5)
    variable[value] = value

def main():
    with netCDF4.Dataset("test.nc", "w") as ds, ProcessPoolExecutor() as pool:
        main = ds.createGroup("main")
        main.createDimension("value", N)
        dir = main.createVariable("dir", "f4", ("value",))

        start = perf_counter()
        futures: list[Future[int]] = []
        for i in range(N):
            future = pool.submit(heavy_computation, i)
            futures.append(future)
        for future in futures:
            result = future.result()
            long_write(result, dir)
        elapsed = perf_counter() - start

        print(dir[:])
        print(f"Elapsed: {elapsed:.2f}s")

if __name__ == "__main__":
    main()
For more complex setups, you should use a queue to synchronise and distribute the work between processes and threads, as sketched below.
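A minimal sketch of that queue-based layout, reusing the toy timings from the example above; the sentinel value and the single writer thread are illustrative assumptions, not something the answer prescribes:

import queue
import threading
from concurrent.futures import ProcessPoolExecutor
from time import sleep

def heavy_computation(i):
    sleep(1.0)                        # stand-in for the real calculations
    return i

def writer(q, results):
    # A single consumer thread drains the queue so all writes stay serial.
    while True:
        item = q.get()
        if item is None:              # sentinel: no more work coming
            break
        sleep(0.5)                    # stand-in for the real NetCDF write
        results.append(item)

def main():
    q = queue.Queue()
    results = []
    t = threading.Thread(target=writer, args=(q, results))
    t.start()
    with ProcessPoolExecutor() as pool:
        for value in pool.map(heavy_computation, range(10)):
            q.put(value)              # hand each finished result to the writer
    q.put(None)                       # tell the writer to shut down
    t.join()
    print(results)

if __name__ == "__main__":
    main()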