osBrain - 0.6.5

osBrain logo

What is osBrain?

osBrain is a general-purpose multi-agent system module written in Python and developed by OpenSistemas. Agents run independently as system processes and communicate with each other using message passing.

osBrain uses ØMQ for efficient and flexible message passing between agents. It also uses Pyro4 to ease the configuration and deployment of complex systems.

Please read the Software License and Disclaimer.

Contents

About osBrain

osBrain logo

Feature overview

osBrain is a general-purpose multi-agent system module written in Python.

  • Agents run independently as system processes and communicate with each other using message passing.
  • Message passing is implemented using ØMQ, and in particular, the PyZMQ Python bindings.
  • ØMQ allows for efficient, asynchronous communication using different commonly used communication patterns such as request-reply, push-pull and publish-subscribe.
  • osBrain integrates Pyro4 to ease the configuration and deployment of complex systems.
  • Thanks to Pyro4, remote agents can be treated as local objects and reconfigured even when they are running. Not just variables, but also new methods can be created in the remote agents.
  • osBrain provides the base for implementing robust, highly-available, flexible multi-agent systems.
  • Being implemented in Python, osBrain can take advantage of a huge set of packages for data analysis, statistics, numerical computing, etc. available in the Python ecosystem.

In order to fully understand osBrain capabilities, it is highly recommended to read the Pyro4 documentation and the ØMQ guide.

OsBrain’s history

osBrain was initially developed in OpenSistemas based on the need to create a real-time automated-trading platform. This platform needed to be able to process real-time market data updates fast and in a parallel way. Robustness was very important as well in order to prevent running trading strategies from being affected by a failure in another strategy.

Python was chosen for being a great language for fast prototyping and for having a huge data analysis ecosystem available. It was kept for its final performance and the beautiful source code created with it.

The appearance of osBrain was a consequence of a series of steps that were taken during the development process:

  1. Isolation of agents; creating separate system processes to avoid shared memory and any problems derived from multi-threading development.
  2. Implementation of message passing; making use of the modern, efficient and flexible ØMQ library.
  3. Ease of configuration/deployment; making use of the very convenient, well implemented and documented Pyro4 package.
  4. Separation from the trading platform; what started as a basic architecture for implementing a real-time automated-trading platform, ended-up being a general-purpose multi-agent system architecture.

What can you use osBrain for?

osBrain has been successfully used to develop a real-time automated-trading platform in OpenSistemas, but being a general-purpose multi-agent system, it is not limited to this application. Other applications include:

  • Transportation.
  • Logistics.
  • Defense and military applications.
  • Networking.
  • Load balancing.
  • Self-healing networks.

In general, osBrain can be used whenever a multi-agent system architecture fits the application well:

  • Autonomy of the agents.
  • Local views.
  • Decentralization.

Performance

The performance of osBrain, just as the performance of any other system architecture, depends a lot on the actual application. The developer should always take this into account:

  1. Pyro4 is used only for configuration, deployment, updating and debugging, which means that the actual performance of the system should not depend on this package.
  2. ØMQ is used with the PyZMQ Python bindings, which means that the system performance depends on the PyZMQ performance.
  3. osBrain uses pickle for serialization by default, which means that the system performance may as well depend on this package. Serialization is configurable, though.
  4. osBrain default transport is IPC for operating systems that provide UNIX domain sockets, and TCP for the rest. It can be changed globally or configured specifically for each bind. Note, however, that when using TCP, the network may have a great impact on performance.

Introduction and Example

Installation

This tutorial is a step-by-step introduction to osBrain with examples. In order to start playing with this module, you only need to install it.

osBrain requires Python 3. Most probably, Python 3 is already packaged for your favorite distribution (and maybe even installed by default in your system). If you do not have Python 3 available, consider using Conda to create a virtual environment with Python 3.

Installing osBrain is very simple with pip:

pip install osbrain

You should now be able to import osbrain from a python console:

>>> import osbrain

Hello world

The first example is, of course, a simple hello world! program. Three steps are taken here:

  1. Run a name server.
  2. Run an agent with an alias Example.
  3. Log a Hello world message from the agent.
from osbrain import run_agent
from osbrain import run_nameserver

if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    agent = run_agent('Example')

    # Log a message
    agent.log_info('Hello world!')

    ns.shutdown()

Running this example from your terminal should simply show you a log message saying Hello world! but, what exactly is happening there?

Note

The ns.shutdown() calls in the examples are there because we do not want examples to run indefinitely. For more information on that method, refer to the Shutting down section.

Agents and proxies

An agent, in osBrain, is an entity that runs independently from other agents in the system. When running, it executes the main loop:

  • Poll for incoming messages.
  • Process incoming messages and execute developer-defined code.
  • Repeat.

This means a single agent, as in the Hello world! example, makes little or no sense. Agents in a multi-agent system start to make sense when connected to each other.

The easiest way to run an agent in an osBrain architecture is by calling the function osbrain.run_agent:

>>> agent = run_agent(...)

This function will spawn a new agent and will return a osbrain.Proxy to it.

Proxies are simply local objects that allow us to easily have access to the remote agent. The fact that agents are run independently from each other justifies the need of a proxy.

A proxy allows us to call methods or access attributes of the remote agent in a very convenient way. See for example the previous call:

>>> agent.log_info('Hello world')

The method log_info() is implemented in osbrain.Agent so, when this method is called from the proxy, this call is actually being serialized to the remote running agent and gets executed there. The return value, if any, is then serialized back and returned by the proxy. So basically so get the impression of being working with a local object while your code is executed remotely.

The name server

A name server is just like any other agent, so it runs independently, but with a very specific role. Name servers are used as an address book. This means other agents can be run in the system and can be registered in the name server using a human-readable alias. Aliases help us accessing these agents easily even from remote locations.

Note that when calling the osbrain.run_agent() function, we are passing a string parameter. This parameter is the alias the agent will use to register itself in the name server.

When we run a name server calling the osbrain.run_nameserver(), we also get in return a proxy to this name server:

>>> ns = run_nameserver()

This proxy can be used to list the agents registered in the name server:

from osbrain import run_agent
from osbrain import run_nameserver

if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    run_agent('Agent0')
    run_agent('Agent1')
    run_agent('Agent2')

    # Show agents registered in the name server
    for alias in ns.agents():
        print(alias)

    ns.shutdown()

The code above should simply print the aliases of all the agents registered in the name server.

A name server proxy can also be used to create proxies to registered agents. This is specially useful when accessing the multi-agent system from a different console or location, as it will reduce the number of addresses that we need to remember.

from osbrain import run_agent
from osbrain import run_nameserver

if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    run_agent('Agent0')
    run_agent('Agent1')
    run_agent('Agent2')

    # Create a proxy to Agent1 and log a message
    agent = ns.proxy('Agent1')
    agent.log_info('Hello world!')

    ns.shutdown()

The code above creates (and registers) three different agents in a name server and then creates, through the name server proxy, a proxy to one of those agents simply using its alias. Then it uses the agent proxy to remotely call a method to log a Hello world! message.

Basic communication patterns

Push-Pull

Example

Now that we understand the basics of how proxies, agents and name servers work, let us jump into a more interesting example.

As mentioned before, a multi-agent system only makes sense if agents are connected with each other and share some information using message passing.

In this first example, we will create two agents: Alice and Bob, and we will make alice send messages to Bob using a simple push-pull communication pattern.

import time

from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    # System configuration
    addr = alice.bind('PUSH', alias='main')
    bob.connect(addr, handler=log_message)

    # Send messages
    for _ in range(3):
        time.sleep(1)
        alice.send('main', 'Hello, Bob!')

    ns.shutdown()

So, in this case, we are doing some more stuff. After we spawn Alice and Bob, we connect them.

First, we make Alice bind:

addr = alice.bind(‘PUSH’, alias=’main’)

There are three things to remark in that line:

  1. The first parameter 'PUSH' represents the communication pattern we want to use. In this case we are using a simple push-pull (unidirectional) pattern to allow Alice to send messages to Bob.
  2. The second parameter is, once again, an alias. We can use this alias to refer to this communication channel in an easier way.
  3. The binding, as you already guessed, takes place in the remote agent, but it actually returns a value, which is the address the agent binded to. This address is serialized back to us so we can use it to connect other agents to it.

