所以我正在阅读
this article关于如何在ZMQ中为(X)PUB /(X)SUB消息传递创建代理/代理.有什么建筑应该是这样漂亮的图片:
但是当我看到XSUB socket description时,由于其出局路由策略是N / A,因此我没有通过它转发所有订阅
那么在ZeroMQ中如何实现(un)订阅转发,这样的转发应用程序的最小用户代码是什么(可以在简单的Publisher和Subscriber样本之间插入)?
解决方法
XPUB接收消息 – 接收到的唯一消息是已连接订户的订阅,这些消息应通过XSUB上传到上游.
中继消息的最简单的方法是使用zmq_proxy:
xpub = ctx.socket(zmq.XPUB) xpub.bind(xpub_url) xsub = ctx.socket(zmq.XSUB) xsub.bind(xsub_url) pub = ctx.socket(zmq.PUB) pub.bind(pub_url) zmq.proxy(xpub,xsub,pub)
这将将消息传递到/从xpub和xsub.或者,您可以添加PUB套接字来监视任一方向通过的流量.
如果您希望中间的用户代码实现额外的路由逻辑,您将执行此操作,
它重新实现了zmq_proxy的内部循环:
def broker(ctx):
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
poller = zmq.Poller()
poller.register(xpub,zmq.POLLIN)
poller.register(xsub,zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if xpub in events:
message = xpub.recv_multipart()
print "[broKER] subscription message: %r" % message[0]
xsub.send_multipart(message)
if xsub in events:
message = xsub.recv_multipart()
# print "publishing message: %r" % message
xpub.send_multipart(message)
# insert user code here
full working (Python) example