How can I escape from a GPIO.wait_for_edge?

Question

I have this code:

while True:
  try:
    Stat1 = GPIO.input(WPIN)
    time.sleep(WARNST)
    Stat2 = GPIO.input(WPIN)
    if Stat1 == Stat2: SendWarnStat(Stat2)
    Event = None
    while Event != WPIN: Event = GPIO.wait_for_edge(WPIN, GPIO.BOTH)
  except:
    break

Is there a way to make my script exit when I press Ctrl+C or the system asks it to terminate? It seems that everything is blocked by the wait_for_edge function. I want to send some messages when the state of the pin is changed, and I want to do this without bothering the CPU constantly checking that pin, so I have to use the wait function. Can I move the wait function into a thread and wait for multiple objects, one of them being a flag that is switched from the main thread? I am very new to Python.

Answer 1

Score: 1

As described in the RPi.GPIO wiki, you can replace the blocking wait_for_edge with the threaded, non-blocking add_event_detect plus add_event_callback.

Quote:

> RPi.GPIO runs a second thread for callback functions. This means that callback functions can be run at the same time as your main program, in immediate response to an edge.

Example code (untested):

# define callback for event
def do_something(channel):
    print(f"Channel {channel} detected an event!")

# add event trigger and link callback outside of loop
GPIO.add_event_detect(WPIN, GPIO.BOTH)
GPIO.add_event_callback(WPIN, do_something)

# your code
while True:
  try:
    Stat1 = GPIO.input(WPIN)
    time.sleep(WARNST)
    Stat2 = GPIO.input(WPIN)
    if Stat1 == Stat2:
      SendWarnStat(Stat2)
  except KeyboardInterrupt:
    # Ctrl+C: leave the loop
    break

You can then either call this function directly from anywhere in your code or let the registered edge event trigger it.
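
The loop above only ends when an exception such as Ctrl+C reaches it; a termination request from the system (SIGTERM) raises no exception in Python by default. Here is a minimal sketch of one way to cover both, using only the standard signal and threading modules. The names stop, request_stop, install_handlers and main_loop are illustrative and not part of RPi.GPIO, and check_pin in the usage comment is a hypothetical function holding the body of the loop.

```python
import signal
import threading

# Set by the signal handler, polled by the main loop.
stop = threading.Event()

def request_stop(signum, frame):
    # Python runs signal handlers in the main thread; just raise the flag.
    stop.set()

def install_handlers():
    # Ctrl+C (SIGINT) and a system termination request (SIGTERM)
    # both end up setting the same flag.
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)

def main_loop(poll, interval_s):
    """Call poll() every interval_s seconds until a stop is requested.

    Returns the number of times poll() was called.
    """
    count = 0
    # Event.wait() returns True as soon as the flag is set, so the loop
    # does not have to sit out a full time.sleep() before exiting.
    while not stop.wait(interval_s):
        poll()
        count += 1
    return count

# Possible use on the Pi (assumption: check_pin is hypothetical):
#   install_handlers()
#   main_loop(check_pin, WARNST)
#   GPIO.cleanup()
```

The edge callback registered with add_event_callback keeps running in RPi.GPIO's own thread in the meantime, so the main loop is only responsible for noticing the stop request.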

Answer 2

Score: 0

This is how I solved the problem...

Although callback_thread doesn't seem to do anything, it must be present so that my_callback can be executed asynchronously rather than in the main thread. I don't understand the connection between them, but that's what an AI told me.

import RPi.GPIO as GPIO
import threading
import time
import signal
import sys

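def my_callback(channel):
  # Called by RPi.GPIO when an edge is detected on the channel
  global OldStat
  time.sleep(3)  # wait 3 s before re-reading the pin
  NewStat = GPIO.input(channel)
  # Report only if the level differs from the last reported one
  if NewStat != OldStat:
    print("Channel {} is {}".format(channel, "HIGH" if NewStat else "LOW"))
    OldStat = NewStat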

def callback_thread(event):
  event.wait()

def Exit_Handler(signum, frame):
  event.set()
  GPIO.cleanup()
  thread.join()
  sys.exit(0)

signal.signal(signal.SIGTERM, Exit_Handler)
signal.signal(signal.SIGINT, Exit_Handler)

event = threading.Event()
thread = threading.Thread(target=callback_thread, args=(event,))
thread.start()

GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.IN, pull_up_down=GPIO.PUD_UP)
OldStat = GPIO.input(4)
print("Channel 4 is HIGH" if OldStat else "Channel 4 is LOW")
GPIO.add_event_detect(4, GPIO.BOTH, callback=my_callback, bouncetime=100)
event.wait()
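
For comparison, here is a sketch of a variant that keeps wait_for_edge, as in the question. RPi.GPIO's wait_for_edge accepts a timeout in milliseconds and returns None when it expires, so the wait can be broken into short slices with a stop flag checked between them. The helper wait_for_edge_or_stop and its parameters are hypothetical names; the GPIO call is passed in as a function so the logic can be tried without the hardware.

```python
import threading

def wait_for_edge_or_stop(wait_once, stop, poll_ms=500):
    """Block until an edge arrives or a stop is requested.

    wait_once(timeout_ms) must return the channel number on an edge
    and None on timeout. Returns the channel, or None if stop was set.
    """
    while not stop.is_set():
        channel = wait_once(poll_ms)
        if channel is not None:
            return channel
    return None

# Possible use on the Pi (assumption: WPIN is already set up as an
# input and stop is set from a signal handler):
#   import RPi.GPIO as GPIO
#   stop = threading.Event()
#   wait_once = lambda ms: GPIO.wait_for_edge(WPIN, GPIO.BOTH, timeout=ms)
#   channel = wait_for_edge_or_stop(wait_once, stop)
```

The trade-off is latency: the flag is only looked at between slices, so the script may take up to poll_ms to react to a stop request.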

huangapple
  • Posted on March 20, 2023, 22:43:46
  • If you repost this, please keep the link to this article: https://go.coder-hub.com/75791727.html