How can I impose server priority on a UDP client receiving from multiple servers on the same port

huangapple go评论53阅读模式

How can I impose server priority on a UDP client receiving from multiple servers on the same port



class App(tk.Tk):
    def __init__(self):
        self.title('UDP Client Test')
        # set up window close handler
        self.protocol('WM_DELETE_WINDOW', self.on_close)
        # display the received value
        self.data_label_var = tk.StringVar(self, 'No Data')
        self.data_label = ttk.Label(self, textvariable=self.data_label_var)

        # server IP addresses (example)
        self.primary = ''
        self.secondary = ''

        self.port = 5555
        self.timeout = 2.0
        self.client_socket = self.get_client_socket(self.port, self.timeout)

        self.dq_loop = None  # placeholder for dequeue loop 'after' ID

        self.receiver_queue = Queue(maxsize=1)
        self.stop_event = Event()
        self.receiver_thread = Thread(
                (self.primary, self.secondary),

    def get_client_socket(self, port: int, timeout: float) -> skt.socket:
        """Set up a UDP socket bound to the given port"""
        client_socket = skt.socket(skt.AF_INET, skt.SOCK_DGRAM)
        client_socket.bind('', port)  # accept traffic on this port from any IP address
        return client_socket

    def receiver_worker(
        socket: skt.socket,
        addresses: tuple[str, str],
        queue: queue.Queue,
        stop_event: Event,
    ) -> None:
        """Thread worker that receives data over UDP and puts it in a lossy queue"""
        primary, secondary = addresses  # server IPs

        while not stop_event.is_set():  # loop until application exit...
                data, server = socket.recvfrom(1024)
                # here's where I'm having trouble - if traffic is coming in from both servers,
                # there's a good chance my frontend will just pick up data from both alternately
                # (and yes, I know these conditions do the same thing...for now)
                if server[0] == primary:
                    queue.put_nowait((data, server))
                elif server[0] == secondary:
                    queue.put_nowait((data, server))
                else:  # inbound traffic on the correct port, but from some other server
            except queue.Full:
                print('Queue full...')  # not a problem, just here in case...
            except skt.timeout:
                print('Timeout...')  # TODO

    def receiver_dequeue(self) -> None:
        """Periodically fetch data from the worker queue and update the UI"""
            data, server = self.receiver_queue.get_nowait()
        except queue.Empty:
            pass  # nothing to do
        else:  # update the label
        finally:  # continue updating 10x / second
            self.dq_loop = self.after(100, self.receiver_dequeue)

    def on_close(self) -> None:
        """Perform cleanup tasks on application exit"""
        if self.dq_loop:
        self.stop_event.set()  # stop the receiver thread loop

if __name__ == '__main__':
    app = App()



I have a client application that is set to receive data from a given UDP port, and two servers (let's call them "primary" and "secondary") that are broadcasting data over that port.

I've set up a UDP receiver thread that uses a lossy queue to update my frontend. Lossy is okay here because the data are just status info strings, e.g. 'on'/'off', that I'm picking up periodically.

My desired behavior is as follows:

  • If the primary server is active and broadcasting, the client will accept data from the primary server only (regardless of data coming in from the secondary server)
  • If the primary server stops broadcasting, the client will accept data from the secondary server
  • If the primary server resumes broadcasting, don't cede to the primary unless the secondary server goes down (to prevent bouncing back and forth in the event that the primary sever is going in and out of failure)
  • If neither server is broadcasting, raise a flag

Currently the problem is that if both servers are broadcasting (which they will be most of the time), my client happily receives data from both and bounces back and forth between the two. I understand why this is happening, but I'm unsure how to stop it / work around it.

How can I structure my client to disregard data coming in from the secondary server as long as it's also getting data from the primary server?

NB - I'm using threads and queues here to keep my UDP operations from blocking my GUI

import queue
import socket as skt
import tkinter as tk
from threading import Event, Thread
class App(tk.Tk):
def __init__(self):
self.title('UDP Client Test')
# set up window close handler
self.protocol('WM_DELETE_WINDOW', self.on_close)
# display the received value
self.data_label_var = tk.StringVar(self, 'No Data')
self.data_label = ttk.Label(self, textvariable=self.data_label_var)
# server IP addresses (example)
self.primary = ''
self.secondary = ''
self.port = 5555
self.timeout = 2.0
self.client_socket = self.get_client_socket(self.port, self.timeout)
self.dq_loop = None  # placeholder for dequeue loop 'after' ID
self.receiver_queue = Queue(maxsize=1)
self.stop_event = Event()
self.receiver_thread = Thread(
(self.primary, self.secondary),
def get_client_socket(self, port: int, timeout: float) -> skt.socket:
"""Set up a UDP socket bound to the given port"""
client_socket = skt.socket(skt.AF_INET, skt.SOCK_DGRAM)
client_socket.bind('', port)  # accept traffic on this port from any IP address
return client_socket
def receiver_worker(
socket: skt.socket,
addresses: tuple[str, str],
queue: queue.Queue,
stop_event: Event,
) -> None:
"""Thread worker that receives data over UDP and puts it in a lossy queue"""
primary, secondary = addresses  # server IPs
while not stop_event.is_set():  # loop until application exit...
data, server = socket.recvfrom(1024)
# here's where I'm having trouble - if traffic is coming in from both servers,
# there's a good chance my frontend will just pick up data from both alternately
# (and yes, I know these conditions do the same thing...for now)
if server[0] == primary:
queue.put_nowait((data, server))
elif server[0] == secondary:
queue.put_nowait((data, server))
else:  # inbound traffic on the correct port, but from some other server
except queue.Full:
print('Queue full...')  # not a problem, just here in case...
except skt.timeout:
print('Timeout...')  # TODO
def receiver_dequeue(self) -> None:
"""Periodically fetch data from the worker queue and update the UI"""
data, server = self.receiver_queue.get_nowait()
except queue.Empty:
pass  # nothing to do
else:  # update the label
finally:  # continue updating 10x / second
self.dq_loop = self.after(100, self.receiver_dequeue)
def on_close(self) -> None:
"""Perform cleanup tasks on application exit"""
if self.dq_loop:
self.stop_event.set()  # stop the receiver thread loop
if __name__ == '__main__':
app = App()

My actual application is only slightly more complex than this, but the basic operation is the same: get data from UDP, use data to update UI...rinse and repeat

I suspect the changes need to be made to my receiver_worker method, but I'm not sure where to go from here. Any help is very much welcome and appreciated! And thanks for taking the time to read this long question!

Addendum: FWIW I did some reading about Selectors but I'm not sure how to go about implementing them in my case - if anybody can point me to a relevant example, that would be amazing


得分: 1



def HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, packetSourceIP):
    return current_timestamp_milliseconds() - self._lastPacketReceiveTimeStamp[packetSourceIP]



def UpdateActiveServer(self)
    millisSincePrimary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self._primaryServerIP)
    millisSinceSecondary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, _secondaryServerIP)

    serverOfflineMillis = 5*1000  // 5
    primaryIsOffline = (millisSincePrimary >= serverOfflineMillis)
    secondaryIsOffline = (millisSinceSecondary >= serverOfflineMillis)

    if ((primaryIsOffline) and (not secondaryIsOffline)):
       self._usePacketsFromSecondaryServer = true
    if ((secondaryIsOffline) and (not primaryIsOffline)):
       self._usePacketsFromSecondaryServer = false