The next interesting line of code is the one in which Bob connects to Alice:

bob.connect(addr, handler=log_message)

There are two things to remark in here:

  1. Calling connect() from an agent requires, first, an address. This address is, in this case, the one we got after binding Alice. This method will automatically select the appropriate communication pattern to connect to this pattern ('PULL' in this case).
  2. Bob will be receiving messages from Alice, so we must set a handler function that will be executed when a message from Alice is received. This handler will be serialized and stored in the remote agent to be executed there when needed.

The handler function, in its most basic form, accepts two parameters:

def handler(agent, message):
    ...
  1. The actual agent (can be named self as well, in an OOP way).
  2. The message that is received.

In the example above, the handler simply logs the message received.

List of handlers

When using push-pull communication patterns we are allowed to set multiple handlers using a list. In example:

agent.connect('PULL', handler=[handler1, handler2, handler3])

Note that in this case all handlers will be executed in sequence.

Request-Reply

Another common communication pattern is the request-reply, in which a requester sends a message to the replier and always expects a reply. It is sometimes useful, specially when some kind of synchronization is required.

Example
from osbrain import run_agent
from osbrain import run_nameserver


def reply(agent, message):
    return 'Received ' + str(message)


if __name__ == '__main__':

    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('REP', alias='main', handler=reply)
    bob.connect(addr, alias='main')

    for i in range(10):
        bob.send('main', i)
        reply = bob.recv('main')
        print(reply)

    ns.shutdown()

The main difference with respect to the push-pull pattern is that, in this case, Bob must run the recv method in order to get the reply back from Alice.

Note

Although the requester is not required to immediately await for the reply (i.e.: can do other stuff after sending the request and before receiving the response), it is required to receive a reply back before making another request through the same communication channel. Multiple requests can be made from the same agent as long as it uses different communication channels for each request.

Return versus yield

The easiest way to reply to a request is to return a value from the handler, as seen in Request-Reply:

def reply(agent, message):
    return 'Received ' + str(message)

However, using return the agent can only send a response after executing the handler. Instead, an agent can use yield to reply earlier if needed:

def reply(agent, message):
    yield 'Received' + str(message)  # Reply now
    agent.log_info('Already sent a reply back!')   # Do some stuff later

Publish-Subscribe

One of the most useful communication patterns between agents is the publish and subscribe pattern. The publisher will send messages to all subscribed agents.

Example

Here is an example in which Alice is the publisher and Bob and Eve subscribe to Alice. This way, when Alice sends a message, both Bob and Eve will receive it:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')
    eve = run_agent('Eve')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    bob.connect(addr, handler=log_message)
    eve.connect(addr, handler=log_message)

    # Send messages
    for _ in range(3):
        time.sleep(1)
        alice.send('main', 'Hello, all!')

    ns.shutdown()

Note the similarities between this example and the Sender-Receiver example. The only differences are that Alice is now binding using the 'PUB' pattern and that, instead of having just Bob connecting to Alice, we now have Eve as well connecting to Alice.

This communication pattern allows for easy filtering. Refer to the Filtering section in the tutorial for more details.

Filtering

The publish-subscribe pattern is very useful, but it is also very powerful when combined with filtering.

Any time we publish a message from an agent, a topic can be specified. If a topic is specified, then only the agents that are subscribed to that topic will receive the message. This filtering is done in the publisher side, meaning that the network does not suffer from excessive message passing.

In the following example we have Alice publishing messages using topic a or b at random. Then we have Bob subscribed to both topics, Eve subscribed to topic a only and Dave subscribed to topic b only.

import random
import time

from osbrain import run_agent
from osbrain import run_nameserver


def log_a(agent, message):
    agent.log_info('Log a: %s' % message)


def log_b(agent, message):
    agent.log_info('Log b: %s' % message)


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')
    eve = run_agent('Eve')
    dave = run_agent('Dave')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    bob.connect(addr, handler={'a': log_a, 'b': log_b})
    eve.connect(addr, handler={'a': log_a})
    dave.connect(addr, handler={'b': log_b})

    # Send messages
    for _ in range(6):
        time.sleep(1)
        topic = random.choice(['a', 'b'])
        message = 'Hello, %s!' % topic
        alice.send('main', message, topic=topic)

    ns.shutdown()

Note how we can specify different handlers for different topics when subscribing agents.

More on filtering

We can also easily modify the subscriptions at run-time.

In the following example, Alice will be publishing messages using topics a and b at the same time. Meanwhile, Bob will first subscribe to topic a. After a few seconds, he will subscribe to topic b while unsubscribing from topic a.

import time

from osbrain import run_agent
from osbrain import run_nameserver


def log_a(agent, message):
    agent.log_info('Log a: %s' % message)


def log_b(agent, message):
    agent.log_info('Log b: %s' % message)


def send_messages(agent):
    agent.send('main', 'Apple', topic='a')
    agent.send('main', 'Banana', topic='b')


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    # System configuration
    addr = alice.bind('PUB', alias='main')
    alice.each(0.5, send_messages)
    bob.connect(addr, alias='listener', handler={'a': log_a})

    time.sleep(2)

    bob.unsubscribe('listener', 'a')
    bob.subscribe('listener', handler={'b': log_b})

    time.sleep(2)

    ns.shutdown()

Note

Syntax regarding handlers in the subscribe() and unsubscribe() methods is the same that the one used when specifying the handlers on the connect()/bind() call of the SUB socket.

Warning

Calls to the subscribe() method will always override the previous handler for each of the specified topics, if any.

Considerations

Clients versus servers

When using Basic communication patterns we have a lot of flexibility:

  • We are allowed to connect multiple clients to a server.
  • The server can play any role (i.e.: does not need to be always REP, but can be REQ as well). Servers are only defined by the action of binding, not by the role they play in the communication pattern.

For example, if we bind using PUSH and we connect multiple clients to this server, then messages pushed will be distributed among the clients in a Round-robin fashion, which means the first message will be received by the first client, the second message will be received by the second client, and so on.

If we bind using PULL and we connect multiple clients to this server, then messages pushed will all be received by the single server, as expected.

For more information simply refer to the ØMQ guide.

Closing connections

For closing a specific connection from an agent we need to call the close() method, which takes the alias of the socket from the connection we want to close as a parameter.

agent.bind('PUB', alias='connection')
...
agent.close('connection')

There is also a close_all() method, which takes no parameters and will close all user-defined connections of the agent.

Remember that the linger value from the osBrain configuration will be used for the actual socket.close() calls in both methods. For more information, simply refer to the ØMQ guide.

Note

Closing a connection within an agent will have no effect on any possible agents at the other end of the connection. Remember to manually close them as well if the connection is not going to be reused.

Adding new methods

Note that proxies can not only be used to execute methods remotely in the agent, but they can also be used to add new methods or change already existing methods in the remote agent.

In the following example you can see how we can create a couple of functions that are then added to the remote agent as new methods.

In order to add new methods (or change current methods) we only need to call set_method() from the proxy.

from osbrain import run_agent
from osbrain import run_nameserver


def set_x(self, value):
    self.x = value


def set_y(self, value):
    self.y = value


def add_xy(self):
    return self.x + self.y


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    agent = run_agent('Example')

    # System configuration
    agent.set_method(set_x, set_y, add=add_xy)

    # Trying the new methods
    agent.set_x(1)
    agent.set_y(2)
    print(agent.add())

    ns.shutdown()

Note that set_method() accepts any number of parameters:

  • In case they are not named parameters, the function names will be used as the method names in the remote agent.
  • In case they are named parameters, then the method in the remote agent will be named after the parameter name.

Lambdas

osBrain uses cloudpickle when communicating with remote agents through a proxy. This means that almost anything can be serialized to an agent using a proxy.

In order to further simplify some tasks, lambda functions can be used to configure remote agents:

from osbrain import run_agent
from osbrain import run_nameserver

