用Python在后台监视文件描述符

huangapple go评论63阅读模式
英文:

Monitor a file-descriptor in background with python

问题

由于我的问题在这里,我必须在后台监视一个文件描述符。

我已经尝试过这个:

import gpiod
import asyncio
import select

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    poll = select.poll()
    poll.register(fd)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, poll.poll, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()

poll.poll调用处被阻塞。我还尝试过这个:

import gpiod
import asyncio

def readCb(line):
    print(f"{line.consumer}: {line.event_read()}")

def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd, readCb, line)
    loop.run_forever()

def main():
    read()
    while True:
        input = input("Enter something: ")
        print(input)

main()

read函数中也被阻塞。

对于第二个示例,我还尝试将read()声明为async并使用asyncio.run()调用它,但然后会出现以下错误:

File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')

除了使用真正的线程和子进程之外,是否有任何可能在Python中完成这个任务的方法?

非常感谢您的帮助,最好的问候,

Cone

英文:

Due to my question here, I have to monitor a file-descriptor in the background.

I have tried this:

import gpiod
import asyncio
import select

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    poll = select.poll()
    poll.register(fd)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, poll.poll, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()

The execution is blocked at call off poll.poll. I have also tried this:

import gpiod
import asyncio

def readCb(line):
    print(f"{line.consumer}: {line.event_read()}")

def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd, readCb, line)
    loop.run_forever()

def main():
    read()
    while True:
        input = input("Enter something: ")
        print(input)

main()

Also blocks in the read function.

For the second example, I also tried to make read() async and call it with asyncio.run() but then the following error occurs:

  File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')

Is there any possibility to do this in python, besides real threads and subprocesses?

Many thanks for your help and best regards,

Cone

答案1

得分: 0

尽管从名称上看,poll.poll 会阻塞,直到其中一个注册的 fd 准备就绪。它不是轮询。

使用 libgpiod 从一行中读取事件,如果没有事件存在,会阻塞。因此,如果你不希望它阻塞,首先检查 fd 是否可读,以确定是否有事件可用。

如果你的其他代码也受限于某个 fd 上的 I/O 操作,那么你可以让 poll 等待任何一个事件。这是传统的异步风格。

如果你的其他代码不会在某个 fd 上阻塞,那么你有两个选项,即线程和轮询。使用线程,你可以在一个 Python 线程中运行 libgpiod 调用,而在另一个线程中运行其他代码。使用轮询,你会定期检查该行,以查看是否发生了边缘事件。轮询通常比线程效率低,但如果你认为 Python 线程是“真正的线程”,那么你唯一的选择就是轮询。

英文:

Despite what you might think from the name, poll.poll blocks until one of the registered fds is ready. It does not poll.

Reading events from a line with libgpiod is blocking if no event is present.
So if you don't want it to block then check the there is an event available first - by testing that the fd is readable.

If the other code that you want to run is also I/O bound on an fd then you can have the poll wait for events from either. That is traditional async style.

If your other code does not block on an fd then you have two options, threading or polling. With threading you run the libgpiod calls in one Python thread and your other code in another. With polling, you periodically check the line to see if an edge event has occurred. Polling is generally less efficient than threading, but if you consider Python threads to be "real threads" then the solution you are left with is polling.

答案2

得分: 0

感谢Kent Gibson在我的另一个问题上的帮助,我能够构建以下内容:

import gpiod
import asyncio

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, line.event_wait, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("输入内容:")
        print(input)

main()

这个代码能够按照预期工作。

英文:

Thanks to the help from Kent Gibson on my other question, I was able to build the following:

import gpiod
import asyncio

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, line.event_wait, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()

Which works as intendend.

huangapple
  • 本文由 发表于 2023年7月13日 17:05:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/76677680.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定