英文:
How can I escape from a GPIO.wait_for_edge?
问题
I have this code:
while True:
try:
Stat1 = GPIO.input(WPIN)
time.sleep(WARNST)
Stat2 = GPIO.input(WPIN)
if Stat1 == Stat2: SendWarnStat(Stat2)
Event = None
while Event != WPIN: Event = GPIO.wait_for_edge(WPIN, GPIO.BOTH)
except:
break
Is there a way to make my script exit when I press Ctrl+C or the system asks it to terminate? It seems that everything is blocked by the wait_for_edge
function. I want to send some messages when the state of the pin is changed, and I want to do this without bothering the CPU constantly checking that pin, so I have to use the wait function. Can I move the wait function into a thread and wait for multiple objects, one of them being a flag that is switched from the main thread? I am very new to Python.
英文:
I have this code:
while True:
try:
Stat1 = GPIO.input(WPIN)
time.sleep(WARNST)
Stat2 = GPIO.input(WPIN)
if Stat1 == Stat2: SendWarnStat(Stat2)
Event = None
while Event != WPIN: Event = GPIO.wait_for_edge(WPIN, GPIO.BOTH)
except:
break
Is there a way to make my script exist when I press Ctrl+C or the system ask it to terminate ? It seems that everything is blocked by the wait_for_edge
function... I want to send some messages when the state of the pin is changed, and I want to do this without bother the CPU constantly checking that pin, so I have to use the wait function... Can I move the wait function in a thread and wait for multiple objects, one of them being a flag that is swiched from the main thread ? I am verry new to Python...
答案1
得分: 1
如在维基中描述的那样,您可以通过使用线程化的非阻塞 add_event_detect
+ add_event_callback
来替代阻塞的 wait_for_edge
。
引用:
RPi.GPIO 为回调函数运行第二个线程。这意味着回调函数可以与主程序同时运行,以立即响应边缘事件。
示例代码(未经测试)
# 定义事件的回调函数
def do_something(channel):
print(f"检测到事件的通道 {channel}!")
# 添加事件触发并在循环之外链接回调函数
GPIO.add_event_detect(WPIN, GPIO.BOTH)
GPIO.add_event_callback(WPIN, do_something)
# 您的代码
while True:
try:
Stat1 = GPIO.input(WPIN)
time.sleep(WARNST)
Stat2 = GPIO.input(WPIN)
if Stat1 == Stat2:
SendWarnStat(Stat2)
except:
break
然后,您可以直接从代码的任何地方调用此函数,或者通过定义的事件触发它。
英文:
As described in a wiki, you could replace the blocking wait_for_edge
by a threaded non-blocking add_event_detect
+ add_event_callback
Quote:
> RPi.GPIO runs a second thread for callback functions. This means that callback functions can be run at the same time as your main program, in immediate response to an edge.
Exemplary code (untested)
# define callback for event
def do_something(channel):
print(f"Channel {channel} detected an event!")
# add event trigger and link callback outside of loop
GPIO.add_event_detect(WPIN, GPIO.BOTH)
GPIO.add_event_callback(WPIN, do_something)
# your code
while True:
try:
Stat1 = GPIO.input(WPIN)
time.sleep(WARNST)
Stat2 = GPIO.input(WPIN)
if Stat1 == Stat2: SendWarnStat(Stat2)
except:
break
You then can either call this function directly from any place in your code, or trigger it with the defined event.
答案2
得分: 0
以下是翻译好的部分:
这是我解决问题的方法...
callback_thread
,尽管看起来没有做任何事情,但必须存在,以便可以异步执行my_callback
,而不是在主线程中执行。我不理解它们之间的关联,但这是AI告诉我的。
import RPi.GPIO as GPIO
import threading
import time
import signal
import sys
def my_callback(channel):
global OldStat
time.sleep(3)
NewStat = GPIO.input(channel)
if NewStat != OldStat:
if NewStat == True: print("通道 {} 高电平".format(channel))
else: print("通道 {} 低电平".format(channel))
OldStat = NewStat
def callback_thread(event):
event.wait()
def Exit_Handler(signum, frame):
event.set()
GPIO.cleanup()
thread.join()
sys.exit(0)
signal.signal(signal.SIGTERM, Exit_Handler)
signal.signal(signal.SIGINT, Exit_Handler)
event = threading.Event()
thread = threading.Thread(target=callback_thread, args=(event,))
thread.start()
GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.IN, pull_up_down=GPIO.PUD_UP)
OldStat = GPIO.input(4)
if OldStat == True: print("通道 4 高电平")
else: print("通道 4 低电平")
GPIO.add_event_detect(4, GPIO.BOTH, callback=my_callback, bouncetime=100)
event.wait()
英文:
This is how I solved the problem...
The callback_thread
, although it doesn't seem to do anything, it must be present so that my_callback
can be executed asynchronously, not in the main thread. I don't understand what is the connection between them, but that's what AI told me.
import RPi.GPIO as GPIO
import threading
import time
import signal
import sys
def my_callback(channel):
global OldStat
time.sleep(3)
NewStat = GPIO.input(channel)
if NewStat != OldStat:
if NewStat == True: print("Channel {} is HIGH".format(channel))
else: print("Channel {} is LOW".format(channel))
OldStat = NewStat
def callback_thread(event):
event.wait()
def Exit_Handler(signum, frame):
event.set()
GPIO.cleanup()
thread.join()
sys.exit(0)
signal.signal(signal.SIGTERM, Exit_Handler)
signal.signal(signal.SIGINT, Exit_Handler)
event = threading.Event()
thread = threading.Thread(target=callback_thread, args=(event,))
thread.start()
GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.IN, pull_up_down=GPIO.PUD_UP)
OldStat = GPIO.input(4)
if OldStat == True: print("Channel 4 is HIGH")
else: print("Channel 4 is LOW")
GPIO.add_event_detect(4, GPIO.BOTH, callback=my_callback, bouncetime=100)
event.wait()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论