if __name__ == '__main__':

    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('REP', handler=lambda agent, msg: 'Received ' + str(msg))
    bob.connect(addr, alias='main')

    for i in range(10):
        bob.send('main', i)
        reply = bob.recv('main')
        print(reply)

    ns.shutdown()

See the similarities between this example and the one showed in Request-Reply. In fact, the only difference is the binding from Alice, in which we are using a lambda function for the handler.

Shutting down

If we want to end the execution of a specific agent in our system, we can do it by calling the Agent.shutdown() method:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def tick(agent):
    agent.log_info('tick')


if __name__ == '__main__':

    ns = run_nameserver()
    a0 = run_agent('Agent0')
    a1 = run_agent('Agent1')

    a0.each(1, tick)
    a1.each(1, tick)
    time.sleep(3)

    a0.shutdown()
    time.sleep(3)

    ns.shutdown()

Shutting down a name server will result in all agents registered in that name server being shut down as well. This allows us to easily shutdown groups of agents at the same time.

Note

We can establish connections between agents registered in different name servers.

OOP

Although the approach of using proxies for the whole configuration process is valid, sometimes the developer may prefer to use OOP to define the behavior of an agent.

This, of course, can be done with osBrain:

import time

from osbrain import Agent
from osbrain import run_agent
from osbrain import run_nameserver


class Greeter(Agent):
    def on_init(self):
        self.bind('PUSH', alias='main')

    def hello(self, name):
        self.send('main', 'Hello, %s!' % name)


class Bob(Agent):
    def custom_log(self, message):
        self.log_info('Received: %s' % message)


if __name__ == '__main__':

    # System deployment
    ns = run_nameserver()
    alice = run_agent('Alice', base=Greeter)
    bob = run_agent('Bob', base=Bob)

    # System configuration
    bob.connect(alice.addr('main'), handler='custom_log')

    # Send messages
    for _ in range(3):
        alice.hello('Bob')
        time.sleep(1)

    ns.shutdown()

Most of the code is similar to the one presented in the Push-Pull example, however you may notice some differences:

  1. When running Alice, a new parameter base is passed to the osbrain.run_agent() function. This means that, instead of running the default agent class, the user-defined agent class will be used instead. In this case, this class is named Greeter.
  2. The Greeter class implements two methods:
    1. on_init(): which is executed on initialization and will, in this case, simply bind a 'PUSH' communication channel.
    2. hello(): which simply logs a Hello message when it is executed.
  3. When connecting Bob to Alice, we need the address where Alice binded to. As the binding was executed on initialization, we need to use the addr() method, which will return the address associated to the alias passed as parameter (in the example above it is main).
  4. When setting a handler that is a method already defined in the agent we simply pass a string with the method name.

Setting initial attributes

Many times, after spawning an agent, we want to set some attributes, which may be used to configure the agent before it starts working with the rest of the multi-agent system:

a0 = run_agent('foo')
a0.set_attr(x=1, y=2)

It is such a common task that a parameter attributes can be used when running the agent for exactly that:

a0 = run_agent('foo', attributes=dict(x=1, y=2))

As you can see, this parameter accepts a dictionary in which the keys are the name of the attributes to be set in the agent and the values are the actual values that this attributes will take.

Note

If you find yourself setting a lot of attributes through a proxy then you might use OOP instead (set attributes on initialization or create a method for that purpose).

Creating proxies to existing name servers

Many times, specially if we are not working with distributed systems, we want to spawn a single name server and run all the agents from a single script. If that is the case, simply by executing the run_nameserver function we would obtain a proxy to the name server.

Sometimes, however, we may need to access name servers that are already running and of which we do not have a proxy available. To do so, we definitely need to know the address of the name server, so make sure you spawn it with a well-known address or save it somewhere to read it later.

You can create a proxy to an already-running name server using the NSProxy class:

from osbrain import NSProxy


ns = NSProxy(nsaddr='127.0.0.1:1234')

Note how we need to specify the name server address.

Note

This might be useful for attaching yourself to an already-running system for manual configuration/update, debugging… If you are just planning to launch and configure your architecture from multiple scripts, then think it twice, as normally you would not need to do so.

Timers

Repeated actions

Timers can be used to repeat an action after a period of time. To illustrate this, let us modify the Push-Pull example a bit and make use of the .each() method:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def log_message(agent, message):
    agent.log_info('Received: %s' % message)


def annoy(agent, say, more=None):
    message = say if not more else say + ' ' + more + '!'
    agent.send('annoy', message)


if __name__ == '__main__':

    ns = run_nameserver()
    orange = run_agent('Orange')
    apple = run_agent('Apple')
    addr = orange.bind('PUSH', alias='annoy')
    apple.connect(addr, handler=log_message)

    # Multiple timers with parameters
    orange.each(1.0, annoy, 'Hey')
    orange.each(1.4142, annoy, 'Apple')
    orange.each(3.1415, annoy, 'Hey', more='Apple')

    time.sleep(10)

    ns.shutdown()

Note that if an action takes longer to run than the time available before the next execution, the timer will simply fall behind.

Note

To be able to start timers on agent initialization, the agent must be already running, which means you cannot do it from .on_init(). Instead, you can use .before_loop(), which will be called inside the .run() method and before starting the main loop.

Delayed actions

Timers can be used to execute an action after a defined time delay using the .after() method:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def delayed(agent):
    agent.log_info('Logged later')


if __name__ == '__main__':

    ns = run_nameserver()
    agent = run_agent('a0')

    agent.after(2, delayed)
    agent.log_info('Logged now')

    time.sleep(2.5)

    ns.shutdown()

Note that if an action takes longer to run than the time available before the next execution, the timer will simply fall behind.

Stopping timers

When executing the .each() or .after() methods, a timer is created and an identifier is returned that can be used later to refer to that timer (i.e.: for stopping it).

In the following example we can see how the returned identifier and an alias parameter can be used to identify the timer and stop it calling the .stop_timer() method:

import time

from osbrain import run_agent
from osbrain import run_nameserver


def delayed(agent, message):
    agent.log_info(message)


if __name__ == '__main__':

    ns = run_nameserver()
    agent = run_agent('a0')

    agent.after(1, delayed, 'Hello!')
    # Timer ID returned
    timer0 = agent.after(1, delayed, 'Never logged')
    # Timer alias set
    agent.after(1, delayed, 'Never logged either', alias='timer_alias')

    # Stop timers by ID and alias
    agent.stop_timer(timer0)
    agent.stop_timer('timer_alias')

    time.sleep(2)

    ns.shutdown()

You can call the .stop_all_timers() method if you would rather simply stop all started timers.

If you want to list the timers that are currently running, you can call the .list_timers() method.

Transport protocol

Available transports

Although the default transport protocol is IPC for operating systems that provide UNIX domain sockets and TCP for the rest, there are other transport protocols that can be used in osBrain:

  • tcp: common TCP. Can always be used, and must be used to communicate agents running in different machines.
  • ipc: common IPC. It is the default and the best suited for communication between agents that run on the same machine
  • inproc: for in-process communication (between threads). The fastest protocol, although it is limited to communication between threads that share the same Agent.context

The transport protocol can be changed on a per-socket basis, per-agent basis and also globally.

Changing the transport

It is possible to change the default global transport protocol by setting the osbrain.config['TRANSPORT'] configuration variable. So, for example, to set TCP as the default transport, we could set:

osbrain.config['TRANSPORT'] = 'tcp'

We can also set the default transport that a particular agent should use by default by passing the transport parameter to run_agent:

agent = run_agent('a0', transport='tcp')

If we do not want to change the global default nor any agent’s default, then we can still change the transport protocol when binding, passing the transport parameter again:

agent = run_agent('a0')
agent.bind('PULL', transport='inproc')

Note

It is also possible to change the global default transport protocol setting the OSBRAIN_DEFAULT_TRANSPORT environment variable.

Serialization

Introduction

osBrain uses by default the pickle module for serialization when passing messages between agents. Serialization is, however, configurable. osBrain can also use dill, cloudpickle, json and raw. Where raw means, actually, no serialization (i.e.: the user is expected to send only raw bytes over those sockets).

Warning

