Using OpenCV and multiprocessing, how can I feed information from a variable in one Python script to another?

Question

I am trying to run two scripts in parallel, with one feeding the other.

First, I trained a model to decode different gestures. I followed this tutorial: https://www.youtube.com/watch?v=yqkISICHH-U

That script opens the webcam, decodes the gestures I am making, and creates a new variable (called mvt_ok) when the same movement is decoded three consecutive times. At that point I wish to send the information to another script, which will be an experimental task developed in PsychoPy (a Python tool for building psychology experiments). Basically, as soon as the first script (gesture detection with the webcam) feeds the second one, I want the second script (the PsychoPy task) to present another stimulus.

To summarise: I wish to open the video, then start the PsychoPy script and present the first stimulus; a movement is then expected to be detected in the video, and this information should be fed to the PsychoPy script to change the stimulus.

So far I am a long way from achieving that; all I have managed is to send the movement flag to another process with a function such as the following:

def f(child_conn, mvt_ok):
    print(mvt_ok)

Actually, I am not sure how I could reuse the mvt_ok variable to feed it to my PsychoPy script.

I won't post all of the gesture-recognition code because it would be too long, but the most crucial lines are here:

from multiprocessing import Pipe, Process

import cv2
import numpy as np
import tensorflow as tf

# The TensorFlow Object Detection API setup is omitted; it provides
# cap (a cv2.VideoCapture), detect_fn, category_index and viz_utils, e.g.:
# from object_detection.utils import visualization_utils as viz_utils

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    sentence = []

    while cap.isOpened():
        ret, frame = cap.read()
        image_np = np.array(frame)

        input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
        detections = detect_fn(input_tensor)

        num_detections = int(detections.pop('num_detections'))
        detections = {key: value[0, :num_detections].numpy()
                      for key, value in detections.items()}
        detections['num_detections'] = num_detections

        # detection_classes should be ints.
        detections['detection_classes'] = detections['detection_classes'].astype(np.int64)

        label_id_offset = 1
        image_np_with_detections = image_np.copy()

        viz_utils.visualize_boxes_and_labels_on_image_array(
                    image_np_with_detections,
                    detections['detection_boxes'],
                    detections['detection_classes'] + label_id_offset,
                    detections['detection_scores'],
                    category_index,
                    use_normalized_coordinates=True,
                    max_boxes_to_draw=5,
                    min_score_thresh=.8,
                    agnostic_mode=False)

        cv2.imshow('object detection', cv2.resize(image_np_with_detections, (800, 600)))

        if np.max(detections['detection_scores']) > 0.95:
            word = category_index[detections['detection_classes'][np.argmax(detections['detection_scores'])] + 1]['name']
            sentence.append(word)

            if len(sentence) >= 3:
                if sentence[-1] == sentence[-2] and sentence[-1] == sentence[-3]:
                    print('ok')
                    mvt_ok = 1
                    # Spawn a short-lived process, handing it the pipe end and the flag
                    p = Process(target=f, args=(child_conn, mvt_ok))
                    p.start()
                    p.join()

        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break

Answer 1

Score: 0


One way to do this with pure Python is to:

  • set up a multiprocessing manager as a separate, always-running process
  • let the webcam and stimulus processes connect to it, and use it to provide a Queue() through which Python objects can be sent much more simply than via sockets

There is a very good example here.
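
Here is a minimal sketch of that pattern; the address localhost:50000, the authkey b'secret' and the script name are arbitrary choices for illustration:

#!/usr/bin/env python3
# queue_manager.py - start this first; it only hosts a shared Queue

from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()

class QueueManager(BaseManager):
   pass

if __name__ == "__main__":
   # Hand out the same Queue to every process that connects
   QueueManager.register('get_queue', callable=lambda: queue)
   manager = QueueManager(address=('localhost', 50000), authkey=b'secret')
   print('DEBUG: Queue manager listening on localhost:50000')
   manager.get_server().serve_forever()

The webcam and stimulus processes then connect to it and share the queue:

#!/usr/bin/env python3
# Client side - the same connection code works for webcam and stimulus

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
   pass

if __name__ == "__main__":
   QueueManager.register('get_queue')
   manager = QueueManager(address=('localhost', 50000), authkey=b'secret')
   manager.connect()
   queue = manager.get_queue()

   queue.put('Detection event')   # webcam side: publish a detection
   # event = queue.get()          # stimulus side: blocks until an event arrives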


Another, possibly easier, option is MQTT. You could install mosquitto as an MQTT broker. Then your webcam can "publish" detection events and your stimulus can "subscribe" to detections and get notified, and vice versa. MQTT allows multi-megabyte messages, so if big messages are needed I would recommend it.

The code for the video acquisition end might look like this:

#!/usr/bin/env python3

# Requires:
# mosquitto broker to be running somewhere
# pip install paho-mqtt

from time import sleep
import paho.mqtt.client as mqtt
import json

if __name__ == "__main__":
   # Get settings
   with open('settings.json') as f:
      settings = json.load(f)
      host = settings['MQTThost']
      port = settings['MQTTport']
      topic = settings['MQTTtopic']

   print(f'DEBUG: Connecting to MQTT on {host}:{port}')
   client = mqtt.Client()
   client.connect(host, port, 60)
   print('DEBUG: Connected')

   # Grab some video frames and publish a dummy detection every 5 frames
   for i in range(100):
      print(f'DEBUG: Grabbing frame {i}')
    
      if i % 5 == 0:
         # Publish dummy detection
         client.publish(topic, "Detection event")
         print('DEBUG: Detection event')

      sleep(1)

And for the stimulus end, it might look like this:

#!/usr/bin/env python3

# Requires:
# mosquitto broker to be running somewhere
# pip install paho-mqtt

from time import sleep
import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
   """
   This function is called on successful connection to the broker.
   """
   print(f'DEBUG: Connected with result code {rc}')
   client.subscribe(topic)

def on_message(client, userdata, msg):
   """
   This function is called every time a message is published on
   the specified topic.
   """
   message = msg.payload.decode()
   print(f'Message received: {message}')

if __name__ == "__main__":
   # Get settings
   with open('settings.json') as f:
      settings = json.load(f)
      host = settings['MQTThost']
      port = settings['MQTTport']
      topic = settings['MQTTtopic']

   print(f'DEBUG: Connecting to MQTT on {host}:{port}')
   client = mqtt.Client()
   client.on_connect = on_connect
   client.on_message = on_message
   client.connect(host, port, 60)

   # Wait forever for messages
   client.loop_forever()

And I used a settings.json containing this:

{
  "MQTThost" : "localhost",
  "MQTTport" : 1883,
  "MQTTtopic": "/detections"
}

Redis also supports pub/sub and is simple, fast and lightweight. The code would be structured very similarly to the MQTT examples above. You can also share a variable, a list, an atomic integer or a set with Redis.
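
A minimal sketch of the pub/sub variant might look like this, assuming a Redis server running on localhost:6379 and an arbitrary 'detections' channel:

#!/usr/bin/env python3

# Requires:
# a Redis server running somewhere
# pip install redis

import redis

# Webcam side: publish a detection event on the channel
r = redis.Redis(host='localhost', port=6379)
r.publish('detections', 'Detection event')

# Stimulus side: subscribe and wait for events
r = redis.Redis(host='localhost', port=6379)
pubsub = r.pubsub()
pubsub.subscribe('detections')
for msg in pubsub.listen():
   if msg['type'] == 'message':
      print('Message received:', msg['data'].decode())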


You could also use a simple UDP message between the processes if you don't want to pass big, complicated Python objects. It should be very reliable if both processes are on the same host and probably will allow up to 1kB or so of data per message. This is pure Python with no extra packages or modules or servers being needed.

The video acquisition might look like this:

#!/usr/bin/env python3

from time import sleep
import json
import socket

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']

   print(f'DEBUG: Creating UDP socket, connecting to {host}:{port}')
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   address = (host, port)

   # Grab some video frames and emit a dummy detection every 5 frames
   for i in range(100):
      print(f'DEBUG: Grabbing frame {i}')
    
      if i % 5 == 0:
         # Emit dummy detection
         sock.sendto(b'Event detected', address)
         print('DEBUG: Detection event')

      sleep(1)

And the stimulus code might look like this:

#!/usr/bin/env python3

import json
import socket

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']

   print(f'DEBUG: Establishing listener on {host}:{port}')
   # Create a UDP socket
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   # Bind the socket to the port
   sock.bind((host, port))
   print('DEBUG: Listening')

   while True:
      # Wait for message
      message, _ = sock.recvfrom(4096)
      print(f'DEBUG: Received {message}')

And I used a settings2.json containing this:

{
  "host" : "localhost",
  "port" : 50000
}

Another, pure Python way of doing this is with a multiprocessing connection. You would need to start the stimulus process first if using this method - or at least the process in which you put the listener. Note that you can send Python objects with this technique; just change the call to conn.send(SOMEDICT or ARRAY or LIST).

The video acquisition might look like this:

#!/usr/bin/env python3

import time
import json
from multiprocessing.connection import Client

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']
      auth = settings['auth']

   print(f'DEBUG: Creating client, connecting to {host}:{port}')
   with Client((host, port), authkey=auth.encode()) as conn:

      # Grab some video frames and emit a dummy detection every 5 frames
      for i in range(100):
         print(f'DEBUG: Grabbing frame {i}')
    
         if i % 5 == 0:
            # Emit a dummy detection event
            conn.send('Detection event')
            print('DEBUG: Detection event')

         time.sleep(1)

And the stimulus end might look like this:

#!/usr/bin/env python3

import json
import time
from multiprocessing.connection import Listener

if __name__ == "__main__":
   # Get settings
   with open('settings2.json') as f:
      settings = json.load(f)
      host = settings['host']
      port = settings['port']
      auth = settings['auth']

   print(f'DEBUG: Establishing listener on {host}:{port}')
   with Listener((host, port), authkey=auth.encode()) as listener:
      with listener.accept() as conn:
         print('DEBUG: connection accepted from', listener.last_accepted)
         while True:
            # Wait for message
            message = conn.recv()
            print(f'DEBUG: Event received {message}')

And your settings2.json would need to look like this:

{
  "host" : "localhost",
  "port" : 50000,
  "auth" : "secret"
}

Some of the above ideas are very similar to the examples I gave in this answer, although the purpose is slightly different.


None of these methods are locking/blocking, so you can happily run one program without the other needing to be running.
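
As an illustration, the stimulus end can poll for detections inside its drawing loop without ever stalling a frame. Here is a rough sketch pairing the UDP receiver above with a PsychoPy-style frame loop; the window setup and the stimulus texts are only placeholders, not part of your task:

#!/usr/bin/env python3

# Rough sketch - assumes PsychoPy is installed and the UDP sender above is running

import socket

from psychopy import core, visual

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('localhost', 50000))
sock.setblocking(False)              # never block the drawing loop

win = visual.Window()
stimuli = [visual.TextStim(win, text='stimulus A'),
           visual.TextStim(win, text='stimulus B')]
current = 0

for _ in range(600):                 # roughly 10 s at 60 Hz
   try:
      message, _ = sock.recvfrom(4096)
      # A detection arrived: switch to the next stimulus
      current = (current + 1) % len(stimuli)
   except BlockingIOError:
      pass                           # no detection this frame

   stimuli[current].draw()
   win.flip()

win.close()
core.quit()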
