Important: ZeroMQ is a library, not a broker. Configuration happens at the application level through socket options and connection strings.
## Socket Types and Patterns
| Pattern | Socket Types | Description |
|---|---|---|
| Request-Reply | REQ, REP | Synchronous RPC-style communication |
| Publish-Subscribe | PUB, SUB | One-to-many data distribution |
| Push-Pull | PUSH, PULL | Pipeline for parallel task distribution |
| Router-Dealer | ROUTER, DEALER | Advanced asynchronous messaging |
| Pair | PAIR | Exclusive connection between two threads |
# Endpoint strings have the form "<transport>://<address>".
# NOTE(review): `socket` is assumed to be an existing zmq socket created
# elsewhere; it is not defined in this snippet.

# TCP: network transport
tcp_listen_endpoint = "tcp://*:5555"     # "*" binds to all interfaces
tcp_remote_endpoint = "tcp://host:5555"  # connects to one specific host
socket.bind(tcp_listen_endpoint)
socket.connect(tcp_remote_endpoint)

# IPC: inter-process transport (Unix only)
ipc_endpoint = "ipc:///tmp/myapp"
socket.bind(ipc_endpoint)

# inproc: in-process transport (within the same application)
inproc_endpoint = "inproc://myqueue"
socket.bind(inproc_endpoint)

# PGM: reliable multicast
pgm_endpoint = "pgm://eth0;239.192.1.1:5555"
socket.bind(pgm_endpoint)
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)

# (option, value) pairs, applied to the socket in the order listed.
socket_options = (
    # Connection management
    (zmq.LINGER, 0),       # don't block on close
    (zmq.SNDHWM, 1000),    # send high-water mark
    (zmq.RCVHWM, 1000),    # receive high-water mark
    # Timeouts (in milliseconds)
    (zmq.SNDTIMEO, 5000),  # send timeout: 5s
    (zmq.RCVTIMEO, 5000),  # receive timeout: 5s
    # TCP keepalive
    (zmq.TCP_KEEPALIVE, 1),
    (zmq.TCP_KEEPALIVE_IDLE, 60),
)
for option, value in socket_options:
    socket.setsockopt(option, value)
# CURVE security options, written with pyzmq's attribute-style option
# setters (each assignment sets the socket option of the same name).
# NOTE(review): the key variables are not defined in this snippet, and the
# server and client halves presumably apply to two different sockets even
# though both use the name `socket` here -- confirm against the full example.

# Server side: act as the CURVE server and install the server's secret key.
socket.curve_server = True
socket.curve_secretkey = server_secret

# Client side: the server's public key plus the client's own key pair.
socket.curve_serverkey = server_public
socket.curve_publickey = client_public
socket.curve_secretkey = client_secret
import zmq

# Request-Reply server: a REP socket bound to TCP port 5555 on all interfaces.
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

# Serve forever: read one request, log it, then send the reply.
while True:
    message = socket.recv()
    print(f"Received: {message}")
    socket.send(b"World")
import zmq

# Request-Reply client: a REQ socket connected to the server on port 5555.
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

# Send three requests; each send is followed by a recv for its reply.
for i in range(3):
    socket.send(b"Hello")
    message = socket.recv()
    print(f"Received: {message}")
import zmq
import time

# Publisher: a PUB socket bound to TCP port 5556 on all interfaces.
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

# Publish the same message once per second, forever.
while True:
    socket.send(b"weather update")
    time.sleep(1)
import zmq

# Subscriber: a SUB socket connected to the publisher on port 5556.
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt(zmq.SUBSCRIBE, b"")  # Empty prefix: subscribe to all

# Print every message as it arrives.
while True:
    message = socket.recv()
    print(f"Received: {message}")
| Variable | Description |
|---|---|
| ZMQ_MAX_SOCKETS | Maximum number of sockets |
| ZMQ_IO_THREADS | Number of I/O threads |
# Context options: set them on the context before creating any sockets.
# Use Context.set() here; Context.setsockopt() instead sets default *socket*
# options for sockets created later, so it does not change these settings.
context = zmq.Context()
context.set(zmq.IO_THREADS, 4)
context.set(zmq.MAX_SOCKETS, 1024)