Using some serializers such as pickle, dill or cloudpickle can be security risk. Those serializers allow arbitrary code execution while deserializing data and therefore may wreck or compromise your system. Only use those when communications between agents are secured (i.e.: encrypted or in a local area network).

Note

Note that different serializers might have different limitations. For example, 'json' does not support serializing an object of bytes type, while 'pickle' does support it.

Defining the serializer

Specifying the serializer only makes sense in server sockets, since clients will automatically detect and set the type they need in order to communicate accordingly with the server.

There are three ways in which the serializer can be specified:

  • Global configuration.
  • Specifying it at per-agent configuration.
  • Specifying it at per-socket configuration.

Global configuration

By setting the osbrain.config['SERIALIZER'] configuration variable, we can change the default serializer. For example:

osbrain.config['SERIALIZER'] = 'json'

Note

It is also possible to change the global default serializer by setting the OSBRAIN_DEFAULT_SERIALIZER environment variable.

Per-agent configuration

Specifying the serializer at per-agent level will override the global configuration. This can be done as follows:

a1 = run_agent('a1', serializer='json')

Per-socket configuration

Finally, we can specify the serializer at per-socket level. This will override any other configuration (global/per-agent). For example:

a1 = run_agent('a1', serializer='json')
# Raw serialization will override json for this socket
addr1 = a1.bind('PUB', 'alias1', serializer='raw')

PUBSUB messaging pattern

For the PUBSUB pattern, there is a special character (b'\x80' as of now, even though it could change at any time) that we use so as to let the agents know what is the topic and what is the message itself. Note that the special separator character is only required if there is a topic and the serialization option is NOT set to raw (read below for more information).

Considerations when using raw serialization and PUBSUB pattern

Special care must be taken when working with raw serialization and the PUBSUB messaging pattern. Under those conditions, we decided to replicate the raw ZeroMQ PUBSUB communication, in which the topic is sent along with the message and is the handler the one that must take care of separating the topic from the message it self.

Note that if we are using other type of serialization, it is safe to assume that what we are receiving only the original message, without any traces of the topic.

Advanced proxy handling

Note

Before reading this section make sure you have already understood Agents and proxies.

Understanding proxies better

When an agent is run, there is one thread executing the method Agent.run, which simply listens forever for incoming messages to process them. At the same time, the Agent object is served so it can be accessed through proxies.

In order to prevent concurrent access to the agent, proxy calls are by default serialized and sent to the main thread to be executed there. This way we avoid any concurrent access to memory (only the main thread will make changes to the agent).

However, the user has control and can change that default behavior.

Warning

The user can decide to change the default behavior and make unsafe calls. However, they should be aware of the possible risks they may bring. If not carefully selected, unsafe calls can lead to undefined behavior and hard-to-debug problems.

Executing unsafe calls

You can change the default behavior and make unsafe calls, which means the method call will be executed on another thread separate from the main thread. In order to do so, the Proxy provides us with access to the safe and unsafe attributes, which allows us to force safe or unsafe calls for single remote method calls:

agent = run_agent('test')

agent.unsafe.method_call()  # This forces an unsafe method_call()
agent.safe.method_call()  # This forces a safe method_call()

agent.method_call()  # This is a default (safe) method_call() again

The Proxy behavior can also be changed on a per-proxy basis, either when executing the run_agent function:

agent = run_agent('test', safe=False)

agent.method_call()  # This is now by default an unsafe method_call()

Or when creating a new Proxy to the agent:

run_agent('a0')
agent = Proxy('a0', safe=False)

agent.method_call()  # This is now by default an unsafe method_call()

Note

It is also possible, although totally inadvisable, to change the default proxy behavior globally. Setting the OSBRAIN_DEFAULT_SAFE environment variable to false would result in all proxies making unsafe calls by default. Another option is to change the osbrain.config['SAFE'] configuration variable.

Executing one-way, unsafe calls

In some situations, you might want to execute unsafe calls in parallel without waiting for any return. In that case, you can make use of the oneway proxy attribute:

agent_proxy.oneway.method_call()

This method call will return immediately, and the code will be executed in the remote agent concurrently with the main thread and any other unsafe or one-way calls.

Warning

Do note that oneway calls are actually executed in a separate thread, which means they behave like unsafe calls. If not used with care, this concurrency may result in unexpected hard-to-debug behavior.

Advanced communication patterns

Some advanced communication patterns are also implemented in osBrain. They are called channels, rather than sockets, as they are formed with multiple sockets.

Asynchronous Request-Reply channel

An asynchronous request-reply channel is very similar to a normal request-reply pattern, only in this case we want to process the reply asynchronously (i.e.: assign a handler for this reply and forget about it until it is received).

This means that, when using an asynchronous request-reply channel, a handler must be specified for both the replier and the requester.

Example

See the following example in which Bob sends an asynchronous request to Alice. Note that:

  • When Alice binds, she uses ASYNC_REP instead of REP.
  • Bob is assigned a handler too to process Alice’s reply when it is received.
  • Bob sends the request and is automatically freed. It can log messages or do other stuff.
  • When the reply is received, Bob automatically processes it.
import time

from osbrain import run_agent
from osbrain import run_nameserver


def reply_late(agent, message):
    time.sleep(1)
    return 'Hello, Bob!'


def process_reply(agent, message):
    agent.log_info('Processed reply: %s' % message)


if __name__ == '__main__':

    ns = run_nameserver()
    alice = run_agent('Alice')
    bob = run_agent('Bob')

    addr = alice.bind('ASYNC_REP', handler=reply_late)
    bob.connect(addr, alias='alice', handler=process_reply)

    bob.send('alice', 'Hello, Alice!')
    bob.log_info('I am done!')

    bob.log_info('Waiting for Alice to reply...')
    time.sleep(2)

    ns.shutdown()

That is the most basic example, but channels are a bit more flexible. When sending an asynchronous request we may want to manually specify a handler for the reply. This will overwrite the default handler specified when connecting to the ASYNC_REP server. Try changing the .send() call:

def deaf(agent, message):
    agent.log_info('I am deaf...')


bob.send('alice', 'Hello, Alice!', handler=deaf)

We can also specify a maximum wait time for the reply and some code to be executed in case the reply was not received after that time. Try the example above, but setting wait=0.5:

def no_reply_in_time(agent):
    agent.log_warning('No reply received!')


bob.send('alice', 'Hello, Alice!', wait=0.5, on_error=no_reply_in_time)

Note

If on_error is not specified, by default the agent will simply log a warning.

Synced Publish-Subscribe channel

Another common use case is the synced publish-subscribe channel. It is similar to the normal publish-subscribe pattern but has some extra functionality. The server is configured to publish messages through a PUB socket and clients can connect to it to receive these messages. Using the synced publish-subscribe channel, however, allows the clients to send requests to the publisher.

This can be really useful when we want to share some data from the publisher with all the subscribers but we want the subscribers to be able to interact (and perhaps even modify) this data.

Note

Synchronization is possible because all replies are sent to the requester through the same PUB-SUB socket as the one used for normal publications. Note that, even though this reply is sent through the PUB-SUB socket, it will only be received by the requester and not by any other subscribed client.

Example

In the following example we have:

  • A synced publisher that requires a handler for requests when binding.
  • Two subscribers that connect to the publisher and subscribe to all topics.
  • The publisher starts sending publications, which are received in both subscribers.
  • At some point one of the clients makes a request, which is processed by the publisher. The client asynchronously receives and processes the reply from the publisher. Only the agent that made the request receives this message.
import time

from osbrain import run_agent
from osbrain import run_nameserver


def publish(agent):
    agent.send('publisher', 'Publication...')


def reply_back(agent, message):
    return 'Received %s' % message


def read_subscription(agent, message):
    agent.log_info('Read: "%s"' % message)


def process_reply(agent, message):
    agent.log_info('Publisher replied with: "%s"' % message)


