How can I impose server priority on a UDP client receiving from multiple servers on the same port

huangapple go评论60阅读模式
英文:

How can I impose server priority on a UDP client receiving from multiple servers on the same port

问题

以下是您要翻译的代码部分:

class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title('UDP Client Test')
        # set up window close handler
        self.protocol('WM_DELETE_WINDOW', self.on_close)
     
        # display the received value
        self.data_label_var = tk.StringVar(self, 'No Data')
        self.data_label = ttk.Label(self, textvariable=self.data_label_var)
        self.data_label.pack()

        # server IP addresses (example)
        self.primary = '10.0.42.1'
        self.secondary = '10.0.42.2'

        self.port = 5555
        self.timeout = 2.0
        self.client_socket = self.get_client_socket(self.port, self.timeout)

        self.dq_loop = None  # placeholder for dequeue loop 'after' ID

        self.receiver_queue = Queue(maxsize=1)
        self.stop_event = Event()
        self.receiver_thread = Thread(
            name='status_receiver',
            target=self.receiver_worker,
            args=(
                self.client_socket,
                (self.primary, self.secondary),
                self.receiver_queue,
                self.stop_event
            )
        )

    def get_client_socket(self, port: int, timeout: float) -> skt.socket:
        """Set up a UDP socket bound to the given port"""
        client_socket = skt.socket(skt.AF_INET, skt.SOCK_DGRAM)
        client_socket.settimeout(timeout)
        client_socket.bind('', port)  # accept traffic on this port from any IP address
        return client_socket

    @staticmethod
    def receiver_worker(
        socket: skt.socket,
        addresses: tuple[str, str],
        queue: queue.Queue,
        stop_event: Event,
    ) -> None:
        """Thread worker that receives data over UDP and puts it in a lossy queue"""
        primary, secondary = addresses  # server IPs

        while not stop_event.is_set():  # loop until application exit...
            try:
                data, server = socket.recvfrom(1024)
                # here's where I'm having trouble - if traffic is coming in from both servers,
                # there's a good chance my frontend will just pick up data from both alternately
                # (and yes, I know these conditions do the same thing...for now)
                if server[0] == primary:
                    queue.put_nowait((data, server))
                elif server[0] == secondary:
                    queue.put_nowait((data, server))
                else:  # inbound traffic on the correct port, but from some other server
                    print('disregard...')
            except queue.Full:
                print('Queue full...')  # not a problem, just here in case...
            except skt.timeout:
                print('Timeout...')  # TODO

    def receiver_dequeue(self) -> None:
        """Periodically fetch data from the worker queue and update the UI"""
        try:
            data, server = self.receiver_queue.get_nowait()
        except queue.Empty:
            pass  # nothing to do
        else:  # update the label
            self.data_label_var.set(data.decode())
        finally:  # continue updating 10x / second
            self.dq_loop = self.after(100, self.receiver_dequeue)

    def on_close(self) -> None:
        """Perform cleanup tasks on application exit"""
        if self.dq_loop:
            self.after_cancel(self.dq_loop)
        self.stop_event.set()  # stop the receiver thread loop
        self.receiver_thread.join()
        self.client_socket.close()
        self.quit()

if __name__ == '__main__':
    app = App()
    app.mainloop()

希望这对您有所帮助!

英文:

I have a client application that is set to receive data from a given UDP port, and two servers (let's call them "primary" and "secondary") that are broadcasting data over that port.

I've set up a UDP receiver thread that uses a lossy queue to update my frontend. Lossy is okay here because the data are just status info strings, e.g. 'on'/'off', that I'm picking up periodically.

My desired behavior is as follows:

  • If the primary server is active and broadcasting, the client will accept data from the primary server only (regardless of data coming in from the secondary server)
  • If the primary server stops broadcasting, the client will accept data from the secondary server
  • If the primary server resumes broadcasting, don't cede to the primary unless the secondary server goes down (to prevent bouncing back and forth in the event that the primary sever is going in and out of failure)
  • If neither server is broadcasting, raise a flag

Currently the problem is that if both servers are broadcasting (which they will be most of the time), my client happily receives data from both and bounces back and forth between the two. I understand why this is happening, but I'm unsure how to stop it / work around it.

How can I structure my client to disregard data coming in from the secondary server as long as it's also getting data from the primary server?

NB - I'm using threads and queues here to keep my UDP operations from blocking my GUI

# EXAMPLE CLIENT APP
import queue
import socket as skt
import tkinter as tk
from threading import Event, Thread
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title('UDP Client Test')
# set up window close handler
self.protocol('WM_DELETE_WINDOW', self.on_close)
# display the received value
self.data_label_var = tk.StringVar(self, 'No Data')
self.data_label = ttk.Label(self, textvariable=self.data_label_var)
self.data_label.pack()
# server IP addresses (example)
self.primary = '10.0.42.1'
self.secondary = '10.0.42.2'
self.port = 5555
self.timeout = 2.0
self.client_socket = self.get_client_socket(self.port, self.timeout)
self.dq_loop = None  # placeholder for dequeue loop 'after' ID
self.receiver_queue = Queue(maxsize=1)
self.stop_event = Event()
self.receiver_thread = Thread(
name='status_receiver',
target=self.receiver_worker,
args=(
self.client_socket,
(self.primary, self.secondary),
self.receiver_queue,
self.stop_event
)
)
def get_client_socket(self, port: int, timeout: float) -> skt.socket:
"""Set up a UDP socket bound to the given port"""
client_socket = skt.socket(skt.AF_INET, skt.SOCK_DGRAM)
client_socket.settimeout(timeout)
client_socket.bind('', port)  # accept traffic on this port from any IP address
return client_socket
@staticmethod
def receiver_worker(
socket: skt.socket,
addresses: tuple[str, str],
queue: queue.Queue,
stop_event: Event,
) -> None:
"""Thread worker that receives data over UDP and puts it in a lossy queue"""
primary, secondary = addresses  # server IPs
while not stop_event.is_set():  # loop until application exit...
try:
data, server = socket.recvfrom(1024)
# here's where I'm having trouble - if traffic is coming in from both servers,
# there's a good chance my frontend will just pick up data from both alternately
# (and yes, I know these conditions do the same thing...for now)
if server[0] == primary:
queue.put_nowait((data, server))
elif server[0] == secondary:
queue.put_nowait((data, server))
else:  # inbound traffic on the correct port, but from some other server
print('disredard...')
except queue.Full:
print('Queue full...')  # not a problem, just here in case...
except skt.timeout:
print('Timeout...')  # TODO
def receiver_dequeue(self) -> None:
"""Periodically fetch data from the worker queue and update the UI"""
try:
data, server = self.receiver_queue.get_nowait()
except queue.Empty:
pass  # nothing to do
else:  # update the label
self.data_label_var.set(data.decode())
finally:  # continue updating 10x / second
self.dq_loop = self.after(100, self.receiver_dequeue)
def on_close(self) -> None:
"""Perform cleanup tasks on application exit"""
if self.dq_loop:
self.after_cancel(self.dq_loop)
self.stop_event.set()  # stop the receiver thread loop
self.receiver_thread.join()
self.client_socket.close()
self.quit()
if __name__ == '__main__':
app = App()
app.mainloop()

My actual application is only slightly more complex than this, but the basic operation is the same: get data from UDP, use data to update UI...rinse and repeat

I suspect the changes need to be made to my receiver_worker method, but I'm not sure where to go from here. Any help is very much welcome and appreciated! And thanks for taking the time to read this long question!

Addendum: FWIW I did some reading about Selectors but I'm not sure how to go about implementing them in my case - if anybody can point me to a relevant example, that would be amazing

答案1

得分: 1

问题的核心是:如何确定给定的服务器是否真正“离线”,而不仅仅是“暂时休息”,例如由于瞬时的网络故障?

您的客户端实际上只知道它是否最近从某个源IP地址接收到任何UDP数据包,以某种合适的“最近”定义。因此,在您的客户端中,您可以在收到来自给定服务器的UDP数据包时,将与IP地址相关的成员变量更新为当前时间戳。然后,您可以创建一个类似以下伪代码的辅助方法:

def HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, packetSourceIP):
{
    return current_timestamp_milliseconds() - self._lastPacketReceiveTimeStamp[packetSourceIP]
}

