如何使用Tkinter显示动态字符串

huangapple go评论60阅读模式
英文:

How do you display a dynamic string with Tkinter

问题

以下是您要翻译的内容:

"I have what I hope to be a fairly simple issue to solve, but I've not managed to get it work as I want with my limited knowledge.

I have a C# script piping in a string (a single word) to my python code pasted below. I originally had this code displaying the string on a small OLED display every-time a new string was received (I have now removed this function form the code). I am now wanting to change this so that the string is displayed graphically using Tkinter on a window on my monitor.

My effort below has a window open when a string (message.decode()) is received with the word displayed, but it does not change, it just hangs. Is anyone able to advise on what I need to do to make this automatically change in the same window?"

注意:代码部分不被翻译,只提供翻译后的文本。

英文:

I have what I hope to be a fairly simple issue to solve, but I've not managed to get it work as I want with my limited knowledge.

I have a C# script piping in a string (a single word) to my python code pasted below. I originally had this code displaying the string on a small OLED display every-time a new string was received (I have now removed this function form the code). I am now wanting to change this so that the string is displayed graphically using Tkinter on a window on my monitor.

My effort below has a window open when a string (message.decode()) is received with the word displayed, but it does not change, it just hangs. Is anyone able to advise on what I need to do to make this automatically change in the same window?

from board import SCL, SDA
import busio
from PIL import Image, ImageDraw, ImageFont
import adafruit_ssd1306
import socket
import sys
import struct

# importing whole module
from tkinter import *
from tkinter.ttk import *

# importing strftime function to
# retrieve system's time
from time import strftime

# Create the I2C interface.
i2c = busio.I2C(SCL, SDA)

# Create a UD socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

# Connect the socket to the port where the server is listening
server_address = '/tmp/CoreFxPipe_testpipe'
print('connecting to {}'.format(server_address))

try:
    sock.connect(server_address)
except socket.error as msg:
    print(msg)
    sys.exit(1)

try:
    word = ""
    while True:
        # amount_expected = struct.unpack('I', sock.recv(4))[0]
        # print("amount_expected :", amount_expected)

        message = sock.recv(128)
        if message:
            print("Received message : ", message.decode())  # making it a string


            class Window(Frame):
                def __init__(self, master=None):
                    Frame.__init__(self, master)
                    self.master = master
                    self.pack(fill=BOTH, expand=1)

                    text = Label(self, text=message.decode())
                    text.place(x=70, y=90)


            root = Tk()
            app = Window(root)
            root.wm_title("tkinter window")
            root.geometry("200x200")
            root.mainloop()




finally:
    print('closing socket...')
    sock.close()


答案1

得分: 1

以下是代码的翻译部分:

你可以在单独的 Thread 中运行 socket 消费者(接收者),将它接收到的消息放入一个 Queue 中,然后使用 tkinter 的 after 方法在 tkinter 事件循环中消费消息。下面是一个示例:

...  # 为了简洁起见,我略去了你的其他导入部分
import tkinter as tk
import tkinter.messagebox as tkm
from queue import Queue, Full, Empty
from threading import Thread, Event
from tkinter import ttk

class Root(tk.Tk):
    def __init__(self):
        super().__init__()  # 初始化 Tk
        self.title('我的应用!')
        self.geometry('200x200')
        # 告诉应用在关闭时要执行什么操作
        self.protocol('WM_DELETE_WINDOW', self.on_close)
        # 这个队列将消息从工作线程传递到主 UI
        self.message_queue = Queue(maxsize=1) 
        # 这个事件标志是我们在退出时停止线程工作的方式
        self.stop_event = Event()  
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.text = ttk.Label(self, text='等待消息中...')
        self.text.pack(expand=True, fill='both')
        self.consumer_loop_id = None  # 用于消费者 'after' 的占位符
        # 这个设置了消费者循环检查更新的速度
        self.loop_rate_ms = 100

        try:
            self.sock.connect(server_address)
        except socket.error as msg:
            tkm.showerror('发生错误', msg)
            self.on_close()  # 清理并退出
        else:
            self.start_socket_worker()  # 让我们开始吧!
            self.message_consumer()  # 启动消费者循环

    # 这个函数将在自己的线程中运行
    @staticmethod
    def socket_worker(sock, message_queue, stop_event):
        """接收消息并将其放入不可阻塞队列中"""
        # 一直运行,直到设置了 stop_event 标志
        while not stop_event.is_set():
            try:
                message = sock.recv(128).decode()
                # 将消息放入队列而不阻塞
                message_queue.put_nowait(message)
            except Full:  # 队列已满,上一条消息还没有被消费...
                pass  # 太糟糕了,下次再试吧
            except socket.error:
                stop_event.set()  # 退出

    def start_socket_worker(self):
        """启动工作线程"""
        self.worker_thread = Thread(
            name='消息工作线程',  # 不是必需的,但有助于调试!
            target=self.socket_worker,
            args=(self.sock, self.message_queue, self.stop_event,)
        )
        self.worker_thread.start()

    def message_consumer(self):
        """从队列中获取消息并更新 UI"""
        try:
            message = self.message_queue.get_nowait()
        except Empty:
            pass  # 没有消息,无事可做
        else:
            self.text.config(text=message)  # 使用新消息更新标签
        finally:
            self.consumer_loop_id = self.after(
                self.loop_rate_ms,
                self.message_consumer  # 再次调用此方法以继续循环!
            )
    
    def on_close(self):
        if self.consumer_loop_id:
            self.after_cancel(self.consumer_loop_id)  # 停止消费者循环
        if not self.stop_event.is_set():
            self.stop_event.set()  # 停止工作线程
        
        # 注意:对于非守护线程,'join' 是不必要的!
        # if self.worker_thread.is_alive():
            # self.worker_thread.join()

        self.sock.close()
        self.quit()