if __name__ == '__main__':

    ns = run_nameserver()
    publisher = run_agent('Publisher')
    client_a = run_agent('Client-A')
    client_b = run_agent('Client-B')

    addr = publisher.bind('SYNC_PUB', alias='publisher', handler=reply_back)
    client_a.connect(addr, alias='publisher', handler=read_subscription)
    client_b.connect(addr, alias='publisher', handler=read_subscription)

    publisher.each(0.1, publish)
    time.sleep(1)
    client_a.send('publisher', 'Request from A!', handler=process_reply)
    time.sleep(1)

    ns.shutdown()

Note

Even though we have not used topic filtering in this example, you can definitely make use of it, just like with a normal publish-subscribe pattern.

Just like with Asynchronous Request-Reply channel, you can further customize the requests by specifying a maximum wait time and a function/method to be executed in case no reply was received after that wait time.

Distributed systems

Note

Before diving into osBrain distributed systems, make sure to read and understand the Serialization and Security sections.

Name servers

Normally you would want to have a centralized name server when working with distributed systems, but that is not mandatory, for some applications it might be more convenient to have multiple name servers in different machines, each handling different agents.

If you want to create a proxy to a remote name server, simply refer to the Creating proxies to existing name servers section.

Binding with distributed systems

Working with distributed systems is easy, you just basically need to make sure you use TCP transport and to specify the network interface IP address that you want to bind to.

This can be done when binding (note that we are using 127.0.0.1 but you may use any other IP address available in your system):

address = agent.bind('PULL', transport='tcp', addr='127.0.0.1')

Defining just the IP address means the agent will bind to a random port, which is usually fine. However, you can also specify the port to bind to:

address = agent.bind('PULL', transport='tcp', addr='127.0.0.1:1234')

Remember that the default transport can also be changed globally or per-agent (read the Serialization section).

Name server, proxies and addresses

Name servers are very useful, and even more when working with distributed systems. Where is that agent? Which was that address?

If we have access to a name server through a name server proxy, remember that we can very easily create a proxy to one of its registered agents:

agent_proxy = name_server_proxy.proxy('Agent_name')

And then, once we have a proxy to the agent, we can very easily retrieve any address by its alias:

address = agent_proxy.addr('address_alias')

As you can see, aliases become specially useful in distributed systems!

Proxies and multiple scripts

With distributed systems, it is very common to have multiple scripts (many times spread across multiple machines). If those are to interact together, you should consider whether a single/shared name server should be used. This simplifies the way you can get proxies to every agent and also the way they may share information between them.

If you are just starting with distributed systems and osBrain, you may approach it the following way:

  • You create one script, in which you run some agents and configure them.
  • You create another script, in which you run some more agents and configure those as well.
  • From the second script, you access agents that you created on the first and expect them to be ready and completely configured. But it turns out that might not be the case.

To avoid this issue, try to do all the configuration of agents from a single script, even if those agents are created from different ones. Remember that proxies allow you to treat remote agents just like local objects, which means it does not really matter if they are running in one machine or another.

Note

If you want to make sure the agent is running before using it, you can use the Proxy.wait_for_running() method.

Security

Warning

osBrain should be considered unsafe when used with remote machines. This package has some security risks. Understanding the risks is very important to avoid creating systems that are very easy to compromise by malicious entities.

Serialization in osBrain

osBrain can use different serializers when passing messages between agents and when configuring and deploying the multi-agent architectures. Among this serializers, some are considered unsafe (pickle, cloudpickle…).

Using these unsafe serializers is a security risk. The main problem is that allowing a program to deserialize arbitrary data can cause arbitrary code execution and this may wreck or compromise your system. Therefore, osBrain is meant to be run only within trusted networks (i.e.: LANs) or with properly encrypted/safe communications (see Protocol encryption).

Network interface binding

By default osBrain binds every server on localhost, to avoid exposing things on a public network or over the internet by mistake. If you want to expose your osBrain agents to anything other than localhost, you have to explicitly tell osBrain the network interface address it should use. This means it is a conscious effort to expose agents to remote machines.

Protocol encryption

osBrain doesn’t encrypt the data it sends over the network. This means you must not transfer sensitive data on untrusted networks (especially user data, passwords, and such) because it is possible to eavesdrop. Either encrypt the data yourself before passing, or run osBrain over a secure network (VPN or SSH tunnel).

Developers

Workflow

We are happy you like osBrain and we would love to receive contributions from you! Note that contributions do not necessarily need to include code: you can help telling us what problems you are having with osBrain or suggesting improvements to the documentation.

If you would like to help us with code, proceed to fork the project, make the changes you want and then submit a pull request to start a discussion. Take into account that we follow some rules for development:

  • We like to follow style standards (i.e.: PEP8). But do not worry, our test suite will help you with that and tell you if you wrote something wrong.
  • We like tests, so any new functionality should also include the corresponding tests.
  • We like documentation, so changes in the code should also include changes in the code docstrings and changes in the user documentation if they affect how the user may use osBrain.

Installing dependencies

To install the required dependencies for developing osBrain, you can make use of the provided requirements.txt file:

pip install -r requirements.txt

Running the tests

Running the tests locally is very simple, first install Tox:

pip install tox

And run it from the top-level path of the project:

tox

That single command will run all the tests for all the supported Python versions available in your system or environment.

For faster results you may want to run all the tests just against a single Python version. This command will run all tests against Python 3.6 only:

tox -e py36

When running Tox, tests are actually executed with pytest. Although not recommended, you might want to directly use that tool for finer control:

pytest -n 8

If you just want to run a handful of behavior tests (common when developing new functionality), just run:

pytest -k keyword

Note

Before submitting your changes for review, make sure all tests pass with tox, as the continuous integration system will run all those checks as well.

Generating documentation

Documentation is generated with Sphinx. In order to generate the documentation locally you need to run make from the docs/ directory:

make html

osBrain library API

This chapter describes osBrain’s library API. All osBrain classes and functions are defined in sub packages such as osbrain.agent, but for ease of use, the most important ones are also placed in the osbrain package scope.

osbrain — Main API package

osbrain is the main package of osBrain. It imports most of the other packages that it needs and provides shortcuts to the most frequently used objects and functions from those packages. This means you can mostly just import osbrain in your code to start using osBrain.

The classes and functions provided are:

symbol in osbrain referenced location
class osbrain.Agent
osbrain.agent.Agent
osbrain.run_agent()
osbrain.agent.run_agent()
osbrain.run_nameserver()
osbrain.nameserver.run_nameserver()
class osbrain.Proxy
osbrain.proxy.Proxy
class osbrain.NSProxy
osbrain.proxy.NSProxy
class osbrain.Logger
osbrain.logging.Logger
osbrain.run_logger()
osbrain.logging.run_logger()
class osbrain.SocketAddress
osbrain.address.SocketAddress
class osbrain.AgentAddress
osbrain.address.AgentAddress

See also

Module osbrain.agent
The agent classes and functions.
Module osbrain.nameserver
The name server logic.
Module osbrain.proxy
The proxy classes and functions.
Module osbrain.address
The address classes and functions.
Module osbrain.logging
The logging classes and functions.

osbrain.agent — osBrain agent logic

Core agent classes.

class osbrain.agent.Agent(name='', host=None, serializer=None, transport=None, attributes=None)

Bases: object

A base agent class which is to be served by an AgentProcess.

An AgentProcess runs a Pyro multiplexed server and serves one Agent object.

Parameters:
  • name (str, default is None) – Name of the Agent.
  • host (str, default is None) – Host address where the agent will bind to. When not set, '127.0.0.1' (localhost) is used.
  • transport (str, AgentAddressTransport, default is None) – Transport protocol.
  • attributes (dict, default is None) – A dictionary that defines initial attributes for the agent.
name

Name of the agent.

Type:str
_host

Host address where the agent is binding to.

Type:str
_uuid

Globally unique identifier for the agent.

Type:bytes
_running

Set to True if the agent is running (executing the main loop).

Type:bool
_serializer

Default agent serialization format.

Type:str
_transport

Default agent transport protocol.

Type:str, AgentAddressTransport, default is None
_socket

A dictionary in which the key is the address or the alias and the value is the actual socket.

Type:dict
_address

A dictionary in which the key is the address or the alias and the value is the actual address.

Type:dict
_handler

A dictionary in which the key is the socket and the values are the handlers for each socket.

Type:dict
_context

