python - ZeroMQ: How to prioritise sockets in a .poll() method? -
imagine following code:
import threading, zmq, time context = zmq.context() receivers = [] poller = zmq.poller() def thread_fn(number: int): sender = context.socket(zmq.push) sender.connect("tcp://localhost:%d" % (6666 + number)) in range(10): sender.send_string("message thread %d" % number) in range(3): new_receiver = context.socket(zmq.pull) new_receiver.bind("tcp://*:%d" % (6666 + i)) poller.register(new_receiver, zmq.pollin) receivers.append(new_receiver) threading.thread(target=lambda: thread_fn(i), daemon=true).start() while true: try: socks = dict(poller.poll()) except keyboardinterrupt: break in range(3): if receivers[i] in socks: print("%d: process message %s" % (i, receivers[i].recv_string())) time.sleep(0.2) # 'process' data
the threads send messages without interruption arrive in random order @ corresponding pull
-sockets 'processed'.
note:
connect 1 pull
-socket example intends provide more 1 receiving socket.
output is:
0: process message message thread 0 1: process message message thread 1 0: process message message thread 0 1: process message message thread 1 2: process message message thread 2 0: process message message thread 0 1: process message message thread 1 2: process message message thread 2 ....
now want read sockets in example i'd prioritise 1 socket.
i.e.: want output be:
0: process message message thread 0 <-- socket 0 processed first 0: process message message thread 0 0: process message message thread 0 0: process message message thread 0 0: process message message thread 0 1: process message message thread 1 1: process message message thread 1 2: process message message thread 2 1: process message message thread 1 2: process message message thread 2 ....
of course can poll sockets separately timeout=0
want be sure zeromq doesn't this me already.
so questions are:
q1:
is there way ( except built-in .poll( timeout )
)
make sure i've read messages 1 socket first
before waiting messages on other sockets?
q2:
is there a known best practice manually?
welcome wild worlds of (managed)-chaos,
a1:
yestl;dr
checkzmq.select()
in recent api / python wrappera2:
yestl;dr
must part of rigorous dependable system design
for sake of q2
, system design ought remain flexible, not handle said prioritisation segmentation, provide serious means robust handling of remote failures comply ( optimistically ) expected modus operandi.
what mean?
if postulated behaviour implemented trivial , naive serial alignment of easy implement principal syntax-constructs' sections alike idea:
# -------------------------------------------------------- # first scan hi-prio socket(s) incoming messages: while true: # process 'em first, based on zeromq-socket's behaviour-fixed ordering ... break # -------------------------------------------------------- # next scan lo-prio socket(s) incoming messages: while true: # process 'em, again, based on zeromq-socket's behaviour-fixed ordering ... break
any benefits system architecture strives create lost in moment forget have robust plan b - how handle blocking states, lost messages, dead counterparty fsa-process, dos-attack, faulty remote-nic sprays inbound interface spurious , massive flow of bytes, nightmares may , appear out of controls.
this means, plan how survive case, first group of sockets started "feed" receiver many messages ( processing tasks ), "can" never exit hi-prio section.
if still not getting point, let me remind great system design, introduced purpose apollo guidance computer (agc) software mit team headed ms. margaret hamilton, which did survive such "infinite-attack" of events, not anticipated engineering guys, did happen during real-life, worse, during landing of eagle ( lunar module ) on moon - second critical phase of whole journey "there , back".
it not overhype state smart design ms. hamilton's team did save both moment , whole glory of u.s. prestigeous apollo programme.
every responsible design has planned survive this.
the current zeromq
wrapper python
provides such purpose tool, poller()
class, -- once due care taken -- may save both design targets , provide space adding reliability-motivated functions, incl. fallback escape strategies taken on colliding priorities/resources situations.
# ------------------------------------------------------------ # initialize separate engines polling set(s) hipriopoller = zmq.poller() lopriopoller = zmq.poller() # ------------------------------------------------------------ # associate hipriopoller.register( socket_0_pull, zmq.pollin ) # 0: lopriopoller.register( ... , zmq.pollin ) # 1: lopriopoller.register( ... , zmq.pollin ) # 2: ... # ------------------------------------------------------------ # detect, waiting in front of closed door alistofhiprioevents = hipriopoller.poll( timeout = 0.200 ) # 200 [us] alistofloprioevents = lopriopoller.poll( timeout = 0 ) # no wait @ # ------------------------------------------------------------ # after have complete view waiting there # 1 # can & shall adapt order / scope of event-handling, # immune infinite-prio-event-flow. ... # ------------------------------------------------------------
poller.poll()
method returns list
of events ready processed. list of tuples of form ( socket, event )
, first element { a-0mq-socket-instance | integer-system-native-fd }
, , second poll-event mask ( pollin, pollout )
. common call decorated adictofevents = dict( apoller.poll() )
, turns list of tuples mapping of { asocket : anevent, ... }
if 1 wishes to.
finally,
epilogue: tribute margaret hamilton , mit team
if story nothing else inspire our thoughts, margaret's brave efforts have taught lot professional system design. may learn lot trully pioneering epoch - more what computer science has implemented on penny-scaled resources' footprints in 60-ies, today systems' designs ( if not worse ) quite in debt ...
( ) post-festum:
with pleasure & delight, let me share white-house news fair & honorable moment when margaret hamilton has been awarded presidential medal of freedom.
in context 1 may consider historical imperative compare nasa remarks on zero-bug ever found in margaret hamilton's team products, deployed --- in apollo, skylab, space shuttle , first u.s. fly-by-wire aircraft --- somehow other kind of experience gathered on products 1 other medalist, awarded "two rows" before margaret.
an uncompromised excellence remains excellence.
my hat raised, dear mrs. hamilton.
let enlightment shows us, mortals, better ways forwards.
Comments
Post a Comment