def PacketReceived(whichServer):
   if ((whichServer == self._primaryServerIP) and (not self._usePacketsFromSecondaryServer)) or ((whichServer == self._secondaryServerIP) and (self._usePacketsFromSecondaryServer)):
      # 解析和使用UDP数据包的代码在此

The core of the problem is: how do you determine that a given server is really offline as opposed to just temporarily taking a break, e.g. due to a momentary network glitch?

All your client really knows is whether it has received any UDP packets from a given source IP address recently or not, for some well-chosen definition of "recently". So what you can do in your client is update a per-IP-address member-variable to the current timestamp, whenever you receive a UDP packet from a given server. Then you can have a helper method like this (pseudocode):

def HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, packetSourceIP):
return current_timestamp_milliseconds() - self._lastPacketReceiveTimeStamp[packetSourceIP]

Then e.g. if you know that your servers will be sending out a UDP packet once per second, you can decree that a given server is officially considered "offline" if you haven't received any UDP packets from it within the last 5 seconds. (Choose your own numbers here to suit, of course)

Then after you receive a packet and update the corresponding server-timestamp-member-variable, you can also update a member-variable indicating which server is the now the "active server" (i.e. the server you should currently be listening to):

def UpdateActiveServer(self)
millisSincePrimary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self._primaryServerIP)
millisSinceSecondary = HowManyMillisecondsSinceTheLastUDPPacketWasReceivedFromServer(self, _secondaryServerIP)
serverOfflineMillis = 5*1000  // 5 seconds
primaryIsOffline = (millisSincePrimary >= serverOfflineMillis)
secondaryIsOffline = (millisSinceSecondary >= serverOfflineMillis)
if ((primaryIsOffline) and (not secondaryIsOffline)):
self._usePacketsFromSecondaryServer = true
if ((secondaryIsOffline) and (not primaryIsOffline)):
self._usePacketsFromSecondaryServer = false

.... then the rest of your code can check the current value of self._usePacketsFromSecondaryServer to decide which incoming UDP packets to listen to and which ones to ignore (pseudocode):

def PacketReceived(whichServer):
if ((whichServer == self._primaryServerIP) and (not self._usePacketsFromSecondaryServer)) or ((whichServer == self._secondaryServerIP) and (self._usePacketsFromSecondaryServer)):
# code to parse and use UDP packet goes here

  • 本文由 发表于 2023年2月9日 02:07:00
  • 转载请务必保留本文链接:



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