ZMQ context to create ZMQ socket objects.

Type:zmq.Context
_poller

ZMQ poller to wait for incoming data from sockets.

Type:zmq.Poller
_poll_timeout

Polling timeout, in milliseconds. After this timeout, if no message is received, the agent executes de idle() method before going back to polling.

Type:int
_keep_alive

When set to True, the agent will continue executing the main loop.

Type:bool
_async_req_uuid

Stores the UUIDs of the asynchronous request sockets (used in communication channels).

Type:dict
_async_req_handler

Stores the handler for every asynchronous request sockets (used in communication channels).

Type:dict
_die_now

During shutdown, this attribute is set for the agent to die.

Type:bool
_DEBUG

Whether to print debug level messages.

Type:bool
_pending_requests

Stores pending (waiting for reply) asynchronous requests. The asynchronous request UUID is used as key and its handler as the value.

Type:dict
_timer

Stores all the current active timers, using their aliases as keys.

Type:dict
addr(alias)

Return the address of a socket given by its alias.

Parameters:alias (str) – Alias of the socket whose address is to be retrieved.
Returns:Address of the agent socket associated with the alias.
Return type:AgentAddress
after(delay, method, *args, alias=None, **kwargs)

Execute an action after a delay.

Parameters:
  • delay (float) – Execute the action after delay seconds.
  • method – Method (action) to be executed by the agent.
  • alias (str, default is None) – An alias for the generated timer.
  • *args (tuple) – Parameters to pass for the method execution.
  • **kwargs (dict) – Named parameters to pass for the method execution.
Returns:

The timer alias or identifier.

Return type:

str

before_loop()

This user-defined method is to be executed right before the main loop.

bind(kind, alias=None, handler=None, addr=None, transport=None, serializer=None)

Bind to an agent address.

Parameters:
  • kind (str, AgentAddressKind) – The agent address kind: PUB, REQ…
  • alias (str, default is None) – Optional alias for the socket.
  • default is None (handler,) – If the socket receives input messages, the handler/s is/are to be set with this parameter.
  • addr (str, default is None) – The address to bind to.
  • transport (str, AgentAddressTransport, default is None) – Transport protocol.
Returns:

The address where the agent binded to.

Return type:

AgentAddress

close(alias, linger=None)

Close a socket given its alias and clear its entry from the Agent._socket dictionary.

close_all(linger=None)

Close all non-internal zmq sockets.

connect(server, alias=None, handler=None)

Connect to a server agent address.

Parameters:
  • server (AgentAddress) – Agent address to connect to.
  • alias (str, default is None) – Optional alias for the new address.
  • default is None (handler,) – If the new socket receives input messages, the handler/s is/are to be set with this parameter.
each(period, method, *args, alias=None, **kwargs)

Execute a repeated action with a defined period.

Parameters:
  • period (float) – Repeat the action execution with a delay of period seconds between executions.
  • method – Method (action) to be executed by the agent.
  • alias (str, default is None) – An alias for the generated timer.
  • *args (tuple) – Parameters to pass for the method execution.
  • **kwargs (dict) – Named parameters to pass for the method execution.
Returns:

The timer alias or identifier.

Return type:

str

execute_as_function(function, *args, **kwargs)

Execute a function passed as parameter.

execute_as_method(function, *args, **kwargs)

Execute a function as a method, without adding it to the set of agent methods.

get_attr(name: str)

Return the specified attribute of the agent.

Parameters:name – Name of the attribute to be retrieved.
has_socket(alias)

Return whether the agent has the passed socket internally stored.

idle()

This function is to be executed when the agent is idle.

After a timeout occurs when the agent’s poller receives no data in any of its sockets, the agent may execute this function.

Note

The timeout is set by the agent’s poll_timeout attribute.

is_running()

Returns a boolean indicating whether the agent is running or not.

kill()

Force shutdown of the agent.

If the agent is running the ZMQ context is terminated to allow the main thread to quit and do the tear down.

list_timers()

Return a list with all timer aliases currently running.

Returns:A list with all the timer aliases currently running.
Return type:list (str)
log_debug(message, logger='_logger')

Log a debug message.

Parameters:
  • message (str) – Message to log.
  • logger (str) – Alias of the logger.
log_error(message, logger='_logger')

Log an error message.

Parameters:
  • message (str) – Message to log.
  • logger (str) – Alias of the logger.
log_info(message, logger='_logger')

Log an info message.

Parameters:
  • message (str) – Message to log.
  • logger (str) – Alias of the logger.
log_warning(message, logger='_logger')

Log a warning message.

Parameters:
  • message (str) – Message to log.
  • logger (str) – Alias of the logger.
on_init()

This user-defined method is to be executed after initialization.

ping()

A test method to check the readiness of the agent. Used for testing purposes, where timing is very important. Do not remove.

raise_exception()

Raise an exception (for testing purposes).

recv(address)

Receive a message from the specified address.

This method is only used in REQREP communication patterns.

Parameters:address
Returns:The content received in the address.
Return type:anything
run()

Start the main loop.

safe_call(method, *args, **kwargs)

A safe call to a method.

A safe call is simply sent to be executed by the main thread.

Parameters:
  • method (str) – Method name to be executed by the main thread.
  • *args (arguments) – Method arguments.
  • *kwargs (keyword arguments) – Method keyword arguments.
send(address, message, topic=None, handler=None, wait=None, on_error=None)

Send a message through the specified address.

Note that replies in a REQREP pattern do not use this function in order to be sent.

Parameters:
  • address (AgentAddress or AgentChannel) – The address to send the message through.
  • message – The message to be sent.
  • topic (str) – The topic, in case it is relevant (i.e.: for PUB sockets).
  • handler (function, method or string) – Code that will be executed on input messages if relevant (i.e.: for asynchronous requests in channels).
  • wait (float) – For channel requests, wait at most this number of seconds for a response from the server.
  • on_error (function, method or string) – Code to be executed if wait is passed and the response is not received.
send_recv(address, message)

This method is only used in REQREP communication patterns.

set_attr(**kwargs)

Set object attributes.

Parameters:kwargs ([name, value]) – Keyword arguments will be used to set the object attributes.
set_logger(logger, alias='_logger')

Connect the agent to a logger and start logging messages to it.

set_method(*args, **kwargs)

Set object methods.

Parameters:
  • args ([function]) – New methods will be created for each function, taking the same name as the original function.
  • kwargs ([name, function]) – New methods will be created for each function, taking the name specified by the parameter.
Returns:

Name of the registered method in the agent.

Return type:

str

shutdown()

Cleanly stop and shut down the agent assuming the agent is running.

Will let the main thread do the tear down.

stop()

Stop the agent. Agent will stop running.

stop_all_timers()

Stop all currently running timers.

stop_timer(alias)

Stop a currently running timer.

Parameters:alias (str) – The alias or identifier of the timer.
subscribe(alias: str, handler: Dict[Union[bytes, str], Any]) → None

Subscribe a SUB/SYNC_SUB socket given by its alias to the given topics, and leave the handlers prepared internally.

Parameters:
  • alias – Alias of the new subscriber socket.
  • handler – A dictionary in which the keys represent the different topics and the values the actual handlers. If, instead of a dictionary, a single handler is given, it will be used to subscribe the agent to any topic.
unsubscribe(alias: str, topic: Union[bytes, str]) → None

Unsubscribe a SUB/SYNC_SUB socket given by its alias from a given specific topic, and delete its entry from the handlers dictionary.

If instead of a single topic, a tuple or a list of topics is passed, the agent will unsubscribe from all the supplied topics.

class osbrain.agent.AgentProcess(name='', nsaddr=None, addr=None, serializer=None, transport=None, base=<class 'osbrain.agent.Agent'>, attributes=None)

Bases: multiprocessing.context.Process

Agent class. Instances of an Agent are system processes which can be run independently.

kill()

Force kill the agent process.

run()

Begin execution of the agent process and start the main loop.

start()

Start the system process.

Raises:RuntimeError – If an error occurred when initializing the daemon.
osbrain.agent.compose_message(message: bytes, topic: bytes, serializer: osbrain.address.AgentAddressSerializer) → bytes

