英文:
Monitor a file-descriptor in background with python
问题
由于我的问题在这里,我必须在后台监视一个文件描述符。
我已经尝试过这个:
import gpiod
import asyncio
import select
async def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
fd = line.event_get_fd()
poll = select.poll()
poll.register(fd)
loop = asyncio.get_event_loop()
while True:
await loop.run_in_executor(None, poll.poll, None)
event = line.event_read()
print(event)
def main():
asyncio.run(read())
while True:
input = input("Enter something: ")
print(input)
main()
在poll.poll
调用处被阻塞。我还尝试过这个:
import gpiod
import asyncio
def readCb(line):
print(f"{line.consumer}: {line.event_read()}")
def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
fd = line.event_get_fd()
loop = asyncio.get_event_loop()
loop.add_reader(fd, readCb, line)
loop.run_forever()
def main():
read()
while True:
input = input("Enter something: ")
print(input)
main()
在read
函数中也被阻塞。
对于第二个示例,我还尝试将read()
声明为async
并使用asyncio.run()
调用它,但然后会出现以下错误:
File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
raise RuntimeError('This event loop is already running')
除了使用真正的线程和子进程之外,是否有任何可能在Python中完成这个任务的方法?
非常感谢您的帮助,最好的问候,
Cone
英文:
Due to my question here, I have to monitor a file-descriptor in the background.
I have tried this:
import gpiod
import asyncio
import select
async def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
fd = line.event_get_fd()
poll = select.poll()
poll.register(fd)
loop = asyncio.get_event_loop()
while True:
await loop.run_in_executor(None, poll.poll, None)
event = line.event_read()
print(event)
def main():
asyncio.run(read())
while True:
input = input("Enter something: ")
print(input)
main()
The execution is blocked at call off poll.poll
. I have also tried this:
import gpiod
import asyncio
def readCb(line):
print(f"{line.consumer}: {line.event_read()}")
def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
fd = line.event_get_fd()
loop = asyncio.get_event_loop()
loop.add_reader(fd, readCb, line)
loop.run_forever()
def main():
read()
while True:
input = input("Enter something: ")
print(input)
main()
Also blocks in the read function.
For the second example, I also tried to make read()
async
and call it with asyncio.run()
but then the following error occurs:
File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
raise RuntimeError('This event loop is already running')
Is there any possibility to do this in python, besides real threads and subprocesses?
Many thanks for your help and best regards,
Cone
答案1
得分: 0
尽管从名称上看,poll.poll
会阻塞,直到其中一个注册的 fd
准备就绪。它不是轮询。
使用 libgpiod
从一行中读取事件,如果没有事件存在,会阻塞。因此,如果你不希望它阻塞,首先检查 fd
是否可读,以确定是否有事件可用。
如果你的其他代码也受限于某个 fd
上的 I/O 操作,那么你可以让 poll
等待任何一个事件。这是传统的异步风格。
如果你的其他代码不会在某个 fd
上阻塞,那么你有两个选项,即线程和轮询。使用线程,你可以在一个 Python 线程中运行 libgpiod
调用,而在另一个线程中运行其他代码。使用轮询,你会定期检查该行,以查看是否发生了边缘事件。轮询通常比线程效率低,但如果你认为 Python 线程是“真正的线程”,那么你唯一的选择就是轮询。
英文:
Despite what you might think from the name, poll.poll
blocks until one of the registered fd
s is ready. It does not poll.
Reading events from a line with libgpiod
is blocking if no event is present.
So if you don't want it to block then check the there is an event available first - by testing that the fd
is readable.
If the other code that you want to run is also I/O bound on an fd
then you can have the poll
wait for events from either. That is traditional async style.
If your other code does not block on an fd
then you have two options, threading or polling. With threading you run the libgpiod
calls in one Python thread and your other code in another. With polling, you periodically check the line to see if an edge event has occurred. Polling is generally less efficient than threading, but if you consider Python threads to be "real threads" then the solution you are left with is polling.
答案2
得分: 0
感谢Kent Gibson在我的另一个问题上的帮助,我能够构建以下内容:
import gpiod
import asyncio
async def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
loop = asyncio.get_event_loop()
while True:
await loop.run_in_executor(None, line.event_wait, None)
event = line.event_read()
print(event)
def main():
asyncio.run(read())
while True:
input = input("输入内容:")
print(input)
main()
这个代码能够按照预期工作。
英文:
Thanks to the help from Kent Gibson on my other question, I was able to build the following:
import gpiod
import asyncio
async def read():
name = "GPOUT"
line = chip.get_line(INPUT)
line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
loop = asyncio.get_event_loop()
while True:
await loop.run_in_executor(None, line.event_wait, None)
event = line.event_read()
print(event)
def main():
asyncio.run(read())
while True:
input = input("Enter something: ")
print(input)
main()
Which works as intendend.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论