paco ¶
Small and idiomatic utility library for coroutine-driven asynchronous generic programming in Python +3.4.
Built on top of asyncio, paco
provides missing capabilities from Python stdlib
in order to write asynchronous cooperative multitasking in a nice-ish way.
Also, paco aims to port some of functools and itertools standard functions to the asynchronous world.
paco
can be your utility belt to deal with asynchronous, I/O-bound, non-blocking concurrent code in a cleaner and idiomatic way.
Features¶
- Simple and idiomatic API, extending Python
stdlib
with async coroutines gotchas. - Built-in configurable control-flow concurrency support (throttle).
- No fancy abstractions: it just works with the plain asynchronous coroutines.
- Useful iterables, decorators, functors and convenient helpers.
- Coroutine-based functional helpers:
compose
,throttle
,partial
,timeout
,times
,until
,race
… - Asynchronous coroutines port of Python built-in functions: filter, map, dropwhile, filterfalse, reduce…
- Supports asynchronous iterables and generators (PEP0525)
- Concurrent iterables and higher-order functions.
- Better
asyncio.gather()
andasyncio.wait()
with optional concurrency control and ordered results. - Works with both async/await and yield from coroutines syntax.
- Reliable coroutine timeout limit handler via context manager.
- Designed for intensive I/O bound concurrent non-blocking tasks.
- Good interoperability with
asyncio
and Pythonstdlib
functions. - Composable pipelines of functors via
|
operator overloading. - Small and dependency free.
- Compatible with Python +3.4.
Installation¶
Using pip
package manager:
pip install --upgrade paco
Or install the latest sources from Github:
pip install -e git+git://github.com/h2non/paco.git#egg=paco
API¶
- paco.ConcurrentExecutor
- paco.apply
- paco.compose
- paco.concurrent
- paco.constant
- paco.curry
- paco.defer
- paco.dropwhile
- paco.each
- paco.every
- paco.filter
- paco.filterfalse
- paco.flat_map
- paco.gather
- paco.identity
- paco.interval
- paco.map
- paco.once
- paco.partial
- paco.race
- paco.reduce
- paco.repeat
- paco.run
- paco.series
- paco.some
- paco.throttle
- paco.thunk
- paco.timeout
- paco.TimeoutLimit
- paco.times
- paco.until
- paco.wait
- paco.whilst
- paco.wraps
Examples¶
Asynchronously and concurrently execute multiple HTTP requests.
import paco
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
return res
async def fetch_urls():
urls = [
'https://www.google.com',
'https://www.yahoo.com',
'https://www.bing.com',
'https://www.baidu.com',
'https://duckduckgo.com',
]
# Map concurrent executor with concurrent limit of 3
responses = await paco.map(fetch, urls, limit=3)
for res in responses:
print('Status:', res.status)
# Run in event loop
paco.run(fetch_urls())
Concurrent pipeline-style composition of transform functors over an iterable object.
import paco
async def filterer(x):
return x < 8
async def mapper(x):
return x * 2
async def drop(x):
return x < 10
async def reducer(acc, x):
return acc + x
async def task(numbers):
return await (numbers
| paco.filter(filterer)
| paco.map(mapper)
| paco.dropwhile(drop)
| paco.reduce(reducer, initializer=0))
# Run in event loop
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number) # => 36
License¶
MIT - Tomas Aparicio
Contents¶
Examples¶
Asynchronously and concurrently execute multiple HTTP requests.¶
import paco
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
return res
async def fetch_urls():
urls = [
'https://www.google.com',
'https://www.yahoo.com',
'https://www.bing.com',
'https://www.baidu.com',
'https://duckduckgo.com',
]
# Map concurrent executor with concurrent limit of 3
responses = await paco.map(fetch, urls, limit=3)
for res in responses:
print('Status:', res.status)
# Run in event loop
paco.run(fetch_urls())
Concurrent pipeline-style chain composition of functors over any iterable object.¶
import paco
async def filterer(x):
return x < 8
async def mapper(x):
return x * 2
async def drop(x):
return x < 10
async def reducer(acc, x):
return acc + x
async def task(numbers):
return await (numbers
| paco.filter(filterer)
| paco.map(mapper)
| paco.dropwhile(drop)
| paco.reduce(reducer, initializer=0)) # noqa
# Run in event loop
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number) # => 36
API documentation¶
-
class
paco.
ConcurrentExecutor
(limit=10, loop=None, coros=None, ignore_empty=False)[source]¶ Bases:
object
Concurrent executes a set of asynchronous coroutines with a simple throttle concurrency configurable concurrency limit.
Provides an observer pub/sub interface, allowing API consumers to subscribe normal functions or coroutines to certain events that happen internally.
ConcurrentExecutor is_running a low-level implementation that powers most of the utility functions provided in paco.
For most cases you won’t need to rely on it, instead you can use the high-level API functions that provides a simpler abstraction for the majority of the use cases.
This class is not thread safe.
- Events:
- start (executor): triggered before executor cycle starts.
- finish (executor): triggered when all the coroutine finished.
- task.start (task): triggered before coroutine starts.
- task.finish (task, result): triggered when the coroutine finished.
- task.error (task, error): triggered when a coroutined task
- raised an exception.
Parameters: - limit (int) – concurrency limit. Defaults to 10.
- coros (list[coroutine], optional) – list of coroutines to schedule.
- loop (asyncio.BaseEventLoop, optional) – loop to run. Defaults to asyncio.get_event_loop().
- ignore_empty (bool, optional) – do not raise an exception if there are no coroutines to schedule are empty.
Returns: ConcurrentExecutor
Usage:
async def sum(x, y): return x + y pool = paco.ConcurrentExecutor(limit=2) pool.add(sum, 1, 2) pool.add(sum, None, 'str') done, pending = await pool.run(return_exceptions=True) [task.result() for task in done] # => [3, TypeError("unsupported operand type(s) for +: 'NoneType' and 'str'")] # noqa
-
__len__
()[source]¶ Returns the current length of the coroutines pool queue.
Returns: current coroutines pool length. Return type: int
-
add
(coro, *args, **kw)[source]¶ Adds a new coroutine function with optional variadic argumetns.
Parameters: - coro (coroutine function) – coroutine to execute.
- *args (mixed) – optional variadic arguments
Raises: TypeError
– if the coro object is not a valid coroutineReturns: coroutine wrapped future
Return type: future
-
extend
(*coros)[source]¶ Add multiple coroutines to the executor pool.
Raises: TypeError
– if the coro object is not a valid coroutine
-
is_running
()[source]¶ Checks the executor running state.
Returns: True
if the executur is running, otherwiseFalse
.Return type: bool
-
off
(event)[source]¶ Removes event subscribers.
Parameters: event (str) – event name to remove observers.
-
on
(event, fn)[source]¶ Subscribes to a specific event.
Parameters: - event (str) – event name to subcribe.
- fn (function) – function to trigger.
-
reset
()[source]¶ Resets the executer scheduler internal state.
Raises: RuntimeError
– is the executor is still running.
-
run
(timeout=None, return_when=None, return_exceptions=None, ignore_empty=None)[source]¶ Executes the registered coroutines in the executor queue.
Parameters: - timeout (int/float) – max execution timeout. No limit by default.
- return_exceptions (bool) – in case of coroutine exception.
- return_when (str) – sets when coroutine should be resolved. See asyncio.wait for supported values.
- ignore_empty (bool, optional) – do not raise an exception if there are no coroutines to schedule are empty.
Returns: two sets of Futures:
(done, pending)
Return type: Raises: ValueError
– if there is no coroutines to schedule.RuntimeError
– if executor is still running.TimeoutError
– if execution takes more than expected.
-
submit
(coro, *args, **kw)¶ Adds a new coroutine function with optional variadic argumetns.
Parameters: - coro (coroutine function) – coroutine to execute.
- *args (mixed) – optional variadic arguments
Raises: TypeError
– if the coro object is not a valid coroutineReturns: coroutine wrapped future
Return type: future
-
wait
(timeout=None, return_when=None, return_exceptions=None, ignore_empty=None)¶ Executes the registered coroutines in the executor queue.
Parameters: - timeout (int/float) – max execution timeout. No limit by default.
- return_exceptions (bool) – in case of coroutine exception.
- return_when (str) – sets when coroutine should be resolved. See asyncio.wait for supported values.
- ignore_empty (bool, optional) – do not raise an exception if there are no coroutines to schedule are empty.
Returns: two sets of Futures:
(done, pending)
Return type: Raises: ValueError
– if there is no coroutines to schedule.RuntimeError
– if executor is still running.TimeoutError
– if execution takes more than expected.
-
paco.
apply
(coro, *args, **kw)[source]¶ Creates a continuation coroutine function with some arguments already applied.
Useful as a shorthand when combined with other control flow functions. Any arguments passed to the returned function are added to the arguments originally passed to apply.
This is similar to paco.partial().
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to wrap.
- *args (mixed) – mixed variadic arguments for partial application.
- *kwargs (mixed) – mixed variadic keyword arguments for partial application.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: wrapped coroutine function.
Return type: coroutinefunction
Usage:
async def hello(name, mark='!'): print('Hello, {name}{mark}'.format(name=name, mark=mark)) hello_mike = paco.apply(hello, 'Mike') await hello_mike() # => Hello, Mike! hello_mike = paco.apply(hello, 'Mike', mark='?') await hello_mike() # => Hello, Mike?
-
paco.
compose
(*coros)[source]¶ Creates a coroutine function based on the composition of the passed coroutine functions.
Each function consumes the yielded result of the coroutine that follows.
Composing coroutine functions f(), g(), and h() would produce the result of f(g(h())).
Parameters: *coros (coroutinefunction) – variadic coroutine functions to compose. Raises: RuntimeError
– if cannot execute a coroutine function.Returns: coroutinefunction Usage:
async def sum_1(num): return num + 1 async def mul_2(num): return num * 2 coro = paco.compose(sum_1, mul_2, sum_1) await coro(2) # => 7
-
paco.
concurrent
¶ alias of
paco.concurrent.ConcurrentExecutor
-
paco.
constant
(value, delay=None)[source]¶ Returns a coroutine function that when called, always returns the provided value.
This function has an alias: paco.identity.
Parameters: - value (mixed) – value to constantly return when coroutine is called.
- delay (int/float) – optional return value delay in seconds.
Returns: coroutinefunction
Usage:
coro = paco.constant('foo') await coro() # => 'foo' await coro() # => 'foo'
-
paco.
curry
(arity_or_fn=None, ignore_kwargs=False, evaluator=None, *args, **kw)[source]¶ Creates a function that accepts one or more arguments of a function and either invokes func returning its result if at least arity number of arguments have been provided, or returns a function that accepts the remaining function arguments until the function arity is satisfied.
This function is overloaded: you can pass a function or coroutine function as first argument or an int indicating the explicit function arity.
Function arity can be inferred via function signature or explicitly passed via arity_or_fn param.
You can optionally ignore keyword based arguments as well passsing the ignore_kwargs param with True value.
This function can be used as decorator.
Parameters: - arity_or_fn (int|function|coroutinefunction) – function arity to curry or function to curry.
- ignore_kwargs (bool) – ignore keyword arguments as arity to satisfy during curry.
- evaluator (function) – use a custom arity evaluator function.
- *args (mixed) – mixed variadic arguments for partial function application.
- *kwargs (mixed) – keyword variadic arguments for partial function application.
Raises: TypeError
– if function is not a function or a coroutine function.Returns: - function will be returned until all the
function arity is satisfied, where a coroutine function will be returned instead.
Return type: function or coroutinefunction
Usage:
# Function signature inferred function arity @paco.curry async def task(x, y, z=0): return x * y + z await task(4)(4)(z=8) # => 24 # User defined function arity @paco.curry(4) async def task(x, y, *args, **kw): return x * y + args[0] * args[1] await task(4)(4)(8)(8) # => 80 # Ignore keyword arguments from arity @paco.curry(ignore_kwargs=True) async def task(x, y, z=0): return x * y await task(4)(4) # => 16
-
paco.
defer
(coro, delay=1)[source]¶ Returns a coroutine function wrapper that will defer the given coroutine execution for a certain amount of seconds in a non-blocking way.
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to defer.
- delay (int/float) – number of seconds to defer execution.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: ordered list of resultant values.
Return type: filtered values (list)
Usage:
# Usage as function await paco.defer(coro, delay=1) await paco.defer(coro, delay=0.5) # Usage as decorator @paco.defer(delay=1) async def mul_2(num): return num * 2 await mul_2(2) # => 4
-
paco.
dropwhile
(coro, iterable, loop=None)[source]¶ Make an iterator that drops elements from the iterable as long as the predicate is true; afterwards, returns every element.
Note, the iterator does not produce any output until the predicate first becomes false, so it may have a lengthy start-up time.
This function is pretty much equivalent to Python standard itertools.dropwhile(), but designed to be used with async coroutines.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – coroutine function to call with values to reduce.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: ordered list of resultant values.
Return type: filtered values (list)
Usage:
async def filter(num): return num < 4 await paco.dropwhile(filter, [1, 2, 3, 4, 5, 1]) # => [4, 5, 1]
-
paco.
each
(coro, iterable, limit=0, loop=None, collect=False, timeout=None, return_exceptions=False, *args, **kw)[source]¶ Concurrently iterates values yielded from an iterable, passing them to an asynchronous coroutine.
You can optionally collect yielded values passing collect=True param, which would be equivalent to paco.map()`.
Mapped values will be returned as an ordered list. Items order is preserved based on origin iterable order.
Concurrency level can be configurable via limit param.
All coroutines will be executed in the same loop.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutinefunction) – coroutine iterator function that accepts iterable values.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- limit (int) – max iteration concurrency limit. Use
0
for no limit. - collect (bool) – return yielded values from coroutines. Default False.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
- return_exceptions (bool) – enable/disable returning exceptions in case of error. collect param must be True.
- timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- *args (mixed) – optional variadic arguments to pass to the coroutine iterable function.
Returns: ordered list of values yielded by coroutines
Return type: results (list)
Raises: TypeError
– in case of invalid input arguments.Usage:
async def mul_2(num): return num * 2 await paco.each(mul_2, [1, 2, 3, 4, 5]) # => None await paco.each(mul_2, [1, 2, 3, 4, 5], collect=True) # => [2, 4, 6, 8, 10]
-
paco.
every
(coro, iterable, limit=1, loop=None)[source]¶ Returns True if every element in a given iterable satisfies the coroutine asynchronous test.
If any iteratee coroutine call returns False, the process is inmediately stopped, and False will be returned.
You can increase the concurrency limit for a fast race condition scenario.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – coroutine function to call with values to reduce.
- iterable (iterable) – an iterable collection yielding coroutines functions.
- limit (int) – max concurrency execution limit. Use
0
for no limit. - loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if input arguments are not valid.Returns: True if all the values passes the test, otherwise False.
Return type: Usage:
async def gt_10(num): return num > 10 await paco.every(gt_10, [1, 2, 3, 11]) # => False await paco.every(gt_10, [11, 12, 13]) # => True
-
paco.
filter
(coro, iterable, assert_fn=None, limit=0, loop=None)[source]¶ Returns a list of all the values in coll which pass an asynchronous truth test coroutine.
Operations are executed concurrently by default, but results will be in order.
You can configure the concurrency via limit param.
This function is the asynchronous equivalent port Python built-in filter() function.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – coroutine filter function to call accepting iterable values.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- assert_fn (coroutinefunction) – optional assertion function.
- limit (int) – max filtering concurrency limit. Use
0
for no limit. - loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: - ordered list containing values that passed
the filter.
Return type: Usage:
async def iseven(num): return num % 2 == 0 async def assert_false(el): return not el await paco.filter(iseven, [1, 2, 3, 4, 5]) # => [2, 4] await paco.filter(iseven, [1, 2, 3, 4, 5], assert_fn=assert_false) # => [1, 3, 5]
-
paco.
filterfalse
(coro, iterable, limit=0, loop=None)[source]¶ Returns a list of all the values in coll which pass an asynchronous truth test coroutine.
Operations are executed concurrently by default, but results will be in order.
You can configure the concurrency via limit param.
This function is the asynchronous equivalent port Python built-in filterfalse() function.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – coroutine filter function to call accepting iterable values.
- iterable (iterable) – an iterable collection yielding coroutines functions.
- assert_fn (coroutinefunction) – optional assertion function.
- limit (int) – max filtering concurrency limit. Use
0
for no limit. - loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: - ordered list containing values that do not
passed the filter.
Return type: filtered values (list)
Usage:
async def iseven(num): return num % 2 == 0 await paco.filterfalse(coro, [1, 2, 3, 4, 5]) # => [1, 3, 5]
-
paco.
flat_map
(coro, iterable, limit=0, loop=None, timeout=None, return_exceptions=False, initializer=None, *args, **kw)[source]¶ Concurrently iterates values yielded from an iterable, passing them to an asynchronous coroutine.
This function is the flatten version of to
paco.map()
.Mapped values will be returned as an ordered list. Items order is preserved based on origin iterable order.
Concurrency level can be configurable via
limit
param.All coroutines will be executed in the same loop.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutinefunction) – coroutine iterator function that accepts iterable values.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- limit (int) – max iteration concurrency limit. Use
0
for no limit. - collect (bool) – return yielded values from coroutines. Default False.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
- return_exceptions (bool) – enable/disable returning exceptions in case of error. collect param must be True.
- timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- *args (mixed) – optional variadic arguments to pass to the coroutine iterable function.
Returns: ordered list of values yielded by coroutines
Return type: results (list)
Raises: TypeError
– in case of invalid input arguments.Usage:
async def mul_2(num): return num * 2 await paco.flat_map(mul_2, [1, [2], [3, [4]], [(5,)]]) # => [2, 4, 6, 8, 10] # Pipeline style await [1, [2], [3, [4]], [(5,)]] | paco.flat_map(mul_2) # => [2, 4, 6, 8, 10]
-
paco.
gather
(*coros_or_futures, limit=0, loop=None, timeout=None, preserve_order=False, return_exceptions=False)[source]¶ Return a future aggregating results from the given coroutine objects with a concurrency execution limit.
If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival).
If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
All futures must share the same event loop.
This functions is mostly compatible with Python standard
asyncio.gather
, but providing ordered results and concurrency control flow.This function is a coroutine.
Parameters: - *coros_or_futures (coroutines|list) – an iterable collection yielding coroutines functions or futures.
- limit (int) – max concurrency limit. Use
0
for no limit. - can be used to control the maximum number (timeout) – of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- preserve_order (bool) – preserves results order.
- return_exceptions (bool) – returns exceptions as valid results.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Returns: coroutines returned results.
Return type: Usage:
async def sum(x, y): return x + y await paco.gather( sum(1, 2), sum(None, 'str'), return_exceptions=True) # => [3, TypeError("unsupported operand type(s) for +: 'NoneType' and 'str'")] # noqa
-
paco.
identity
(value, delay=None)[source]¶ Returns a coroutine function that when called, always returns the provided value.
This function is an alias to paco.constant.
Parameters: - value (mixed) – value to constantly return when coroutine is called.
- delay (int/float) – optional return value delay in seconds.
Returns: coroutinefunction
Usage:
coro = paco.identity('foo') await coro() # => 'foo' await coro() # => 'foo'
-
paco.
interval
(coro, interval=1, times=None, loop=None)[source]¶ Schedules the execution of a coroutine function every x amount of seconds.
The function returns an asyncio.Task, which implements also an asyncio.Future interface, allowing the user to cancel the execution cycle.
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to defer.
- interval (int/float) – number of seconds to repeat the coroutine execution.
- times (int) – optional maximum time of executions. Infinite by default.
- loop (asyncio.BaseEventLoop, optional) – loop to run. Defaults to asyncio.get_event_loop().
Raises: TypeError
– if coro argument is not a coroutine function.Returns: - coroutine wrapped as task future.
Useful for cancellation and state checking.
Return type: future (asyncio.Task)
Usage:
# Usage as function future = paco.interval(coro, 1) # Cancel it after a while... await asyncio.sleep(5) future.cancel() # Usage as decorator @paco.interval(10) async def metrics(): await send_metrics() future = await metrics()
-
paco.
map
(coro, iterable, limit=0, loop=None, timeout=None, return_exceptions=False, *args, **kw)[source]¶ Concurrently maps values yielded from an iterable, passing then into an asynchronous coroutine function.
Mapped values will be returned as list. Items order will be preserved based on origin iterable order.
Concurrency level can be configurable via
limit
param.This function is the asynchronous equivalent port Python built-in map() function.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutinefunction) – map coroutine function to use.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- limit (int) – max concurrency limit. Use
0
for no limit. - loop (asyncio.BaseEventLoop) – optional event loop to use.
- timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- return_exceptions (bool) – returns exceptions as valid results.
- *args (mixed) – optional variadic arguments to be passed to the coroutine map function.
Returns: ordered list of values yielded by coroutines
Return type: Usage:
async def mul_2(num): return num * 2 await paco.map(mul_2, [1, 2, 3, 4, 5]) # => [2, 4, 6, 8, 10]
-
paco.
once
(coro, raise_exception=False, return_value=None)[source]¶ Wrap a given coroutine function that is restricted to one execution.
Repeated calls to the coroutine function will return the value of the first invocation.
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to wrap.
- raise_exception (bool) – raise exception if execution times exceeded.
- return_value (mixed) – value to return when execution times exceeded, instead of the memoized one from last invocation.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: coroutinefunction
Usage:
async def mul_2(num): return num * 2 once = paco.once(mul_2) await once(2) # => 4 await once(3) # => 4 once = paco.once(mul_2, return_value='exceeded') await once(2) # => 4 await once(3) # => 'exceeded'
-
paco.
partial
(coro, *args, **kw)[source]¶ Partial function implementation designed for coroutines, allowing variadic input arguments.
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to wrap.
- *args (mixed) – mixed variadic arguments for partial application.
Raises: TypeError
– ifcoro
is not a coroutine function.Returns: coroutinefunction
Usage:
async def pow(x, y): return x ** y pow_2 = paco.partial(pow, 2) await pow_2(4) # => 16
-
paco.
race
(iterable, loop=None, timeout=None, *args, **kw)[source]¶ Runs coroutines from a given iterable concurrently without waiting until the previous one has completed.
Once any of the tasks completes, the main coroutine is immediately resolved, yielding the first resolved value.
All coroutines will be executed in the same loop.
This function is a coroutine.
Parameters: - iterable (iterable) – an iterable collection yielding coroutines functions or coroutine objects.
- *args (mixed) – mixed variadic arguments to pass to coroutines.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
- timeout (int|float) – timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- *args – optional variadic argument to pass to coroutine function, if provided.
Raises: TypeError
– ifiterable
argument is not iterable.asyncio.TimoutError
– if wait timeout is exceeded.
Returns: ordered list of resultant values.
Return type: filtered values (list)
Usage:
async def coro1(): await asyncio.sleep(2) return 1 async def coro2(): return 2 async def coro3(): await asyncio.sleep(1) return 3 await paco.race([coro1, coro2, coro3]) # => 2
-
paco.
reduce
(coro, iterable, initializer=None, limit=1, right=False, loop=None)[source]¶ Apply function of two arguments cumulatively to the items of sequence, from left to right, so as to reduce the sequence to a single value.
Reduction will be executed sequentially without concurrency, so passed values would be in order.
This function is the asynchronous coroutine equivalent to Python standard functools.reduce() function.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – reducer coroutine binary function.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- initializer (mixed) – initial accumulator value used in the first reduction call.
- limit (int) – max iteration concurrency limit. Use
0
for no limit. - right (bool) – reduce iterable from right to left.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if input arguments are not valid.Returns: accumulated final reduced value.
Return type: mixed
Usage:
async def reducer(acc, num): return acc + num await paco.reduce(reducer, [1, 2, 3, 4, 5], initializer=0) # => 15
-
paco.
repeat
(coro, times=1, step=1, limit=1, loop=None)[source]¶ Executes the coroutine function
x
number of times, and accumulates results in order as you would use withmap
.Execution concurrency is configurable using
limit
param.This function is a coroutine.
Parameters: Raises: TypeError
– if coro is not a coroutine function.Returns: accumulated yielded values returned by coroutine.
Return type: Usage:
async def mul_2(num): return num * 2 await paco.repeat(mul_2, times=5) # => [2, 4, 6, 8, 10]
-
paco.
run
(coro, loop=None)[source]¶ Convenient shortcut alias to
loop.run_until_complete
.Parameters: - coro (coroutine) – coroutine object to schedule.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Defaults to:
asyncio.get_event_loop()
.
Returns: returned value by coroutine.
Return type: mixed
Usage:
async def mul_2(num): return num * 2 paco.run(mul_2(4)) # => 8
-
paco.
series
(*coros_or_futures, timeout=None, loop=None, return_exceptions=False)[source]¶ Run the given coroutine functions in series, each one running once the previous execution has completed.
If any coroutines raises an exception, no more coroutines are executed. Otherwise, the coroutines returned values will be returned as list.
timeout
can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.If
return_exceptions
is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.All futures must share the same event loop.
This functions is basically the sequential execution version of
asyncio.gather()
. Interface compatible withasyncio.gather()
.This function is a coroutine.
Parameters: - *coros_or_futures (iter|list) – an iterable collection yielding coroutines functions.
- timeout (int/float) – maximum number of seconds to wait before returning.
- return_exceptions (bool) – exceptions in the tasks are treated the same as successful results, instead of raising them.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
- *args (mixed) – optional variadic argument to pass to the coroutines function.
Returns: coroutines returned results.
Return type: Raises: TypeError
– in case of invalid coroutine object.ValueError
– in case of empty set of coroutines or futures.TimeoutError
– if execution takes more than expected.
Usage:
async def sum(x, y): return x + y await paco.series( sum(1, 2), sum(2, 3), sum(3, 4)) # => [3, 5, 7]
-
paco.
some
(coro, iterable, limit=0, timeout=None, loop=None)[source]¶ Returns True if at least one element in the iterable satisfies the asynchronous coroutine test. If any iteratee call returns True, iteration stops and True will be returned.
This function is a coroutine.
This function can be composed in a pipeline chain with
|
operator.Parameters: - coro (coroutine function) – coroutine function for test values.
- iterable (iterable|asynchronousiterable) – an iterable collection yielding coroutines functions.
- limit (int) – max concurrency limit. Use
0
for no limit. - can be used to control the maximum number (timeout) – of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if input arguments are not valid.Returns: True if at least on value passes the test, otherwise False.
Return type: Usage:
async def gt_3(num): return num > 3 await paco.some(test, [1, 2, 3, 4, 5]) # => True
-
paco.
throttle
(coro, limit=1, timeframe=1, return_value=None, raise_exception=False)[source]¶ Creates a throttled coroutine function that only invokes
coro
at most once per every time frame of seconds or milliseconds.Provide options to indicate whether func should be invoked on the leading and/or trailing edge of the wait timeout.
Subsequent calls to the throttled coroutine return the result of the last coroutine invocation.
This function can be used as decorator.
Parameters: - coro (coroutinefunction) – coroutine function to wrap with throttle strategy.
- limit (int) – number of coroutine allowed execution in the given time frame.
- timeframe (int|float) – throttle limit time frame in seconds.
- return_value (mixed) – optional return if the throttle limit is reached. Returns the latest returned value by default.
- raise_exception (bool) – raise exception if throttle limit is reached.
Raises: RuntimeError
– if cannot throttle limit reached (optional).Returns: coroutinefunction
Usage:
async def mul_2(num): return num * 2 # Use as simple wrapper throttled = paco.throttle(mul_2, limit=1, timeframe=2) await throttled(2) # => 4 await throttled(3) # ignored! # => 4 await asyncio.sleep(2) await throttled(3) # executed! # => 6 # Use as decorator @paco.throttle(limit=1, timeframe=2) async def mul_2(num): return num * 2 await mul_2(2) # => 4 await mul_2(3) # ignored! # => 4 await asyncio.sleep(2) await mul_2(3) # executed! # => 6
-
paco.
thunk
(coro)[source]¶ A thunk is a subroutine that is created, often automatically, to assist a call to another subroutine.
Creates a thunk coroutine which returns coroutine function that accepts no arguments and when invoked it schedules the wrapper coroutine and returns the final result.
See Wikipedia page for more information about Thunk subroutines: https://en.wikipedia.org/wiki/Thunk
Parameters: value (coroutinefunction) – wrapped coroutine function to invoke. Returns: coroutinefunction Usage:
async def task(): return 'foo' coro = paco.thunk(task) await coro() # => 'foo' await coro() # => 'foo'
-
paco.
timeout
(coro, timeout=None, loop=None)[source]¶ Wraps a given coroutine function, that when executed, if it takes more than the given timeout in seconds to execute, it will be canceled and raise an asyncio.TimeoutError.
This function is equivalent to Python standard asyncio.wait_for() function.
This function can be used as decorator.
Parameters: - coro (coroutinefunction|coroutine) – coroutine to wrap.
- timeout (int|float) – max wait timeout in seconds.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
Raises: TypeError
– if coro argument is not a coroutine function.Returns: wrapper coroutine function.
Return type: coroutinefunction
Usage:
await paco.timeout(coro, timeout=10)
-
class
paco.
TimeoutLimit
(timeout, loop=None)[source]¶ Bases:
object
Timeout limit context manager.
Useful in cases when you want to apply timeout logic around block of code or in cases when asyncio.wait_for is not suitable.
Originally based on: https://github.com/aio-libs/async-timeout
Parameters: - timeout (int) – value in seconds or None to disable timeout logic.
- loop (asyncio.BaseEventLoop) – asyncio compatible event loop.
Usage:
with paco.TimeoutLimit(0.1): await paco.wait(task1, task2)
-
paco.
times
(coro, limit=1, raise_exception=False, return_value=None)[source]¶ Wraps a given coroutine function to be executed only a certain amount of times.
If the execution limit is exceeded, the last execution return value will be returned as result.
You can optionally define a custom return value on exceeded via return_value param.
This function can be used as decorator.
Parameters: Raises: TypeError
– if coro argument is not a coroutine function.RuntimeError
– if max execution excedeed (optional).
Returns: coroutinefunction
Usage:
async def mul_2(num): return num * 2 timed = paco.times(mul_2, 3) await timed(2) # => 4 await timed(3) # => 6 await timed(4) # => 8 await timed(5) # ignored! # => 8
-
paco.
until
(coro, coro_test, assert_coro=None, *args, **kw)[source]¶ Repeatedly call coro coroutine function until coro_test returns True.
This function is the inverse of paco.whilst().
This function is a coroutine.
Parameters: - coro (coroutinefunction) – coroutine function to execute.
- coro_test (coroutinefunction) – coroutine function to test.
- assert_coro (coroutinefunction) – optional assertion coroutine used to determine if the test passed or not.
- *args (mixed) – optional variadic arguments to pass to coro function.
Raises: TypeError
– if input arguments are invalid.Returns: result values returned by coro.
Return type: Usage:
calls = 0 async def task(): nonlocal calls calls += 1 return calls async def calls_gt_4(): return calls > 4 await paco.until(task, calls_gt_4) # => [1, 2, 3, 4, 5]
-
paco.
wait
(*coros_or_futures, limit=0, timeout=None, loop=None, return_exceptions=False, return_when='ALL_COMPLETED')[source]¶ Wait for the Futures and coroutine objects given by the sequence futures to complete, with optional concurrency limit. Coroutines will be wrapped in Tasks.
timeout
can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.If
return_exceptions
is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.return_when
indicates when this function should return. It must be one of the following constants of the concurrent.futures module.All futures must share the same event loop.
This functions is mostly compatible with Python standard
asyncio.wait()
.Parameters: - *coros_or_futures (iter|list) – an iterable collection yielding coroutines functions.
- limit (int) – optional concurrency execution limit. Use
0
for no limit. - timeout (int/float) – maximum number of seconds to wait before returning.
- return_exceptions (bool) – exceptions in the tasks are treated the same as successful results, instead of raising them.
- return_when (str) – indicates when this function should return.
- loop (asyncio.BaseEventLoop) – optional event loop to use.
- *args (mixed) – optional variadic argument to pass to the coroutines function.
Returns: Returns two sets of Future: (done, pending).
Return type: Raises: TypeError
– in case of invalid coroutine object.ValueError
– in case of empty set of coroutines or futures.TimeoutError
– if execution takes more than expected.
Usage:
async def sum(x, y): return x + y done, pending = await paco.wait( sum(1, 2), sum(3, 4)) [task.result() for task in done] # => [3, 7]
-
paco.
whilst
(coro, coro_test, assert_coro=None, *args, **kw)[source]¶ Repeatedly call coro coroutine function while coro_test returns True.
This function is the inverse of paco.until().
This function is a coroutine.
Parameters: - coro (coroutinefunction) – coroutine function to execute.
- coro_test (coroutinefunction) – coroutine function to test.
- assert_coro (coroutinefunction) – optional assertion coroutine used to determine if the test passed or not.
- *args (mixed) – optional variadic arguments to pass to coro function.
Raises: TypeError
– if input arguments are invalid.Returns: result values returned by coro.
Return type: Usage:
calls = 0 async def task(): nonlocal calls calls += 1 return calls async def calls_lt_4(): return calls > 4 await paco.until(task, calls_lt_4) # => [1, 2, 3, 4, 5]
-
paco.
wraps
(fn)[source]¶ Wraps a given function as coroutine function.
This function can be used as decorator.
Parameters: fn (function) – function object to wrap. Returns: wrapped function as coroutine. Return type: coroutinefunction Usage:
def mul_2(num): return num * 2 # Use as function wrapper coro = paco.wraps(mul_2) await coro(2) # => 4 # Use as decorator @paco.wraps def mul_2(num): return num * 2 await mul_2(2) # => 4
History¶
v0.2.3 / 2018-10-23¶
- Merge pull request #42 from dylanjw/fix_syntax_error
- Use getattr to avoid async keyword
0.2.2 / 2018-10-09¶
- Merge pull request #40 from thatmattbone/master
- add loop param to paco.interval()
- fix(setup): use space based indentation
- fix(travis): use cpython 3.7-dev release
- refactor(errors): use “paco” prefix in exception messages
- chore(History): add version changes
v0.2.1 / 2018-03-21¶
- fix(#37): allow overload function signatures with variadic arguments
- refactor(timeout_test): remove print statement
- fix(docs): bad link to API reference
- refactor(docs): remove codesponsor
v0.2.0 / 2017-10-21¶
- refactor(api): API breaking change that modifies behavior by raising any legit exception generated by a coroutine.
- feat(examples): add examples file
- feat(v2): v2 pre-release, propagate raise exception if return_exceptions is False
- refactor(tests): add map error exception assertion test
- Merge branch ‘master’ of https://github.com/h2non/paco
- refactor(tests): add map error exception assertion test
- feat(docs): add sponsor banner
- feat(docs): add sponsor banner
- feat(LICENSE): update copyright year
- Update setup.py
v0.1.11 / 2017-01-28¶
- feat(api): add
paco.interval
function.
v0.1.10 / 2017-01-11¶
- fix(each.py,map.py): fixed return_exceptions kwarg
- fix(setup.py): add author email
- fix(Makefile): remove package file
v0.1.9 / 2017-01-06¶
- feat(api): add identity function
- feat(#31): add thunk function
- feat(package): add wheel package distribution
- refactor(wraps): simplify implementation
- fix(History): remove indentation
v0.1.8 / 2016-12-29¶
- feat(requirements): force upgrade setuptools
- feat(#29): support async iterable and generators
- fix(docs): link to examples
- chore(travis): use Python 3.6 stable release
0.1.7 / 2016-12-18¶
- feat(#26): add curry function.
0.1.6 / 2016-12-11¶
- feat(pipe): isolate pipe operator overload code
- refactor: decorator and util functions
- feat(#11): timeout limit context manager.
- refactor(core): several minor refactors
- fix(docs): comment out latex sphinx settings
- fix(docs): use current package version
- Documentation examples improvements (#27)
- feat(history): update
- feat: add pool length magic method
0.1.5 (2016-12-04)¶
- fix(#25): allow empty iterables in iterators functions, such as
map
,filter
,reduce
.
0.1.4 (2016-11-28)¶
- fix(#24): explicitly pass loop instance to
asyncio.wait
.
0.1.3 (2016-10-27)¶
- feat(#17): add
flat_map
function. - feat(#18): add pipeline-style operator overloading composition.
0.1.2 (2016-10-25)¶
- fix(setup.py): fix pip installation.
- refactor(api): minor refactors in several functions and tests.
0.1.1 (2016-10-24)¶
- refactor(name): use new project name.
0.1.0 (2016-10-23)¶
- First version (beta)