Compose a message and leave it ready to be sent through a socket.

This is used in PUB-SUB patterns to combine the topic and the message in a single bytes buffer.

Parameters:
  • message – Message to be composed.
  • topic – Topic to combine the message with.
  • serializer – Serialization for the message part.
Returns:

Return type:

The bytes representation of the final message to be sent.

osbrain.agent.deserialize_message(message, serializer)

Check if a message needs to be deserialized and do it if that is the case.

Parameters:
  • message (bytes, memoryview) – The serialized message.
  • serializer (AgentAddressSerializer) – The type of (de)serializer that should be used.
Returns:

The deserialized message, or the same message in case no deserialization is needed.

Return type:

anything

osbrain.agent.execute_code_after_yield(generator)

Some responses are dispatched with yield (generator handler). In those cases we still want to execute the remaining code in the generator, and also make sure it does not yield any more.

Parameters:generator – The handler that already yielded one result and is not expected to yield again.
Raises:ValueError – If the generator yielded once more, which is unexpected.
osbrain.agent.run_agent(name='', nsaddr=None, addr=None, base=<class 'osbrain.agent.Agent'>, serializer=None, transport=None, safe=None, attributes=None)

Ease the agent creation process.

This function will create a new agent, start the process and then run its main loop through a proxy.

Parameters:
  • name (str, default is '') – Agent name or alias.
  • nsaddr (SocketAddress, default is None) – Name server address.
  • addr (SocketAddress, default is None) – New agent address, if it is to be fixed.
  • transport (str, AgentAddressTransport, default is None) – Transport protocol.
  • safe (bool, default is None) – Use safe calls by default from the Proxy.
  • attributes (dict, default is None) – A dictionary that defines initial attributes for the agent.
Returns:

A proxy to the new agent.

Return type:

proxy

osbrain.agent.serialize_message(message, serializer)

Check if a message needs to be serialized and do it if that is the case.

Parameters:
  • message (anything) – The message to serialize.
  • serializer (AgentAddressSerializer) – The type of serializer that should be used.
Returns:

The serialized message, or the same message in case no serialization is needed.

Return type:

bytes

osbrain.address — osBrain address logic

Implementation of address-related features.

class osbrain.address.AgentAddress(transport, address, kind, role, serializer)

Bases: object

Agent address information consisting on the transport protocol, address, kind and role.

Parameters:
transport

Agent transport protocol.

Type:str, AgentAddressTransport
address

Agent address.

Type:str, SocketAddress
kind

Agent kind.

Type:AgentAddressKind
role

Agent role.

Type:AgentAddressRole
serializer

Agent serializer.

Type:AgentAddressSerializer
twin()

Return the twin address of the current one.

While the host and port are kept for the twin, the kind and role change to their corresponding twins, according to the rules defined in the respective classes.

Returns:The twin address of the current one.
Return type:AgentAddress
class osbrain.address.AgentAddressKind

Bases: str

Agent’s address kind class.

This kind represents the communication pattern being used by the agent address: REP, PULL, PUB…

REQUIRE_HANDLER = ('REP', 'PULL', 'SUB', 'PULL_SYNC_PUB')
TWIN = {'PUB': 'SUB', 'PULL': 'PUSH', 'PULL_SYNC_PUB': 'PUSH_SYNC_SUB', 'PUSH': 'PULL', 'PUSH_SYNC_SUB': 'PULL_SYNC_PUB', 'REP': 'REQ', 'REQ': 'REP', 'SUB': 'PUB'}
ZMQ_KIND_CONVERSION = {'PUB': 1, 'PULL': 7, 'PULL_SYNC_PUB': 7, 'PUSH': 8, 'PUSH_SYNC_SUB': 8, 'REP': 4, 'REQ': 3, 'SUB': 2}
requires_handler()

Whether the Agent’s address kind requires a handler or not. A socket which processes incoming messages would require a handler (i.e. ‘REP’, ‘PULL’, ‘SUB’…).

Returns:
Return type:bool
twin()

Get the twin kind of the current one.

REQ would be the twin of REP and viceversa, PUB would be the twin of SUB and viceversa, etc.

Returns:The twin kind of the current one.
Return type:AgentAddressKind
zmq()

Get the equivalent ZeroMQ socket kind.

Returns:
Return type:int
class osbrain.address.AgentAddressRole

Bases: str

Agent’s address role class. It can either be 'server' or 'client'.

twin()

Get the twin role of the current one. 'server' would be the twin of 'client' and viceversa.

Returns:The twin role.
Return type:AgentAddressRole
class osbrain.address.AgentAddressSerializer(value)

Bases: str

Agent’s address serializer class.

Each communication channel will have a serializer.

Note that for raw message passing, everything must be on bytes, and the programmer is the one responsible for converting data to bytes.

Parameters:serializer_type (str) – Serializer type (i.e.: ‘raw’, ‘pickle’, ‘cloudpickle’, ‘dill’, ‘json’).
SERIALIZER_SEPARATOR = ('pickle', 'cloudpickle', 'dill', 'json')
SERIALIZER_SIMPLE = ('raw',)
class osbrain.address.AgentAddressTransport

Bases: str

Agent’s address transport class. It can be ‘tcp’, ‘ipc’ or ‘inproc’.

class osbrain.address.AgentChannel(kind, receiver, sender, twin_uuid=None)

Bases: object

Agent channel information.

Channels are communication means with sender and receiver in both sides (i.e.: PULL+PUB - PUSH-SUB or PULL+PUSH - PUSH+PULL).

Parameters:
  • kind (AgentChannelKind) – Agent kind.
  • sender (str) – First AgentAddress.
  • receiver (str) – Second AgentAddress.
kind

Agent kind.

Type:AgentChannelKind
sender

First AgentAddress.

Type:str
receiver

Second AgentAddress.

Type:str
twin()

Get the twin channel of the current one.

Returns:The twin channel.
Return type:AgentChannel
class osbrain.address.AgentChannelKind

Bases: str

Agent’s channel kind class.

This kind represents the communication pattern being used by the agent channel: ASYNC_REP, STREAM…

TWIN = {'ASYNC_REP': 'ASYNC_REQ', 'ASYNC_REQ': 'ASYNC_REP', 'SYNC_PUB': 'SYNC_SUB', 'SYNC_SUB': 'SYNC_PUB'}
twin()

Get the twin kind of the current one.

REQ would be the twin of REP and viceversa, PUB would be the twin of SUB and viceversa, etc.

Returns:
Return type:AgentChannelKind
class osbrain.address.SocketAddress(host, port)

Bases: object

Socket address information consisting on the host and port.

Parameters:
  • host (str, ipaddress.IPv4Address) – IP address.
  • port (int) – Port number.
host

IP address.

Type:ipaddress.IPv4Address
port

Port number.

Type:int
osbrain.address.address_to_host_port(addr)

Try to convert an address to a (host, port) tuple.

Parameters:addr (str, SocketAddress) –
Returns:A (host, port) tuple formed with the corresponding data.
Return type:tuple
osbrain.address.guess_kind(kind)

Guess if a kind string is an AgentAddressKind or AgentChannelKind.

Parameters:kind (str) – The AgentAddressKind or AgentChannelKind in string format.
Returns:The actual kind type.
Return type:AgentAddressKind or AgentChannelKind

osbrain.common — osBrain common logic

Miscellaneous utilities.

class osbrain.common.LogLevel

Bases: str

Identifies the log level: ERROR, WARNING, INFO, DEBUG.

osbrain.common.after(delay, action, *args)

Execute an action after a given number of seconds.

This function is executed in a separate thread.

Parameters:
  • delay (float) – Number of seconds to delay the action.
  • action – To be taken after the interval.
  • args (tuple, default is ()) – Arguments for the action.
Returns:

A timer object that can be terminated using the stop() method.

Return type:

Event

osbrain.common.format_exception()

Represent a traceback exception as a string in which all lines start with a | character.

Useful for differentiating remote from local exceptions and exceptions that where silenced.

Returns:A formatted string containing an exception traceback information.
Return type:str
osbrain.common.format_method_exception(error, method, args, kwargs)

