pytest unit test for "logging+multi-process+QueueHandler"
Question
I don't know how to write unit tests for the code below:
"""Some implements of `logging` enhancement."""
import logging
import multiprocessing
import platform
import queue
from logging import handlers
from pathlib import Path
from typing import Callable
LOGGING_FORMAT = (
"%(asctime)s %(levelname)-8s [%(name)s] %(filename)s - %(funcName)s() : %(message)s"
)
BASE_LOG_NAME = "record"
SUFFIX_LOG_NAME = "%Y-%m-%d.log"
class CustomFormatter(logging.Formatter):
"""
Custom log formatter class that inherits from `logging.Formatter`.
It formats the log messages based on their log levels and applies color settings.
"""
    def __init__(self, custom_format: str):
super().__init__()
if platform.system() == "Windows":
from colorama import init # type: ignore
init()
# https://talyian.github.io/ansicolors/
# black = "\x1b[30;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
green = "\x1b[32;20m"
yellow = "\x1b[33;20m"
# blue = "\x1b[34;20m"
# magenta = "\x1b[35;20m"
# cyan = "\x1b[36;20m"
# white = "\x1b[37;20m"
grey = "\x1b[38;20m"
# \x1b[38;2;r;g;bm - foreground
# \x1b[48;2;r;g;bm - background
reset = "\x1b[0m"
self.formats = {
            logging.DEBUG: grey + custom_format + reset,
            logging.INFO: green + custom_format + reset,
            logging.WARNING: yellow + custom_format + reset,
            logging.ERROR: red + custom_format + reset,
            logging.CRITICAL: bold_red + custom_format + reset,
}
def format(self, record):
log_fmt = self.formats.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
class OutputLogger:
"""
    Serves as a pseudo file-like stream object that redirects written content to a logger instance.
It overrides the `write` method to append the written messages to an internal buffer
(`linebuf`). When a message ends with a newline character, it logs the buffered messages
as a log record.
"""
def __init__(self, logger_name: str, logging_level: int = logging.INFO) -> None:
self.logger = logging.getLogger(logger_name)
self.logging_level = logging_level
self.linebuf = ""
def write(self, msg: str) -> None:
r"""
If redirected output end without `\n`, append it into `self.linebuf`,
and log it out when the redirected output which end with `\n` comes.
"""
self.linebuf += msg
if msg.endswith("\n") is True:
self.logger.log(
self.logging_level,
self.linebuf.rstrip(),
stack_info=False,
stacklevel=2,
)
self.linebuf = ""
    def flush(self) -> None:
        """
        No-op: flushing is handled where `self.linebuf` is reset in `write`,
        rather than by implementing real flush behavior here.
        """
class MultiProcessLogger:
"""
Implements a custom logger designed for multi-process environments.
It is based on the 'Logging Cookbook - Logging to a single file from multiple processes' example
in the Python documentation. It utilizes a multiprocessing queue and a listener process to
enable logging across multiple processes. The class provides functionality for logging records
to a file and printing logs to the console.
"""
def __init__(
self,
# record logs into file
log_path: str | Path | None = None,
level_log: int = logging.DEBUG,
format_log: str = LOGGING_FORMAT,
base_log_name: str = BASE_LOG_NAME,
suffix_log_name: str = SUFFIX_LOG_NAME,
rotate_period: tuple[str, int] = ("midnight", 1),
# print logs on console
level_console: int = logging.INFO,
format_console: str = LOGGING_FORMAT,
):
self.log_path = log_path
self.level_log = level_log
self.format_log = format_log
self.base_log_name = base_log_name
self.suffix_log_name = suffix_log_name
self.rotate_period = rotate_period
self.level_console = level_console
self.format_console = format_console
self.queue = multiprocessing.Manager().Queue(-1)
self.listener = multiprocessing.Process(
target=self.listener_process, args=(self.queue, self.listener_configurer)
)
def listener_configurer(self):
root = logging.getLogger()
# logger for logs
if self.log_path is not None:
formatter_log = logging.Formatter(self.format_log)
handler_log = handlers.TimedRotatingFileHandler(
(Path(self.log_path) / self.base_log_name).resolve(),
when=self.rotate_period[0],
interval=self.rotate_period[1],
encoding="utf-8",
)
handler_log.setFormatter(formatter_log)
handler_log.suffix = self.suffix_log_name
root.addHandler(handler_log)
# logger for console
handler_console = logging.StreamHandler()
handler_console.setLevel(self.level_console)
handler_console.setFormatter(CustomFormatter(self.format_console))
root.addHandler(handler_console)
def listener_process(self, _queue: queue.Queue, configurer: Callable) -> None:
configurer()
while True:
try:
record = _queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception:
import sys
import traceback
print(
"Oops! An issue occurred during the logging process:",
file=sys.stderr,
)
traceback.print_exc(file=sys.stderr)
def start(self):
self.listener.start()
def join(self):
self.queue.put_nowait(None)
self.listener.join()
def worker_configurer(_queue: queue.Queue, worker_level: int = logging.DEBUG) -> None:
"""
Configure the logger for worker processes with input `_queue` handler.
"""
handler = handlers.QueueHandler(_queue)
root = logging.getLogger()
root.addHandler(handler)
# send all messages
root.setLevel(worker_level)
Introduction
I am developing a small project about configuring logging. I have completed most of it in haplog (my own repo). However, I realized that haplog did not support multiple processes, so I referred to "Logging to a single file from multiple processes" and implemented it.
I was able to confirm that it works perfectly after installing haplog via:
pip install "https://github.com/changchiyou/haplog/archive/main/v1.0.2.zip"
and running this demo code:
import concurrent.futures
import logging
import logging.handlers
import multiprocessing
from contextlib import redirect_stdout
from pathlib import Path
from haplog import MultiProcessLogger, OutputLogger, worker_configurer
LOGGER_NAME = "test"
MESSAGE = "test"
log_folder = (Path(__file__).parent) / "logs"
if not log_folder.exists():
log_folder.mkdir()
def third_party_function():
print(MESSAGE + " by third_party_function()")
def single_process():
mpl = MultiProcessLogger(log_folder, level_console=logging.DEBUG)
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
with redirect_stdout(
OutputLogger(logger_name=LOGGER_NAME, logging_level=logging.DEBUG) # type: ignore
):
print(MESSAGE + " by print()")
third_party_function()
logger.debug(MESSAGE)
logger.info(MESSAGE)
logger.warning(MESSAGE)
logger.error(MESSAGE)
logger.critical(MESSAGE)
mpl.join()
def worker_process(queue, configurer):
import time
from random import random
configurer(queue)
name = multiprocessing.current_process().name
logger = logging.getLogger(name)
logger.info("Worker started: %s" % name)
time.sleep(random())
logger.info("Worker finished: %s" % name)
def multi_process():
mpl = MultiProcessLogger(log_folder, level_console=logging.DEBUG)
mpl.start()
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for _ in range(10):
executor.submit(worker_process, mpl.queue, worker_configurer)
mpl.join()
if __name__ == "__main__":
single_process()
multi_process()
I got the following result:
2023-06-16 12:11:01,919 DEBUG [test] demo_haplog.py - single_process() : test by print()
2023-06-16 12:11:01,920 DEBUG [test] demo_haplog.py - third_party_function() : test by third_party_function()
2023-06-16 12:11:01,920 DEBUG [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 INFO [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 WARNING [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 ERROR [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 CRITICAL [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:02,222 INFO [SpawnProcess-5] demo_haplog.py - worker_process() : Worker started: SpawnProcess-5
2023-06-16 12:11:02,229 INFO [SpawnProcess-7] demo_haplog.py - worker_process() : Worker started: SpawnProcess-7
2023-06-16 12:11:02,230 INFO [SpawnProcess-6] demo_haplog.py - worker_process() : Worker started: SpawnProcess-6
2023-06-16 12:11:02,243 INFO [SpawnProcess-8] demo_haplog.py - worker_process() : Worker started: SpawnProcess-8
2023-06-16 12:11:02,246 INFO [SpawnProcess-9] demo_haplog.py - worker_process() : Worker started: SpawnProcess-9
2023-06-16 12:11:02,253 INFO [SpawnProcess-10] demo_haplog.py - worker_process() : Worker started: SpawnProcess-10
2023-06-16 12:11:02,276 INFO [SpawnProcess-11] demo_haplog.py - worker_process() : Worker started: SpawnProcess-11
2023-06-16 12:11:02,287 INFO [SpawnProcess-12] demo_haplog.py - worker_process() : Worker started: SpawnProcess-12
2023-06-16 12:11:02,304 INFO [SpawnProcess-13] demo_haplog.py - worker_process() : Worker started: SpawnProcess-13
2023-06-16 12:11:02,316 INFO [SpawnProcess-14] demo_haplog.py - worker_process() : Worker started: SpawnProcess-14
2023-06-16 12:11:02,538 INFO [SpawnProcess-7] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-7
2023-06-16 12:11:02,562 INFO [SpawnProcess-11] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-11
2023-06-16 12:11:02,634 INFO [SpawnProcess-6] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-6
2023-06-16 12:11:02,665 INFO [SpawnProcess-5] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-5
2023-06-16 12:11:02,787 INFO [SpawnProcess-14] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-14
2023-06-16 12:11:02,791 INFO [SpawnProcess-13] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-13
2023-06-16 12:11:02,962 INFO [SpawnProcess-9] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-9
2023-06-16 12:11:03,047 INFO [SpawnProcess-12] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-12
2023-06-16 12:11:03,184 INFO [SpawnProcess-10] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-10
2023-06-16 12:11:03,243 INFO [SpawnProcess-8] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-8
As evidenced by this screenshot: result.imgur (I know @Klaus D. said "Links to code or images of text can not be accepted on Stack Overflow.", but my project includes a text-coloring feature, so I think I have to show the visual effect).
What I have tried
However, I don't know how to update my unit test. Before I added multi-process support, I could easily check the output via pytest's capsys:
def test_console_info(capsys):
instantiate_logger(LOGGER_NAME)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
captured = capsys.readouterr()
assert f'INFO {Path(__file__).name} - test_console_info() : {MESSAGE}' in captured.err
But after the update, it seems capsys cannot capture log output coming from the other process:
def test_console_info(capsys):
mpl = MultiProcessLogger()
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
captured = capsys.readouterr()
assert (
f"INFO {Path(__file__).name} - test_console_info() : {MESSAGE}"
in captured.err
)
E AssertionError: assert 'INFO test_instantiate_logger.py - test_console_info() : test' in ''
E + where '' = CaptureResult(out='', err='').err
I have also tried caplog and capfd, but neither of them works for me.
Because haplog now uses a Queue and a QueueHandler to send all logging events to a single listener process (which is how it supports multiple processes), I don't know how to read and check the complete log output from that other process using pytest.
Answer 1
Score: 0
> I have also tried caplog and capfd, but none of them works for me.

I found that `capfd` actually works for me; it failed before only because I had forgotten to execute `mpl.join()` (which shuts down the log-listener process) first. `capfd` captures at the file-descriptor level, so it also sees what the listener child process writes to stderr, whereas `capsys` only replaces `sys.stdout`/`sys.stderr` inside the test process.
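In other words, the fix is purely about ordering: shut down the listener, then read the captured output. A condensed sketch of the working pattern (the test name is illustrative; `LOGGER_NAME`, `MESSAGE` and the imports match the full test file further down):

def test_console_info_minimal(capfd):
    mpl = MultiProcessLogger()
    mpl.start()
    worker_configurer(mpl.queue)
    logging.getLogger(LOGGER_NAME).info(MESSAGE)
    mpl.join()  # stop the listener first, so all of its console output is written
    captured = capfd.readouterr()  # fd-level capture also sees the child's stderr
    assert MESSAGE in captured.err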
=========================================================== test session starts ============================================================
platform darwin -- Python 3.10.11, pytest-7.3.1, pluggy-1.0.0
rootdir: /Users/christopherchang/Code/logger_utils
collected 6 items
tests/test_instantiate_logger.py ...... [100%]
============================================================ 6 passed in 1.94s =============================================================
import logging
from pathlib import Path
from haplog import BASE_LOG_NAME, MultiProcessLogger, worker_configurer
# https://stackoverflow.com/questions/76487303/pytest-unit-test-for-loggingmulti-processqueuehandler
# May be useful: https://github.com/pytest-dev/pytest/issues/3037#issuecomment-745050393
LOGGER_NAME = "test"
MESSAGE = "test"
def test_console_default_info(capfd):
mpl = MultiProcessLogger()
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
mpl.join()
captured = capfd.readouterr()
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_console_default_info() : {MESSAGE}"
in captured.err
)
def test_console_unused_config_path(tmp_path, capfd):
mpl = MultiProcessLogger(tmp_path)
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
mpl.join()
captured = capfd.readouterr()
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_console_unused_config_path() : {MESSAGE}"
in captured.err
)
def test_console_info(capfd):
mpl = MultiProcessLogger(level_console=logging.INFO)
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
mpl.join()
captured = capfd.readouterr()
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_console_info() : {MESSAGE}"
in captured.err
)
def test_console_debug(capfd):
mpl = MultiProcessLogger(level_console=logging.DEBUG)
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.debug(MESSAGE)
mpl.join()
captured = capfd.readouterr()
assert (
f"DEBUG [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug() : {MESSAGE}"
in captured.err
)
def test_console_debug_info_level(capfd):
mpl = MultiProcessLogger()
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.debug(MESSAGE)
logger.info(MESSAGE)
mpl.join()
captured = capfd.readouterr()
assert (
f"DEBUG [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug_info_level() : {MESSAGE}"
not in captured.err
)
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug_info_level() : {MESSAGE}"
in captured.err
)
def test_log(tmp_path, capfd):
mpl = MultiProcessLogger(log_path=tmp_path, level_console=logging.DEBUG)
mpl.start()
worker_configurer(mpl.queue)
logger = logging.getLogger(LOGGER_NAME)
logger.info(MESSAGE)
logger.debug(MESSAGE)
mpl.join()
    with open(tmp_path / BASE_LOG_NAME, mode="r", encoding="utf-8") as log_file:
        contents = log_file.read()
captured = capfd.readouterr()
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
in captured.err
)
assert (
f"INFO [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
in contents
)
assert (
f"DEBUG [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
in contents
)