AgentZero - high-level ZeroMQ socket managers
AgentZero lets you create, connect, bind, and modify zeromq sockets at runtime with ease.
It works well with gevent, making it possible to write network applications whose simple code performs complex operations.
Features:
- create labeled sockets: every ZMQ socket in AgentZero has a name
- seamlessly poll across connected/bound sockets
- seamlessly subscribe to events, which return the boxed type Event
- easily publish events
- bind sockets to random ports automatically
- bind to hostnames, with automatic DNS resolution
- wait until a socket has received data
- built-in Python log handler that publishes logs through a ZMQ PUB socket
SocketManager
class agentzero.SocketManager(zmq, context, serialization_backend=None, polling_timeout=10000, timeout=10)
High-level abstraction for zeromq’s non-blocking API.
This component provides utility methods to create, retrieve, connect and bind sockets by name.
It can wait for a socket to become available for receiving data, sending data, or both at the same time.
Parameters:
- zmq – a reference to the zmq module (either from import zmq or import zmq.green as zmq)
- context – the context where the sockets will be created
- serialization_backend – an instance of a valid agentzero.serializers.BaseSerializer. This is completely optional, save for the cases where you use the methods send_safe and recv_safe when communicating with other nodes.
- polling_timeout – a float: how long to wait for the socket to become available, in milliseconds
- timeout – default value passed to engage()
Note
A useful extra feature of SocketManager is that you can use it to create an application that dynamically connects to new nodes based on scaling instructions coming from other nodes.
Warning
Always use the same context within each thread. If you are using gevent, please use a single instance for your whole main process, across all greenlets that you manage.

    import zmq
    from agentzero.core import SocketManager
    from agentzero.serializers import JSON

    context = zmq.Context()
    sockets = SocketManager(zmq, context, serialization_backend=JSON())
    sockets.ensure_and_connect(
        "requester",
        zmq.REQ,
        'tcp://192.168.2.42:5051',
        zmq.POLLIN | zmq.POLLOUT
    )
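The serialization_backend parameter can be pictured with a small stand-in. The sketch below does not subclass agentzero.serializers.BaseSerializer, and the method names pack and unpack are assumptions made for illustration; it only shows the round trip that send_safe and recv_safe rely on.

```python
import json


class CompactJSONSerializer(object):
    """Hypothetical backend; the pack/unpack method names are assumed."""

    def pack(self, item):
        # turn a python object into bytes suitable for a ZeroMQ frame
        text = json.dumps(item, sort_keys=True, separators=(",", ":"))
        return text.encode("utf-8")

    def unpack(self, raw):
        # turn bytes received from a ZeroMQ frame back into a python object
        return json.loads(raw.decode("utf-8"))


serializer = CompactJSONSerializer()
payload = serializer.pack({"job": "video-download", "retries": 3})
restored = serializer.unpack(payload)
```

Whatever the real interface looks like, both ends of a connection need to agree on the same backend.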
create
get_by_name
get_or_create
SocketManager.get_or_create(name, socket_type, polling_mechanism)
ensures that a socket exists and is registered with a given polling_mechanism (POLLIN, POLLOUT or both)
returns the socket itself.
Parameters:
- name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- polling_mechanism – one of (zmq.POLLIN, zmq.POLLOUT, zmq.POLLIN | zmq.POLLOUT)

    import zmq
    from agentzero.core import SocketManager
    from agentzero.serializers import JSON

    context = zmq.Context()
    sockets = SocketManager(zmq, context, serialization_backend=JSON())
    sockets.get_or_create(
        "requester",
        zmq.REQ,
        zmq.POLLIN | zmq.POLLOUT
    )
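The polling_mechanism argument is a bit flag, which is why POLLIN and POLLOUT can be combined with |. The sketch below copies the numeric values of zmq.POLLIN and zmq.POLLOUT into local constants so that it runs without pyzmq; the describe helper is hypothetical and exists only for illustration.

```python
# Values mirror zmq.POLLIN and zmq.POLLOUT so the sketch runs without pyzmq.
POLLIN = 1
POLLOUT = 2


def describe(polling_mechanism):
    """Hypothetical helper: list the states encoded in a polling flag."""
    states = []
    if polling_mechanism & POLLIN:
        states.append("receive")
    if polling_mechanism & POLLOUT:
        states.append("send")
    return states


both = POLLIN | POLLOUT
only_receive = describe(POLLIN)
send_and_receive = describe(both)
```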
register_socket
bind
SocketManager.bind(socket_name, address, polling_mechanism)
binds a socket to an address and automatically registers it with the given polling mechanism.
returns the socket itself.
Parameters:
- socket_name – the socket name
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.create('pipe-in', zmq.PULL)
>>> sockets.bind('pipe-in', 'tcp://*:6000', zmq.POLLIN)
ensure_and_bind
SocketManager.ensure_and_bind(socket_name, socket_type, address, polling_mechanism)
Ensures that a socket exists, that it is bound to the given address and that it is registered with the given polling mechanism.
Tip
This method is a handy replacement for calling get_or_create(), bind() and then engage().
returns the socket itself.
Parameters:
- socket_name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
bind_to_random_port
SocketManager.bind_to_random_port(socket_name, polling_mechanism, local_address=u'tcp://0.0.0.0')
binds the socket to a random port
returns a 2-item tuple with the socket instance and the address string
Parameters:
- socket_name – the socket name
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.create('api-server', zmq.REP)
>>> _, address = sockets.bind_to_random_port(
...     'api-server',
...     zmq.POLLIN | zmq.POLLOUT,
...     local_address='tcp://192.168.10.24'
... )
>>> address
'tcp://192.168.10.24:61432'
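One common way to obtain a random free port is to bind a plain TCP socket to port 0 and let the operating system choose. The sketch below uses only the standard library; find_free_tcp_port is a hypothetical name, and nothing here is taken from AgentZero's own implementation.

```python
import socket


def find_free_tcp_port(host="127.0.0.1"):
    """Hypothetical helper: ask the OS for a currently unused TCP port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, 0))  # port 0 lets the OS pick a free port
        return sock.getsockname()[1]
    finally:
        sock.close()


port = find_free_tcp_port()
address = "tcp://127.0.0.1:{0}".format(port)
```

The port is released before it is returned, so another process could claim it in the meantime; binding the final socket directly avoids that race.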
connect
SocketManager.connect(socket_name, address, polling_mechanism)
connects a socket to an address and automatically registers it with the given polling mechanism.
returns the socket itself.
Parameters:
- socket_name – the socket name
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_connect(
...     'logs',
...     zmq.PUB,
...     'tcp://192.168.10.24:6000',
...     zmq.POLLOUT
... )
>>> sockets.publish_safe('logs', 'output', 'some data')
ensure_and_connect
SocketManager.ensure_and_connect(socket_name, socket_type, address, polling_mechanism)
Ensures that a socket exists, that it is connected to the given address and that it is registered with the given polling mechanism.
Tip
This method is a handy replacement for calling get_or_create(), connect() and then engage().
returns the socket itself.
Parameters:
- socket_name – the socket name
- socket_type – a valid socket type (e.g. zmq.REQ, zmq.PUB, zmq.PAIR, …)
- address – a valid zeromq address (e.g. inproc://whatevs)
- polling_mechanism – zmq.POLLIN, zmq.POLLOUT or zmq.POLLIN | zmq.POLLOUT
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_connect(
...     'logs',
...     zmq.REQ,
...     'tcp://192.168.10.24:7000',
...     zmq.POLLIN | zmq.POLLOUT
... )
engage
SocketManager.engage(timeout=None)
polls all registered sockets with the given timeout in milliseconds
returns a dictionary with the sockets that are ready to be used in their respective state (zmq.POLLIN or zmq.POLLOUT)
Parameters:
- timeout – how long it should poll until a socket becomes available. Defaults to agentzero.core.DEFAULT_POLLING_TIMEOUT
send_safe
SocketManager.send_safe(name, data, *args, **kw)
serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it over through the provided socket name.
returns True if the message was sent, or False if the socket never became available.
Note
you can safely use this function without waiting for a socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
Parameters:
- name – the name of the socket where data should be sent through
- data – the data to be serialized then sent
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
recv_safe
SocketManager.recv_safe(name, *args, **kw)
waits for the socket to become available then receives data through it and deserializes the result using the configured serialization_backend before returning.
Note
you can safely use this function without waiting for a socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
returns the deserialized data, or None if the socket never became available
Parameters:
- name – the name of the socket where data will pass through
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_bind('pipe-in', zmq.PULL, 'tcp://*:6000', zmq.POLLIN)
>>> sockets.recv_safe('pipe-in')
{
    "pipeline": "video-download",
    "instructions": {
        "url": "https://www.youtube.com/watch?v=FPZ6mVsv4EI"
    }
}
recv_event_safe
SocketManager.recv_event_safe(name, topic=False, *args, **kw)
waits for the socket to become available then receives multipart data assuming that it’s a pub/sub event, thus it parses the topic and the serialized data, then it deserializes the result using the configured serialization_backend before returning.
Note
you can safely use this function without waiting for a socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
returns the deserialized data, or None if the socket never became available
Parameters:
- name – the name of the socket where data will pass through
- *args – args to be passed to wait_until_ready
- **kw – kwargs to be passed to wait_until_ready
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
subscribe
SocketManager.subscribe(name, topic=None, keep_polling=None, *args, **kw)
waits for the socket to become available then receives data through it and deserializes the result using the configured serialization_backend before returning.
Note
you can safely use this function without waiting for a socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
returns an agentzero.core.Event, or None if the socket never became available
Tip
pass a function as keep_polling to control when the loop ends
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_bind('logs', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>> for topic, data in sockets.subscribe('logs', 'output'):
...     print('{0} ==> {1}'.format(topic, data))
...
output:0 ==> some data
output:1 ==> more data
...
set_socket_option
SocketManager.set_socket_option(name, option, value)
calls zmq.setsockopt on the given socket.
Parameters:
- name – the name of the socket on which the option will be set
- option – the option from the zmq module
- value – the value to set for the option
Here are some examples of options:
- zmq.HWM: Set high water mark
- zmq.AFFINITY: Set I/O thread affinity
- zmq.IDENTITY: Set socket identity
- zmq.SUBSCRIBE: Establish message filter
- zmq.UNSUBSCRIBE: Remove message filter
- zmq.SNDBUF: Set kernel transmit buffer size
- zmq.RCVBUF: Set kernel receive buffer size
- zmq.LINGER: Set linger period for socket shutdown
- zmq.BACKLOG: Set maximum length of the queue of outstanding connections
- for the full list go to http://api.zeromq.org/4-0:zmq-setsockopt
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.create('pipe-in', zmq.PULL)
>>>
>>> # block after 10 messages are queued
>>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
set_topic
SocketManager.set_topic(name, topic)
shortcut to SocketManager.set_socket_option() with (name, zmq.SUBSCRIBE, topic)
Parameters:
- name – the name of the socket that will subscribe to the topic
- topic – the topic prefix to subscribe to
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
>>>
>>> # subscribe only to topics beginning with "logs"
>>> sockets.set_topic('events', 'logs')
>>> event = sockets.recv_event_safe('events')
>>> event.topic, event.data
('logs:2016-06-20', {'stdout': 'hello world'})
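ZeroMQ subscriptions match by prefix, which is why subscribing to "logs" above also delivers the topic "logs:2016-06-20". The sketch below imitates that rule in pure Python; matches_subscription is a hypothetical helper for illustration, not part of AgentZero or pyzmq.

```python
def matches_subscription(topic, subscriptions):
    """Hypothetical helper: True if any subscribed prefix matches the topic."""
    return any(topic.startswith(prefix) for prefix in subscriptions)


subscriptions = [b"logs"]
delivered = matches_subscription(b"logs:2016-06-20", subscriptions)
dropped = matches_subscription(b"metrics:cpu", subscriptions)

# an empty prefix matches every topic
catch_all = matches_subscription(b"anything", [b""])
```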
publish_safe
SocketManager.publish_safe(name, topic, data)
serializes the data with the configured serialization_backend, waits for the socket to become available, then sends it to the given topic through socket.send_multipart.
returns True if the message was sent, or False if the socket never became available.
Note
you can safely use this function without waiting for a socket to become ready, as it already does it for you.
raises SocketNotFound when the socket name is wrong.
Parameters:
- name – the name of the socket where data should be sent through
- topic – the name of the topic
- data – the data to be serialized then sent
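A pub/sub event travelling as a multipart message can be pictured as two frames: the topic, then the serialized data. The sketch below assumes JSON as the serialization and uses a namedtuple as a stand-in for the Event type; pack_event and unpack_event are hypothetical names, not AgentZero functions.

```python
import json
from collections import namedtuple

# stand-in for the boxed Event type described in this documentation
Event = namedtuple("Event", ["topic", "data"])


def pack_event(topic, data):
    """Build the two frames a publisher would pass to send_multipart."""
    return [topic.encode("utf-8"), json.dumps(data).encode("utf-8")]


def unpack_event(frames):
    """Rebuild an Event from the two frames a subscriber receives."""
    topic, raw = frames
    return Event(topic.decode("utf-8"), json.loads(raw.decode("utf-8")))


frames = pack_event("logs:2016-06-20", {"stdout": "hello world"})
event = unpack_event(frames)
```

Keeping the topic in its own leading frame is what lets subscribers filter by prefix without deserializing the payload.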
ready
SocketManager.ready(name, polling_mechanism, timeout=None)
Polls all sockets and checks if the socket with the given name is ready for either zmq.POLLIN or zmq.POLLOUT.
returns the socket if available, or None
Parameters:
- name – the socket name
- polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
- timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll() (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)
wait_until_ready
SocketManager.wait_until_ready(name, polling_mechanism, timeout=None, polling_timeout=None)
Briefly waits until the socket is ready to be used, yielding to other greenlets until the socket becomes available.
returns the socket if available within the given timeout, or None
Parameters:
- name – the socket name
- polling_mechanism – either zmq.POLLIN or zmq.POLLOUT
- timeout – the timeout in seconds (accepts float) in which it should wait for the socket to become available (optional, defaults to core.DEFAULT_TIMEOUT_IN_SECONDS)
- polling_timeout – the polling timeout in milliseconds that will be passed to zmq.Poller().poll() (optional, defaults to core.DEFAULT_POLLING_TIMEOUT)
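The two timeouts work at different scales: timeout bounds the whole wait in seconds, while polling_timeout bounds each individual poll in milliseconds. The loop below is a rough sketch of that relationship under the assumption that polling is simply repeated until the deadline; wait_until and the poll_once callables are hypothetical, not AgentZero code.

```python
import time


def wait_until(poll_once, timeout=1.0, polling_timeout=10):
    """Call poll_once(polling_timeout) until it returns a value or time runs out.

    timeout is in seconds, polling_timeout in milliseconds.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = poll_once(polling_timeout)
        if result is not None:
            return result
    return None


attempts = []


def ready_on_third_try(polling_timeout):
    # pretends the socket becomes ready on the third poll
    attempts.append(polling_timeout)
    return "socket" if len(attempts) >= 3 else None


def never_ready(polling_timeout):
    # pretends every poll times out without the socket becoming ready
    time.sleep(polling_timeout / 1000.0)
    return None


found = wait_until(ready_on_third_try, timeout=1.0, polling_timeout=10)
missing = wait_until(never_ready, timeout=0.05, polling_timeout=10)
```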
get_log_handler
SocketManager.get_log_handler(socket_name, topic_name=u'logs')
returns an instance of ZMQPubHandler attached to a previously-created socket.
Parameters:
- socket_name – the name of the socket, previously created with SocketManager.create()
- topic_name – the name of the topic in which the logs will be PUBlished
Example:

    import logging
    import zmq.green as zmq
    from agentzero.core import SocketManager

    context = zmq.Context()
    sockets = SocketManager(zmq, context)
    handler = sockets.get_log_handler('logs', topic_name='app_logs')
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.info("Server is up!")
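A log handler that publishes records can be sketched with the standard logging module alone. The class below is not ZMQPubHandler: it is a hypothetical stand-in that forwards each record to a publish callable, where a real handler would send over a PUB socket.

```python
import logging


class PublishingHandler(logging.Handler):
    """Hypothetical handler: forwards each record to publish(topic, data)."""

    def __init__(self, publish, topic_name="logs"):
        super().__init__()
        self.publish = publish
        self.topic_name = topic_name

    def emit(self, record):
        data = {"level": record.levelname, "msg": self.format(record)}
        self.publish(self.topic_name, data)


published = []  # stands in for the PUB socket
handler = PublishingHandler(
    lambda topic, data: published.append((topic, data)),
    topic_name="app_logs",
)

logger = logging.getLogger("agentzero-sketch")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the sketch's output away from the root logger
logger.addHandler(handler)
logger.info("Server is up!")
```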
get_logger
SocketManager.get_logger(socket_name, topic_name=u'logs', logger_name=None)
returns an instance of Logger with a ZMQPubHandler attached to it.
Parameters:
- socket_name – the name of the socket, previously created with create()
- topic_name – (optional) the name of the topic in which the logs will be PUBlished, defaults to “logs”
- logger_name – (optional) defaults to the given socket name
Example:

    import logging
    import zmq.green as zmq
    from agentzero.core import SocketManager

    context = zmq.Context()
    sockets = SocketManager(zmq, context)
    logger = sockets.get_logger('logs', topic_name='logs', logger_name=__name__)
    logger.info("Server is up!")
close
SocketManager.close(socket_name)
closes a socket if it exists
Parameters:
- socket_name – the socket name
Example:
>>> import zmq
>>> from agentzero.core import SocketManager
>>>
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.create('logs', zmq.SUB)
>>> sockets.bind('logs', 'tcp://*:6000', zmq.POLLIN)
>>> sockets.close('logs')
Utility Functions
get_free_tcp_port
extract_hostname_from_tcp_address
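The body of this entry is not included here. As a rough idea of what extracting a hostname from a TCP address involves, the sketch below uses the standard library's urlparse; hostname_from_tcp_address is a hypothetical name, and its behavior (including the ValueError) is an assumption rather than AgentZero's documented API.

```python
from urllib.parse import urlparse


def hostname_from_tcp_address(address):
    """Hypothetical helper: return the host part of a tcp:// address."""
    parsed = urlparse(address)
    if parsed.scheme != "tcp":
        raise ValueError("not a tcp address: {0!r}".format(address))
    return parsed.hostname


host = hostname_from_tcp_address("tcp://example.local:5051")
ip = hostname_from_tcp_address("tcp://192.168.10.24:6000")
```

A hostname obtained this way could then be resolved with socket.gethostbyname before binding.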
Exceptions
AgentZeroSocketError
SocketAlreadyExists
exception agentzero.errors.SocketAlreadyExists(manager, socket_name)
raised by SocketManager when trying to create a named socket that already exists
>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.create('foo', zmq.REP)
>>> sockets.create('foo', zmq.REP)
Traceback (most recent call last):
...
SocketAlreadyExists: SocketManager(sockets=['foo']) already has a socket named 'foo'.
SocketNotFound
exception agentzero.errors.SocketNotFound(manager, socket_name)
raised by SocketManager when trying to retrieve a nonexistent socket
>>> from agentzero.core import zmq
>>> from agentzero.core import SocketManager
>>> sockets = SocketManager(zmq, zmq.Context())
>>> sockets.get_by_name('some-name', zmq.PUB)
Traceback (most recent call last):
...
SocketNotFound: SocketManager(sockets=['']) has no sockets named 'some-name'.
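The relationship between creating, retrieving and "get or create" can be sketched with a plain dictionary. Everything below is a stand-in written for illustration: the exception classes only share their names with the AgentZero ones, and NamedRegistry is hypothetical.

```python
class SocketAlreadyExists(Exception):
    """Stand-in for agentzero.errors.SocketAlreadyExists."""


class SocketNotFound(Exception):
    """Stand-in for agentzero.errors.SocketNotFound."""


class NamedRegistry(object):
    """Hypothetical registry that keeps one object per name."""

    def __init__(self):
        self.items = {}

    def create(self, name, value):
        if name in self.items:
            raise SocketAlreadyExists(name)
        self.items[name] = value
        return value

    def get_by_name(self, name):
        if name not in self.items:
            raise SocketNotFound(name)
        return self.items[name]

    def get_or_create(self, name, value):
        # reuse the existing entry, otherwise register the new one
        try:
            return self.get_by_name(name)
        except SocketNotFound:
            return self.create(name, value)


registry = NamedRegistry()
first = registry.get_or_create("foo", "socket-1")
second = registry.get_or_create("foo", "socket-2")  # returns the existing entry
```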
SocketBindError
Contributor Code of Conduct
As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.
We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.
Examples of unacceptable behavior by participants include:
- The use of sexualized language or imagery
- Personal attacks
- Trolling or insulting/derogatory comments
- Public or private harassment
- Publishing others’ private information, such as physical or electronic addresses, without explicit permission
- Other unethical or unprofessional conduct.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.
This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.
This Code of Conduct is adapted from the Contributor Covenant, version 1.2.0 available at http://contributor-covenant.org/version/1/2/0/.
Building a distributed worker pipeline
Let’s build a worker pipeline made of Steps that each execute a specific job type and can be scaled individually.
Here is an overview of the socket architecture:
Coding the pipeline entity
The main pipeline contains the following sockets:
- a SUB socket, which it will bind() at the given bind_address to subscribe to Step events
- a REP socket, which it will bind() at the given reply_address to respond to pipeline execution requests, replying with a job id for later status querying
    import zmq.green as zmq
    from agentzero.core import SocketManager


    class Pipeline(object):
        steps = []

        def __init__(self):
            self.context = zmq.Context()
            self.sockets = SocketManager(zmq, self.context)
            self.sockets.create("pipe-sub", zmq.SUB)
            self.sockets.create("pipe-in", zmq.PULL)
            self.sockets.create("pipe-out", zmq.PUSH)
            self.children = []
Coding the step entity
A Step contains a PUB socket where it will send the following events:
- announce its JobType as well as its PUSH/PULL address pair
- announce failed jobs, so that they can be auto-recovered later
- announce succeeded jobs
- announce exceptions and auto-schedule a later retry
- live metrics
- live logging output
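The first event in the list above could be shaped roughly as follows. The topic naming scheme, the payload keys and the build_announcement helper are all assumptions invented for this sketch; the document does not specify the wire format a Step uses.

```python
import json


def build_announcement(job_type, pull_address, push_address):
    """Hypothetical helper: build the topic and body announcing a Step."""
    topic = "step:announce:{0}".format(job_type)  # assumed topic scheme
    payload = {
        "job_type": job_type,
        "address": {"pull": pull_address, "push": push_address},
    }
    return topic, json.dumps(payload, sort_keys=True)


topic, body = build_announcement(
    "video-download",
    "tcp://10.0.0.5:6001",
    "tcp://10.0.0.5:6002",
)
announcement = json.loads(body)
```

Prefixing every Step topic with a shared string would let the pipeline's SUB socket receive all of them through a single subscription.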