pytest unit test for "logging+multi-process+QueueHandler"

Question

I don't know how to write the unit tests for the code below:

"""Some implements of `logging` enhancement."""
import logging
import multiprocessing
import platform
import queue
from logging import handlers
from pathlib import Path
from typing import Callable

LOGGING_FORMAT = (
    "%(asctime)s %(levelname)-8s [%(name)s] %(filename)s - %(funcName)s() : %(message)s"
)
BASE_LOG_NAME = "record"
SUFFIX_LOG_NAME = "%Y-%m-%d.log"


class CustomFormatter(logging.Formatter):
    """
    Custom log formatter class that inherits from `logging.Formatter`.
    It formats the log messages based on their log levels and applies color settings.
    """

    def __init__(self, custom_format: str):
        super().__init__()

        if platform.system() == "Windows":
            from colorama import init  # type: ignore

            init()

        # https://talyian.github.io/ansicolors/

        # black = "\x1b[30;20m"
        red = "\x1b[31;20m"
        bold_red = "\x1b[31;1m"
        green = "\x1b[32;20m"
        yellow = "\x1b[33;20m"
        # blue = "\x1b[34;20m"
        # magenta = "\x1b[35;20m"
        # cyan = "\x1b[36;20m"
        # white = "\x1b[37;20m"
        grey = "\x1b[38;20m"

        # \x1b[38;2;r;g;bm - foreground
        # \x1b[48;2;r;g;bm - background

        reset = "\x1b[0m"

        self.formats = {
            logging.DEBUG: grey + custom_format + reset,
            logging.INFO: green + custom_format + reset,
            logging.WARNING: yellow + custom_format + reset,
            logging.ERROR: red + custom_format + reset,
            logging.CRITICAL: bold_red + custom_format + reset,
        }

    def format(self, record):
        log_fmt = self.formats.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


class OutputLogger:
    """
    Serves as a pseudo file-like stream object that redirects written content to a logger instance.
    It overrides the `write` method to append the written messages to an internal buffer
    (`linebuf`). When a message ends with a newline character, it logs the buffered messages
    as a log record.
    """

    def __init__(self, logger_name: str, logging_level: int = logging.INFO) -> None:
        self.logger = logging.getLogger(logger_name)
        self.logging_level = logging_level
        self.linebuf = ""

    def write(self, msg: str) -> None:
        r"""
        If the redirected output does not end with `\n`, buffer it in
        `self.linebuf`, and emit it as a log record once output ending
        with `\n` arrives.
        """
        self.linebuf += msg
        if msg.endswith("\n"):
            self.logger.log(
                self.logging_level,
                self.linebuf.rstrip(),
                stack_info=False,
                stacklevel=2,
            )
            self.linebuf = ""

    def flush(self) -> None:
        """
        Note that flushing is effectively handled in `write` (by resetting
        `self.linebuf`), so there is nothing left to do here.
        """


class MultiProcessLogger:
    """
    Implements a custom logger designed for multi-process environments.
    It is based on the 'Logging Cookbook - Logging to a single file from multiple processes' example
    in the Python documentation. It utilizes a multiprocessing queue and a listener process to
    enable logging across multiple processes. The class provides functionality for logging records
    to a file and printing logs to the console.
    """

    def __init__(
        self,
        # record logs into file
        log_path: str | Path | None = None,
        level_log: int = logging.DEBUG,
        format_log: str = LOGGING_FORMAT,
        base_log_name: str = BASE_LOG_NAME,
        suffix_log_name: str = SUFFIX_LOG_NAME,
        rotate_period: tuple[str, int] = ("midnight", 1),
        # print logs on console
        level_console: int = logging.INFO,
        format_console: str = LOGGING_FORMAT,
    ):
        self.log_path = log_path
        self.level_log = level_log
        self.format_log = format_log
        self.base_log_name = base_log_name
        self.suffix_log_name = suffix_log_name
        self.rotate_period = rotate_period
        self.level_console = level_console
        self.format_console = format_console

        self.queue = multiprocessing.Manager().Queue(-1)
        self.listener = multiprocessing.Process(
            target=self.listener_process, args=(self.queue, self.listener_configurer)
        )

    def listener_configurer(self):
        root = logging.getLogger()

        # logger for logs
        if self.log_path is not None:
            formatter_log = logging.Formatter(self.format_log)
            handler_log = handlers.TimedRotatingFileHandler(
                (Path(self.log_path) / self.base_log_name).resolve(),
                when=self.rotate_period[0],
                interval=self.rotate_period[1],
                encoding="utf-8",
            )
            handler_log.setFormatter(formatter_log)
            handler_log.suffix = self.suffix_log_name
            root.addHandler(handler_log)

        # logger for console
        handler_console = logging.StreamHandler()
        handler_console.setLevel(self.level_console)
        handler_console.setFormatter(CustomFormatter(self.format_console))
        root.addHandler(handler_console)

    def listener_process(self, _queue: queue.Queue, configurer: Callable) -> None:
        configurer()
        while True:
            try:
                record = _queue.get()
                if record is None:
                    break
                logger = logging.getLogger(record.name)
                logger.handle(record)
            except Exception:
                import sys
                import traceback

                print(
                    "Oops! An issue occurred during the logging process:",
                    file=sys.stderr,
                )
                traceback.print_exc(file=sys.stderr)

    def start(self):
        self.listener.start()

    def join(self):
        self.queue.put_nowait(None)
        self.listener.join()


def worker_configurer(_queue: queue.Queue, worker_level: int = logging.DEBUG) -> None:
    """
    Configure the logger for worker processes with input `_queue` handler.
    """
    handler = handlers.QueueHandler(_queue)
    root = logging.getLogger()
    root.addHandler(handler)
    # send all messages
    root.setLevel(worker_level)

Introduction

I want to develop a small project for configuring logging. I have completed most of it in haplog (my own repo). However, I realized that haplog did not support multiple processes, so I referred to Logging to a single file from multiple processes and implemented it.

I was able to confirm that it works perfectly by installing haplog with:

pip install "https://github.com/changchiyou/haplog/archive/main/v1.0.2.zip"

and running the demo code:

import concurrent.futures
import logging
import logging.handlers
import multiprocessing
from contextlib import redirect_stdout
from pathlib import Path

from haplog import MultiProcessLogger, OutputLogger, worker_configurer

LOGGER_NAME = "test"
MESSAGE = "test"

log_folder = (Path(__file__).parent) / "logs"

if not log_folder.exists():
    log_folder.mkdir()


def third_party_function():
    print(MESSAGE + " by third_party_function()")


def single_process():
    mpl = MultiProcessLogger(log_folder, level_console=logging.DEBUG)
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)

    with redirect_stdout(
        OutputLogger(logger_name=LOGGER_NAME, logging_level=logging.DEBUG)  # type: ignore
    ):
        print(MESSAGE + " by print()")
        third_party_function()

    logger.debug(MESSAGE)
    logger.info(MESSAGE)
    logger.warning(MESSAGE)
    logger.error(MESSAGE)
    logger.critical(MESSAGE)

    mpl.join()


def worker_process(queue, configurer):
    import time
    from random import random

    configurer(queue)

    name = multiprocessing.current_process().name
    logger = logging.getLogger(name)
    logger.info("Worker started: %s" % name)
    time.sleep(random())
    logger.info("Worker finished: %s" % name)


def multi_process():
    mpl = MultiProcessLogger(log_folder, level_console=logging.DEBUG)
    mpl.start()

    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        for _ in range(10):
            executor.submit(worker_process, mpl.queue, worker_configurer)

    mpl.join()


if __name__ == "__main__":
    single_process()
    multi_process()

I got the following result:

2023-06-16 12:11:01,919 DEBUG    [test] demo_haplog.py - single_process() : test by print()
2023-06-16 12:11:01,920 DEBUG    [test] demo_haplog.py - third_party_function() : test by third_party_function()
2023-06-16 12:11:01,920 DEBUG    [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 INFO     [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 WARNING  [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 ERROR    [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:01,920 CRITICAL [test] demo_haplog.py - single_process() : test
2023-06-16 12:11:02,222 INFO     [SpawnProcess-5] demo_haplog.py - worker_process() : Worker started: SpawnProcess-5
2023-06-16 12:11:02,229 INFO     [SpawnProcess-7] demo_haplog.py - worker_process() : Worker started: SpawnProcess-7
2023-06-16 12:11:02,230 INFO     [SpawnProcess-6] demo_haplog.py - worker_process() : Worker started: SpawnProcess-6
2023-06-16 12:11:02,243 INFO     [SpawnProcess-8] demo_haplog.py - worker_process() : Worker started: SpawnProcess-8
2023-06-16 12:11:02,246 INFO     [SpawnProcess-9] demo_haplog.py - worker_process() : Worker started: SpawnProcess-9
2023-06-16 12:11:02,253 INFO     [SpawnProcess-10] demo_haplog.py - worker_process() : Worker started: SpawnProcess-10
2023-06-16 12:11:02,276 INFO     [SpawnProcess-11] demo_haplog.py - worker_process() : Worker started: SpawnProcess-11
2023-06-16 12:11:02,287 INFO     [SpawnProcess-12] demo_haplog.py - worker_process() : Worker started: SpawnProcess-12
2023-06-16 12:11:02,304 INFO     [SpawnProcess-13] demo_haplog.py - worker_process() : Worker started: SpawnProcess-13
2023-06-16 12:11:02,316 INFO     [SpawnProcess-14] demo_haplog.py - worker_process() : Worker started: SpawnProcess-14
2023-06-16 12:11:02,538 INFO     [SpawnProcess-7] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-7
2023-06-16 12:11:02,562 INFO     [SpawnProcess-11] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-11
2023-06-16 12:11:02,634 INFO     [SpawnProcess-6] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-6
2023-06-16 12:11:02,665 INFO     [SpawnProcess-5] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-5
2023-06-16 12:11:02,787 INFO     [SpawnProcess-14] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-14
2023-06-16 12:11:02,791 INFO     [SpawnProcess-13] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-13
2023-06-16 12:11:02,962 INFO     [SpawnProcess-9] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-9
2023-06-16 12:11:03,047 INFO     [SpawnProcess-12] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-12
2023-06-16 12:11:03,184 INFO     [SpawnProcess-10] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-10
2023-06-16 12:11:03,243 INFO     [SpawnProcess-8] demo_haplog.py - worker_process() : Worker finished: SpawnProcess-8

This is also shown in this screenshot: result.imgur (I know @Klaus D. said "Links to code or images of text can not be accepted on Stack Overflow.", but my project includes a text-coloring feature, so I think I have to show the visual effect).


What I have tried

However, I don't know how to update my unit tests. Before I added multi-process support, I could easily check the output via pytest's `capsys` fixture:

def test_console_info(capsys):
    instantiate_logger(LOGGER_NAME)
    logger = logging.getLogger(LOGGER_NAME)

    logger.info(MESSAGE)

    captured = capsys.readouterr()

    assert f'INFO     {Path(__file__).name} - test_console_info() : {MESSAGE}' in captured.err

But after the update, it seems `capsys` is unable to capture the log output coming from the other process:

def test_console_info(capsys):
    mpl = MultiProcessLogger()
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)

    logger.info(MESSAGE)

    captured = capsys.readouterr()

    assert (
        f"INFO     {Path(__file__).name} - test_console_info() : {MESSAGE}"
        in captured.err
    )
E       AssertionError: assert 'INFO     test_instantiate_logger.py - test_console_info() : test' in ''
E        +  where '' = CaptureResult(out='', err='').err

I have also tried `caplog` and `capfd`, but neither of them works for me.

Because haplog now uses a `Queue` and a `QueueHandler` to send all logging events to a single listener process (which is how it supports multiple processes), I don't know how to read/check the complete log output from another process using pytest.
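
For reference, the mechanism described above is essentially the stdlib QueueHandler/QueueListener pairing from the Logging Cookbook. Below is a minimal, haplog-independent sketch of that pattern, with `logging.handlers.QueueListener` standing in for haplog's hand-rolled listener process:

# Minimal sketch of the QueueHandler/QueueListener pattern (stdlib only,
# independent of haplog). Worker processes put LogRecords on a shared queue;
# a single listener drains the queue and passes records to the real handlers.
import logging
import logging.handlers
import multiprocessing


def worker(q) -> None:
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(q))  # records go to the queue
    root.setLevel(logging.DEBUG)
    logging.getLogger("sketch").info("hello from a worker process")


if __name__ == "__main__":
    q = multiprocessing.Manager().Queue(-1)
    # The listener owns the only "real" handler and runs in the main process.
    listener = logging.handlers.QueueListener(q, logging.StreamHandler())
    listener.start()

    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()

    listener.stop()  # drains remaining records before shutting down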

Answer 1

Score: 0

> I have also tried `caplog` and `capfd`, but neither of them works for me.

I found that `capfd` actually works for me; it failed before only because I had forgotten to execute `mpl.join()` (which shuts down the log-listener process) before reading the captured output.
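
A plausible reason why `capfd` works here while `capsys` does not (my reading, not stated in the original answer): `capfd` captures at the file-descriptor level, so it also sees what the listener process writes to the inherited stderr descriptor, whereas `capsys` only patches `sys.stderr` inside the test process. An illustrative, haplog-independent demonstration of that distinction:

# Illustrative only (not part of haplog): output written by a child process
# goes to the real stderr file descriptor, which capfd redirects but capsys
# (which only patches sys.stderr inside the test process) does not.
import multiprocessing
import sys


def _child() -> None:
    print("written in the child process", file=sys.stderr)


def test_child_output_visible_to_capfd(capfd):
    p = multiprocessing.Process(target=_child)
    p.start()
    p.join()  # make sure the child finished writing before reading the capture
    assert "written in the child process" in capfd.readouterr().err

With `mpl.join()` in place, the updated tests all pass: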

=========================================================== test session starts ============================================================
platform darwin -- Python 3.10.11, pytest-7.3.1, pluggy-1.0.0
rootdir: /Users/christopherchang/Code/logger_utils
collected 6 items

tests/test_instantiate_logger.py ...... [100%]

============================================================ 6 passed in 1.94s =============================================================


import logging
from pathlib import Path

from haplog import BASE_LOG_NAME, MultiProcessLogger, worker_configurer

# https://stackoverflow.com/questions/76487303/pytest-unit-test-for-loggingmulti-processqueuehandler
# May be useful: https://github.com/pytest-dev/pytest/issues/3037#issuecomment-745050393

LOGGER_NAME = "test"
MESSAGE = "test"


def test_console_default_info(capfd):
    mpl = MultiProcessLogger()
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.info(MESSAGE)
    mpl.join()
    captured = capfd.readouterr()
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_console_default_info() : {MESSAGE}"
        in captured.err
    )


def test_console_unused_config_path(tmp_path, capfd):
    mpl = MultiProcessLogger(tmp_path)
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.info(MESSAGE)
    mpl.join()
    captured = capfd.readouterr()
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_console_unused_config_path() : {MESSAGE}"
        in captured.err
    )


def test_console_info(capfd):
    mpl = MultiProcessLogger(level_console=logging.INFO)
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.info(MESSAGE)
    mpl.join()
    captured = capfd.readouterr()
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_console_info() : {MESSAGE}"
        in captured.err
    )


def test_console_debug(capfd):
    mpl = MultiProcessLogger(level_console=logging.DEBUG)
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.debug(MESSAGE)
    mpl.join()
    captured = capfd.readouterr()
    assert (
        f"DEBUG    [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug() : {MESSAGE}"
        in captured.err
    )


def test_console_debug_info_level(capfd):
    mpl = MultiProcessLogger()
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.debug(MESSAGE)
    logger.info(MESSAGE)
    mpl.join()
    captured = capfd.readouterr()
    assert (
        f"DEBUG    [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug_info_level() : {MESSAGE}"
        not in captured.err
    )
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_console_debug_info_level() : {MESSAGE}"
        in captured.err
    )


def test_log(tmp_path, capfd):
    mpl = MultiProcessLogger(log_path=tmp_path, level_console=logging.DEBUG)
    mpl.start()
    worker_configurer(mpl.queue)

    logger = logging.getLogger(LOGGER_NAME)
    logger.info(MESSAGE)
    logger.debug(MESSAGE)
    mpl.join()
    with open(tmp_path / BASE_LOG_NAME, mode="r") as File:
        contents = File.read()
    captured = capfd.readouterr()
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
        in captured.err
    )
    assert (
        f"INFO     [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
        in contents
    )
    assert (
        f"DEBUG    [{LOGGER_NAME}] {Path(__file__).name} - test_log() : {MESSAGE}"
        in contents
    )
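
A possible hardening (my suggestion, not part of the original answer): guard `mpl.join()` with try/finally, so the listener process is not left running if the test body raises before the join. A sketch, assuming the same haplog API as above:

# Defensive variant: the listener is always joined, even if logging raises,
# so capfd can still read a fully flushed capture afterwards.
def test_console_info_safe(capfd):
    mpl = MultiProcessLogger()
    mpl.start()
    worker_configurer(mpl.queue)
    try:
        logging.getLogger(LOGGER_NAME).info(MESSAGE)
    finally:
        mpl.join()  # always stop the listener, even if the body above raises
    assert MESSAGE in capfd.readouterr().err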

