Is it possible to print into one line per process in Python?

Question
I haven't found a good solution for the following problem so far:

from multiprocessing import Pool

def do_work() -> None:
    print("Start work", end="")
    # some long running operation
    print("done")

def main():
    with Pool(workers) as p:  # workers = desired pool size
        for i in range(100):
            p.apply_async(func=do_work)
        p.close()
        p.join()
The output I would like to have is:

Start work...done
Start work...done
Start work...done
...
Start work...done

Of course this is not the case, since every process outputs at different times. The question is: can this be achieved without additional dependencies?
Answer 1

Score: 0
The only "dependencies" are classes that are part of the standard Python library. Does that work for you?
The "trick" is to have your output written to a string and then finally print that string. To accomplish this you need to use an io.StringIO
instance as your string buffer and temporarily replace sys.stdout
with this buffer:
def do_work() -> None:
    from contextlib import redirect_stdout
    from io import StringIO

    buffer = StringIO()
    with redirect_stdout(buffer):
        print("Start work", end="...")
        # some long running operation
        print("done")
    # The redirect has ended, so this single print goes to the real stdout:
    print(buffer.getvalue(), end='', flush=True)

def main():
    from multiprocessing import Pool

    with Pool(4) as p:
        for i in range(4):  # Reduced range for demo purposes
            p.apply_async(func=do_work)
        p.close()
        p.join()

if __name__ == '__main__':
    main()
Prints:
Start work...done
Start work...done
Start work...done
Start work...done
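Why this produces intact lines: each task touches the real stdout only once, with a single short print at the very end, and in practice single small writes from different processes do not get interleaved (POSIX even guarantees atomicity for pipe writes up to PIPE_BUF bytes). The flush=True ensures the line is pushed out immediately instead of sitting in a process-local buffer.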
Or you can create a decorator to do this:
def buffer_output(f):
    from functools import wraps

    @wraps(f)
    def wrapper(*args, **kwargs):
        from contextlib import redirect_stdout
        from io import StringIO

        buffer = StringIO()
        with redirect_stdout(buffer):
            return_value = f(*args, **kwargs)
        print(buffer.getvalue(), end='', flush=True)
        return return_value

    return wrapper

@buffer_output
def do_work() -> None:
    print("Start work", end="...")
    # some long running operation
    print("done")

def main():
    from multiprocessing import Pool

    with Pool(4) as p:
        for i in range(4):  # Reduced range for demo purposes
            p.apply_async(func=do_work)
        p.close()
        p.join()

if __name__ == '__main__':
    main()
If the issue is that you cannot modify the worker function do_work at all, then create a new worker function:
def buffer_output(f):
    from functools import wraps

    @wraps(f)
    def wrapper(*args, **kwargs):
        from contextlib import redirect_stdout
        from io import StringIO

        buffer = StringIO()
        with redirect_stdout(buffer):
            return_value = f(*args, **kwargs)
        print(buffer.getvalue(), end='', flush=True)
        return return_value

    return wrapper

def do_work() -> None:
    print("Start work", end="...")
    # some long running operation
    print("done")

@buffer_output
def worker():
    return do_work()

def main():
    from multiprocessing import Pool

    with Pool(4) as p:
        for i in range(4):  # Reduced range for demo purposes
            # Call our new worker:
            p.apply_async(func=worker)
        p.close()
        p.join()

if __name__ == '__main__':
    main()
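If buffering is not wanted, a shared lock achieves the same one-line-per-task effect with only the standard library. The following is a minimal sketch, not part of the answer above; the names init_lock and locked_work are made up for this illustration:

import multiprocessing

def init_lock(lock):
    # Runs once in each worker process; stores the shared lock globally.
    global print_lock
    print_lock = lock

def locked_work() -> None:
    # Do the long running operation first, outside the lock,
    # so the tasks themselves still run in parallel.
    # ... some long running operation ...
    # Then hold the lock only while printing, so the two prints of one
    # task cannot be interleaved with output from another process.
    with print_lock:
        print("Start work", end="...", flush=True)
        print("done", flush=True)

def main():
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(4, initializer=init_lock, initargs=(lock,)) as p:
        for i in range(4):
            p.apply_async(func=locked_work)
        p.close()
        p.join()

if __name__ == '__main__':
    main()

As with the StringIO approach, nothing appears until a task has finished its work; the difference is that the lock also guarantees ordering for output longer than a single atomic write.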
Comments