Represent an exception as a formatted string that includes the name and arguments of the method where it occurred, followed by the output of format_exception.

Parameters:
  • error (Error) – The exception that was raised.
  • method (function) – The method where the exception was raised.
  • args – The arguments of the method call.
  • kwargs (dict) – The keyword arguments of the method call.
Returns:

The formatted string with the method call and traceback information.

Return type:

str

osbrain.common.get_linger(seconds=None)

Wrapper to get the linger option from the environment variable.

Parameters:seconds (float, default is None.) – Linger seconds, in seconds.
Returns:Number of seconds to linger. Note that -1 means linger forever.
Return type:int
osbrain.common.repeat(interval, action, *args)

Repeat an action forever after a given number of seconds.

If a sequence of events takes longer to run than the time available before the next event, the repeater will simply fall behind.

This function is executed in a separate thread.

Parameters:
  • interval (float) – Number of seconds between executions.
  • action – To be taken after the interval.
  • args (tuple, default is ()) – Arguments for the action.
Returns:

A timer object that can be terminated using the stop() method.

Return type:

Event

osbrain.common.topic_to_bytes(topic: Union[bytes, str]) → bytes

Return the passed topic as a bytes object.

osbrain.common.topics_to_bytes(handlers: Dict[Union[bytes, str], Any], uuid: bytes = b'')

Given some pairs topic/handler, leaves them prepared for making the actual ZeroMQ subscription.

Parameters:
  • handlers – Contains pairs “topic - handler”.
  • uuid – uuid of the SYNC_PUB/SYNC_SUB channel (if applies). For normal PUB/SUB communication, this should be b''.
Returns:

Return type:

Dict[bytes, Any]

osbrain.common.unbound_method(method)
Returns:Unbounded function.
Return type:function
osbrain.common.unique_identifier() → bytes
Returns:
  • A unique identifier that is safe to use in PUB-SUB communication
  • patterns (i.e. (does not contain the)
  • osbrain.agent.TOPIC_SEPARATOR character).
osbrain.common.validate_handler(handler, required)

Raises a ValueError exception when a required is handler but not present.

osbrain.logging — osBrain logging logic

Implementation of logging-related features.

class osbrain.logging.Logger(name='', host=None, serializer=None, transport=None, attributes=None)

Bases: osbrain.agent.Agent

Specialized Agent for logging. Binds a SUB socket and starts logging incoming messages.

log_handler(message, topic)

Handle incoming log messages.

on_init()

Initialize attributes.

osbrain.logging.pyro_log()

Set environment variables to activate Pyro logging. The log level is set to “DEBUG”.

osbrain.logging.run_logger(name, nsaddr=None, addr=None, base=<class 'osbrain.logging.Logger'>)

Ease the logger creation process.

This function will create a new logger, start the process and then run its main loop through a proxy.

Parameters:
  • name (str) – Logger name or alias.
  • nsaddr (SocketAddress, default is None) – Name server address.
  • addr (SocketAddress, default is None) – New logger address, if it is to be fixed.
Returns:

A proxy to the new logger.

Return type:

proxy

osbrain.proxy — osBrain proxy logic

Implementation of proxy-related features.

class osbrain.proxy.NSProxy(nsaddr=None, timeout=3)

Bases: Pyro4.core.Proxy

A proxy to access a name server.

Parameters:
  • nsaddr (SocketAddress, str) – Name server address.
  • timeout (float) – Timeout, in seconds, to wait until the name server is discovered.
addr(agent_alias=None, address_alias=None)

Return the name server address or the address of an agent’s socket.

Parameters:
  • agent_alias (str, default is None) – The alias of the agent to retrieve its socket address.
  • address_alias (str, default is None) – The alias of the socket address to retrieve from the agent.
Returns:

The name server or agent’s socket address.

Return type:

SocketAddress or AgentAddress

proxy(name, timeout=3.0)

Get a proxy to access an agent registered in the name server.

Parameters:
  • name (str) – Proxy name, as registered in the name server.
  • timeout (float) – Timeout, in seconds, to wait until the agent is discovered.
Returns:

A proxy to access an agent registered in the name server.

Return type:

Proxy

release()

Release the connection to the Pyro daemon.

shutdown(timeout=10.0)

Shutdown the name server. All agents will be shutdown as well.

Parameters:timeout (float, default is 10.) – Timeout, in seconds, to wait for the agents to shutdown.
shutdown_agents(timeout=10.0)

Shutdown all agents registered in the name server.

Parameters:timeout (float, default is 10.) – Timeout, in seconds, to wait for the agents to shutdown.
class osbrain.proxy.Proxy(name, nsaddr=None, timeout=3.0, safe=None)

Bases: Pyro4.core.Proxy

A proxy to access remote agents.

Parameters:
  • name (str) – Proxy name, as registered in the name server.
  • nsaddr (SocketAddress, str) – Name server address.
  • timeout (float) – Timeout, in seconds, to wait until the agent is discovered.
  • safe (bool, default is None) – Use safe calls by default. When not set, osbrain default’s osbrain.config['SAFE'] is used.
nsaddr()

Get the socket address of the name server.

Returns:The socket address.
Return type:SocketAddress
oneway

Make the next remote method call be one way.

Returns:
Return type:The proxy itself.
release()

Release the connection to the Pyro daemon.

safe

Make the next remote method call be safe.

Returns:
Return type:The proxy itself.
unsafe

Make the next remote method call be unsafe.

Returns:
Return type:The proxy itself.
wait_for_running(timeout=3.0)

Wait until the agent is running.

Parameters:timeout (float) – Raise and exception if the agent is not running after this number of seconds. Use a negative value to wait forever.
Raises:TimeoutError – If the agent is not running after the given timeout.
Returns:The object itself.
Return type:Proxy
osbrain.proxy.locate_ns(nsaddr, timeout=3.0)

Locate a name server to ensure it actually exists.

Parameters:
  • nsaddr (SocketAddress) – The address where the name server should be up and running.
  • timeout (float) – Timeout in seconds before aborting location.
Returns:

The address where the name server was located.

Return type:

nsaddr

Raises:

NamingError – If the name server could not be located.

osbrain.nameserver — osBrain nameserver logic

Implementation of name server.

class osbrain.nameserver.NameServer(*args, **kwargs)

Bases: Pyro4.naming.NameServer

agents()

List agents registered in the name server.

async_kill_agents(nsaddr)

Kill all agents registered in the name server, with no mercy.

async_shutdown_agents(nsaddr)

Shutdown all agents registered in the name server.

daemon_shutdown()

Shutdown the name server daemon.

ping()

A simple test method to check if the name server is running correctly.

class osbrain.nameserver.NameServerProcess(addr=None, base=<class 'osbrain.nameserver.NameServer'>)

Bases: multiprocessing.context.Process

Name server class. Instances of a name server are system processes which can be run independently.

agents()

List agents registered in the name server.

run()

Begin execution of the name server process and start the main loop.

shutdown()

Shutdown the name server. All agents will be shutdown as well.

shutdown_all()

Shutdown all agents registered in the name server.

start()

Start the system process.

Raises:RuntimeError – If an error occurred when initializing the daemon.
osbrain.nameserver.random_nameserver_process(host='127.0.0.1', port_start=10000, port_stop=20000, timeout=3.0, base=<class 'osbrain.nameserver.NameServer'>)

Start a random NameServerProcess.

Parameters:
  • host (str, default is '127.0.0.1') – Host address where the name server will bind to.
  • port_start (int) – Lowest port number allowed.
  • port_stop (int) – Highest port number allowed.
Returns:

The name server process started.

Return type:

NameServerProcess

osbrain.nameserver.run_nameserver(addr=None, base=<class 'osbrain.nameserver.NameServer'>)

Ease the name server creation process.

This function will create a new nameserver, start the process and then run its main loop through a proxy.

Parameters:addr (SocketAddress, default is None) – Name server address.
Returns:A proxy to the name server.
Return type:proxy

Software License and Disclaimer

Copyright 2016 Open Sistemas de Información Internet S.L.

Licensed under the Apache License, Version 2.0 (the “License”);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Indices and tables