if __name__ == '__main__':
    app = Root()
    app.mainloop()  # 启动应用

希望这能帮助你理解代码的逻辑。如果有任何问题,请随时提问。

英文:

You could run the socket consumer (receiver) in a separate Thread and put the message it receives into a Queue, then consume the message in the tkinter event loop using tkinter's after method. Here's an example:

...  # I'm leaving out your other imports for brevity's sake
import tkinter as tk
import tkinter.messagebox as tkm
from queue import Queue, Full, Empty
from threading import Thread, Event
from tkinter import ttk
class Root(tk.Tk):
def __init__(self):
super().__init__()  # initialze Tk
self.title('My App!')
self.geometry('200x200')
# tell the app what to do when closing
self.protocol('WM_DELETE_WINDOW', self.on_close)
# this queue passes the msg from the worker to the main UI
self.message_queue = Queue(maxsize=1) 
# this event flag is how we'll stop the thread worker on exit
self.stop_event = Event()  
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.text = ttk.Label(self, text='Waiting for a message...')
self.text.pack(expand=True, fill='both')
self.consumer_loop_id = None  # placeholder for the consumer 'after' ID
# this sets how quickly you want the consumer loop to check for updates
self.loop_rate_ms = 100
try:
self.sock.connect(server_address)
except socket.error as msg:
tkm.showerror('An Error Occurred', msg)
self.on_close()  # clean up and quit
else:
self.start_worker_thread()  # let's go!
self.message_consumer()  # start the consumer loop
# this function will get run in its own thread
@staticmethod
def socket_worker(sock, message_queue, stop_event):
"""Receive messages and put them into a lossy queue"""
# keep running until the stop_event flag is set
while not stop_event.is_set():  
try:
message = sock.recv(128).decode()
# put msg in queue without blocking
message_queue.put_nowait(message)  
except Full:  # queue is full, last message hasn't been consumed yet...
pass  # too bad, better luck next time
except socket.error:
stop_event.set()  # bail
def start_socket_worker(self): 
"""Launch the worker thread"""
self.worker_thread = Thread(
name='Message Worker',  # not necessary, but helpful for debugging!
target=self.socket_worker,
args=(self.sock, self.message_queue, self.stop_event,)
)
self.worker_thread.start()
def message_consumer(self):
"""Get messages from the queue and update the UI"""
try:
message = self.message_queue.get_nowait()
except Empty:
pass  # no message, nothing to do
else: 
self.text.config(text=message)  # update the label with the new message
finally:
self.consumer_loop_id = self.after(
self.loop_rate_ms,
self.message_consumer  # call this method again to keep looping!
)
def on_close(self):
if self.consumer_loop_id:
self.after_cancel(consumer_loop_id)  # stop the consumer loop
if not self.stop_event.is_set():
self.stop_event.set()  # stop the worker thread
# NOTE: apparently 'join' is unnecessary for non-daemonic threads!
# if self.worker_thread.is_alive():
# self.worker_thread.join()
self.sock.close()
self.quit()
if __name__ == '__main__':
app = Root()
app.mainloop()  # start the app
</details>

huangapple
  • 本文由 发表于 2023年3月31日 17:40:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/75896989.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定