然后,例如,如果您知道您的服务器将每秒发送一个UDP数据包,您可以规定如果在过去的5秒内没有从它那里接收到任何UDP数据包,那么给定的服务器将被正式视为“离线”。(当然,您可以根据需要选择自己的数字)

然后,在接收数据包并更新相应的服务器时间戳成员变量之后,您还可以更新一个成员变量,指示哪个服务器现在是“活动服务器”(即,您当前应该监听的服务器):

def UpdateActiveServer(self)
{
    millisSincePrimary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self._primaryServerIP)
    millisSinceSecondary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, _secondaryServerIP)

    serverOfflineMillis = 5*1000  // 5
    primaryIsOffline = (millisSincePrimary >= serverOfflineMillis)
    secondaryIsOffline = (millisSinceSecondary >= serverOfflineMillis)

    if ((primaryIsOffline) and (not secondaryIsOffline)):
       self._usePacketsFromSecondaryServer = true
    if ((secondaryIsOffline) and (not primaryIsOffline)):
       self._usePacketsFromSecondaryServer = false
}

然后,您的代码的其余部分可以检查self._usePacketsFromSecondaryServer的当前值,以决定要监听哪些传入UDP数据包以及哪些要忽略(伪代码):

def PacketReceived(whichServer):
   if ((whichServer == self._primaryServerIP) and (not self._usePacketsFromSecondaryServer)) or ((whichServer == self._secondaryServerIP) and (self._usePacketsFromSecondaryServer)):
      # 解析和使用UDP数据包的代码在此
英文:

The core of the problem is: how do you determine that a given server is really offline as opposed to just temporarily taking a break, e.g. due to a momentary network glitch?

All your client really knows is whether it has received any UDP packets from a given source IP address recently or not, for some well-chosen definition of "recently". So what you can do in your client is update a per-IP-address member-variable to the current timestamp, whenever you receive a UDP packet from a given server. Then you can have a helper method like this (pseudocode):

def HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, packetSourceIP):
{
return current_timestamp_milliseconds() - self._lastPacketReceiveTimeStamp[packetSourceIP]
}

Then e.g. if you know that your servers will be sending out a UDP packet once per second, you can decree that a given server is officially considered "offline" if you haven't received any UDP packets from it within the last 5 seconds. (Choose your own numbers here to suit, of course)

Then after you receive a packet and update the corresponding server-timestamp-member-variable, you can also update a member-variable indicating which server is the now the "active server" (i.e. the server you should currently be listening to):

def UpdateActiveServer(self)
{
millisSincePrimary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self._primaryServerIP)
millisSinceSecondary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, _secondaryServerIP)
serverOfflineMillis = 5*1000  // 5 seconds
primaryIsOffline = (millisSincePrimary >= serverOfflineMillis)
secondaryIsOffline = (millisSinceSecondary >= serverOfflineMillis)
if ((primaryIsOffline) and (not secondaryIsOffline)):
self._usePacketsFromSecondaryServer = true
if ((secondaryIsOffline) and (not primaryIsOffline)):
self._usePacketsFromSecondaryServer = false
}

.... then the rest of your code can check the current value of self._usePacketsFromSecondaryServer to decide which incoming UDP packets to listen to and which ones to ignore (pseudocode):

def PacketReceived(whichServer):
if ((whichServer == self._primaryServerIP) and (not self._usePacketsFromSecondaryServer)) or ((whichServer == self._secondaryServerIP) and (self._usePacketsFromSecondaryServer)):
# code to parse and use UDP packet goes here

huangapple
  • 本文由 发表于 2023年2月9日 02:07:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/75390001.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定