In Python, is there a good solution for asynchronously writing to a NetCDF file?

Question

My goal is to accomplish the following:

  • Do a number of calculations on multiple threads
  • Wait until these calculations are done, then start writing the results to a NetCDF output file on one thread
  • Begin the next iteration without waiting for the NetCDF writes to finish

Here is a simplified version of how my NetCDF output class is set up:

import netCDF4


class NetcdfOutput:
    def __init__(self, filename, lon, lat):
        self.__filename = filename
        self.__lon = lon
        self.__lat = lat
        # The Dataset is opened once and stays open for the lifetime of the object.
        self.__nc = netCDF4.Dataset(self.__filename + ".nc", "w")
        self.__group_main = self.__nc.createGroup("Main")
        # Note: the "time", "latitude" and "longitude" dimensions must already exist
        # before createVariable is called (not shown here).
        self.__group_main_var_spd = self.__group_main.createVariable("spd", "f4", ("time", "latitude", "longitude"), zlib=True,
                                                                     complevel=2, fill_value=netCDF4.default_fillvals["f4"])
        self.__group_main_var_dir = self.__group_main.createVariable("dir", "f4", ("time", "latitude", "longitude"), zlib=True,
                                                                     complevel=2, fill_value=netCDF4.default_fillvals["f4"])

    def append(self, idx, uvel, vvel):
        # Write one time step of wind speed and direction at index idx.
        self.__group_main_var_spd[idx, :, :] = magnitude_from_uv(uvel, vvel)
        self.__group_main_var_dir[idx, :, :] = dir_met_to_and_from_math(direction_from_uv(uvel, vvel))

So, imagine we have an instance of this class called wind. To write to the NetCDF file, I just need to call wind.append. I want to do this in such a way that it doesn't block the next iteration of calculations from beginning, though.

The NetCDF writes take ~1 second and the calculations take ~2.5-3 seconds, so there is no concern of having multiple concurrent writes using the high-level method I called out above. Reducing runtime is my top priority and Python 3.7 is my target version.

I was originally planning to use multiprocessing.Pool for this, and pool.map does work fine for my computations, but when I attempt to handle the writes via pool.map_async it fails because NetcdfOutput includes a netCDF4.Dataset object (self.__nc) and Datasets are not picklable.
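To illustrate the pickling problem without needing netCDF4 installed, here is a minimal sketch. HoldsOpenFile and can_pickle are hypothetical names, and an ordinary open file handle is used as a stand-in for the Dataset, on the assumption that both fail for a similar reason: the object wraps an open resource that cannot be serialized and sent to another process.

```python
import os
import pickle


class HoldsOpenFile:
    """Hypothetical stand-in for a class that keeps an open file handle."""

    def __init__(self):
        self.handle = open(os.devnull, "w")


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    holder = HoldsOpenFile()
    print(can_pickle(holder))      # the open handle prevents pickling
    print(can_pickle({"idx": 0}))  # plain data can be pickled
    holder.handle.close()
```

Anything handed to a multiprocessing pool has to survive this kind of serialization, which is why only plain data (indices, arrays) can be passed to worker processes.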

As a workaround, I can see reworking the class to take a filename instead of a netCDF4.Dataset object and just opening and closing the NetCDF file with every loop iteration. But when speed is the goal, opening and closing the same file hundreds of times just doesn't feel like the right answer. I admittedly haven't benchmarked this, though.

Answer 1

Score: 3

The threading module ended up being the path of least resistance. My call to write to the NetCDF file looks like this:

threading.Thread(target=wind.append, args=(arg1, arg2, ...)).start()

There was no need to change around my NetcdfOutput class or anything else in my code. I just imported the module and added this line.

It only takes ~0.0008 seconds for me to run this command, which is fabulous in comparison to the original write time (~1 second). The time it takes to run this command is all that really matters too, since wind.append will always finish before the calculations for the next iteration are done. So, even if there are usually performance downsides to using threading (no earthly idea), they don't impact me in this case.
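The approach above relies on each write finishing before the next one starts. A slightly more defensive variant, sketched below, keeps a reference to the previous thread and joins it before starting the next write, so two writes can never overlap even if a calculation happens to finish early. SlowWriter, compute and run are hypothetical stand-ins (the sleeps replace the real NetCDF write and the real calculation); this is not the original code.

```python
import threading
import time


class SlowWriter:
    """Hypothetical stand-in for NetcdfOutput; append() sleeps instead of writing."""

    def __init__(self):
        self.written = []

    def append(self, idx, value):
        time.sleep(0.05)  # pretend this is the slow NetCDF write
        self.written.append((idx, value))


def compute(idx):
    time.sleep(0.1)  # pretend this is the heavy calculation
    return idx * idx


def run(n_iterations):
    writer = SlowWriter()
    pending = None  # write thread started in the previous iteration
    for idx in range(n_iterations):
        value = compute(idx)
        if pending is not None:
            pending.join()  # make sure the previous write has finished
        pending = threading.Thread(target=writer.append, args=(idx, value))
        pending.start()  # returns immediately; the write runs in the background
    if pending is not None:
        pending.join()  # do not return before the last write completes
    return writer.written


if __name__ == "__main__":
    print(run(5))
```

When the write is always faster than the calculation, the join returns immediately and costs essentially nothing. The final join matters more: without it the program could reach its end (or close the file) while the last write is still in flight.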

Answer 2

Score: 0

You don't need to pass the netCDF4.Dataset between processes: you only need the process pool for the heavy computation, not for the file writing, so you can avoid the pickle issue entirely.

Here is a simple example with a heavy computation of 1 second and a file writing operation of 0.5 seconds. The heavy computations are fed to a process pool and the (serial) file writing operations are done in the main thread (though they can be offloaded to a different thread), concurrently with the heavy computations:

from concurrent.futures import ProcessPoolExecutor, Future
from time import sleep, perf_counter
import netCDF4

N = 10


def heavy_computation(i: int) -> int:
    sleep(1.0)
    return i


def long_write(value: int, variable):
    sleep(0.5)
    variable[value] = value


def main():
    with netCDF4.Dataset("test.nc", "w") as ds, ProcessPoolExecutor() as pool:
        main = ds.createGroup("main")
        main.createDimension("value", N)
        dir = main.createVariable("dir", "f4", ("value",))

        start = perf_counter()

        futures: list[Future[int]] = []
        for i in range(N):
            future = pool.submit(heavy_computation, i)
            futures.append(future)

        for future in futures:
            result = future.result()
            long_write(result, dir)

        elapsed = perf_counter() - start

        print(dir[:])
        print(f"Elapsed: {elapsed:.2f}s")


if __name__ == "__main__":
    main()
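If the writes should be offloaded to a different thread, as mentioned above, one possible sketch is a second executor with a single worker thread. Everything here is an assumption made for illustration: a plain dict stands in for the netCDF4 variable, the sleeps stand in for real work, and the computations run in a thread pool only because sleeping is not CPU-bound (a real CPU-heavy job would keep the ProcessPoolExecutor).

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def heavy_computation(i):
    sleep(0.05)  # stand-in for the real calculation
    return i * 10


def long_write(index, value, store):
    sleep(0.02)  # stand-in for the slow NetCDF write
    store[index] = value


def run(n):
    store = {}  # hypothetical stand-in for a netCDF4 variable
    # A single writer thread means writes run one at a time, in submission order.
    with ThreadPoolExecutor() as compute_pool, ThreadPoolExecutor(max_workers=1) as writer:
        results = [compute_pool.submit(heavy_computation, i) for i in range(n)]
        writes = []
        for i, future in enumerate(results):
            # Hand each finished result to the writer thread without blocking on the write.
            writes.append(writer.submit(long_write, i, future.result(), store))
        for write in writes:
            write.result()  # re-raise any exception that occurred in the writer thread
    return store


if __name__ == "__main__":
    print(run(4))
```

Limiting the writer to one worker keeps the writes serial, so the open file is only ever touched by one thread at a time.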

For more complex setups you should use a queue to synchronise and distribute the work between processes and threads.
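As a rough sketch of the queue idea, a dedicated writer thread can pull jobs from a queue.Queue until it sees a sentinel value. The names below (writer_loop, run, _STOP) and the dict used in place of a netCDF4 variable are assumptions made for illustration, not part of the original code.

```python
import queue
import threading
import time

_STOP = object()  # sentinel telling the writer thread to exit


def writer_loop(jobs, store):
    """Consume (index, value) jobs until the sentinel arrives."""
    while True:
        job = jobs.get()
        if job is _STOP:
            break
        idx, value = job
        time.sleep(0.02)  # stand-in for the slow NetCDF write
        store[idx] = value


def run(n):
    store = {}  # hypothetical stand-in for a netCDF4 variable
    jobs = queue.Queue(maxsize=2)  # bounded, so producers cannot run far ahead of the writer
    writer = threading.Thread(target=writer_loop, args=(jobs, store))
    writer.start()
    for idx in range(n):
        value = idx + 0.5  # stand-in for a computed result
        jobs.put((idx, value))  # blocks only if the queue is full
    jobs.put(_STOP)
    writer.join()  # wait until every queued write has been processed
    return store


if __name__ == "__main__":
    print(run(3))
```

The bounded queue is a design choice: if writes ever become slower than the calculations, the producer blocks instead of piling up results in memory.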

huangapple
  • Posted on 2023-03-03 22:28:55
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75628327.html