Serverless Orchestrator of Serverless Workers¶
`sosw` is a set of tools for orchestrating asynchronous invocations of AWS Lambda functions. The essential components of `sosw` are themselves implemented as AWS Lambda functions.
Note
Please pronounce sosw correctly: /ˈsɔːsəʊ/
Essential Workflow Schema¶

Workers¶
The Worker functions are your own functions that require Orchestration. The sosw package has multiple components and tools that we encourage you to use in your Workers to make the code DRY and stable: statistics aggregation, component initialization, automatic configuration assembly and more…
In order to be Orchestrated by sosw, the functions should be able to mark the tasks they receive as completed once the job is done. If you inherit the main classes of your Lambdas from Worker, this will be handled automatically. The default behaviour of Workers is not to touch the queue, but to call a Worker Assistant lambda to mark tasks as completed.
Read more: Worker
Three different Workflows can be defined.
Scheduling¶
Scheduling is the transformation of your business jobs into specific tasks for the Lambda Worker. One task = one invocation of a Worker. Scheduler provides an interface for your Business Applications, other Lambdas and Events (e.g. Scheduled Rules) to submit the Business Job.
Chunking of the Payload happens following the rules that you pre-configure. One of the built-in dimensions for chunking is the calendar.
Scheduled tasks go to a stateful queue to be invoked when the time comes.
Read more: Scheduler
Orchestration¶
The Orchestrator is called automatically every minute by Scheduled Events. It evaluates the status of the Workers at the current moment and the health of external metrics the Workers may depend on (e.g. CPU load of some DB, IOPS, etc.), and invokes the appropriate number of new parallel invocations.
Read more: Orchestrator
Scavenger¶
The Scavenger is called automatically every minute by Scheduled Events. It collects the tasks marked as completed by the Workers and archives them.
If a task was not accomplished successfully, the Scavenger tries to re-invoke it with a configurable exponential delay. In case the task completely fails after several invocations, the Scavenger marks it as dead and removes it from the queue to avoid infinite retries. In this case some external alarm system (SNS or Lambda) may be triggered as well.
Read more: Scavenger
Installation¶
`sosw` requires you to implement/deploy several Lambda functions (Essentials) using the appropriate core classes. The deployment is described in detail, but assumes that you are familiar with basic AWS Serverless products.
Another deployment requirement is to create several DynamoDB tables.
Once again, the detailed guide for initial setup can be found in the Installation.
Installation¶
Steps¶
Setup AWS Account¶
sosw implementation currently supports only AWS infrastructure. If you are running production operations on AWS, we highly recommend setting up a standalone account for your first experiments with sosw. AWS Organisations now provide an easy way to set up sub-accounts from the primary one.
To set up a completely isolated new account, follow the AWS Documentation.
We shall require several services, but they are all supposed to fit in the AWS Free Tier. As long as the resources are created using CloudFormation, once you delete the stacks, the related resources will also be deleted automatically to avoid unnecessary charges.
See Cleanup after tutorials instructions in the Tutorials section.
Provision Required AWS Resources¶
This document shall guide you through the setup process for the sosw Essentials and the different resources required for them. All the resources are created following the Infrastructure as Code concept and can be easily cleaned up if no longer required.
Warning
The following Guide assumes that you are running these commands from an EC2 machine using either a Key or a Role with permissions to control IAM, CloudFormation, Lambda, CloudWatch, DynamoDB, S3 (and probably something else).
If you are running this in a test account, feel free to grant the IAM Role of your EC2 instance the policy arn:aws:iam::aws:policy/AdministratorAccess, but never do this in Production.
If you plan to run the tutorials after this, we recommend setting this up in the us-west-2 (Oregon) Region. Some scripts in the tutorial guidelines may have the region hardcoded.
We now assume that you have created a fresh Amazon Linux 2 machine with an IAM Role having the permissions listed above. You may follow this tutorial if you feel uncertain; just create a new IAM Role on Step 3 of the instance setup Wizard.
Warning
Do not run this in Production AWS Account unless you completely understand what is going on!
The following commands are tested on a fresh EC2 instance of type t2.micro running on the default Amazon Linux 2 AMI (64-bit).
# Install required system packages for SAM, AWS CLI and Python.
sudo yum update -y
sudo yum install zlib-devel build-essential python3.7 python3-devel git docker -y
# Update pip and ensure you have the required Python packages installed locally for the user.
# You might not need all of them at first, but you will if you would like to test `sosw`
# or play with it and run the tests.
sudo pip3 install -U pip pipenv boto3
sudo mkdir /var/app
sudo chown ec2-user:ec2-user /var/app
cd /var/app
git clone https://github.com/sosw/sosw.git
cd sosw
# Configure your AWS CLI environment.
# Assuming you are using a new machine, we just copy the config with the default region
# `us-west-2` to $HOME. Do not keep credentials in the profile.
# The correct, secure way is to use IAM Roles when running from the AWS infrastructure.
# Feel free to change or skip this step if your environment is already configured.
cp -nr .aws ~/
Now you are ready to start creating AWS resources. First let us provision some shared resources that both the sosw Essentials and sosw-managed Lambdas will use.
# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
grep AccountId | awk -F "\"" '{print $4}'`
# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT
PREFIX=/var/app/sosw/examples/yaml/initial
# Create new CloudFormation stacks
for filename in `ls $PREFIX`; do
    STACK=`echo $filename | sed s/.yaml//`

    aws cloudformation package --template-file $PREFIX/$filename \
        --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

    aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
        --stack-name $STACK --capabilities CAPABILITY_NAMED_IAM
done
Note
Now, take a break and wait for these resources to be created. You may observe the changes in the CloudFormation web-console (Services -> CloudFormation).
Warning
DO NOT continue until all stacks reach the CREATE_COMPLETE status.
If you make any changes to these files in the future (after the initial deployment), simply re-run the package and deploy commands above to update the CloudFormation stacks. There is no harm in running them an extra time: CloudFormation is smart enough not to take any action if there are no actual changes in the templates.
Provision Lambda Functions for Essentials¶
In this tutorial we were first going to use AWS SAM for provisioning Lambdas, but eventually gave up on it: too much black magic is required and you eventually lose control over the Lambda. The example of deploying Essentials uses raw bash/python scripts, the AWS CLI and CloudFormation templates. If you would like to contribute examples with SAM, you are welcome to. Some sandboxes can be found in examples/sam/ in the repository.
# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
grep AccountId | awk -F "\"" '{print $4}'`
# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT
for name in `ls /var/app/sosw/examples/essentials`; do
    echo "Deploying $name"

    FUNCTION=$name
    FUNCTIONDASHED=`echo $name | sed s/_/-/g`

    cd /var/app/sosw/examples/essentials/$FUNCTION

    # Install sosw package locally.
    pip3 install -r requirements.txt --no-dependencies --target .

    # Make a source package.
    zip -qr /tmp/$FUNCTION.zip *

    # Upload the file to S3, so that AWS Lambda will be able to easily take it from there.
    aws s3 cp /tmp/$FUNCTION.zip s3://$BUCKETNAME/sosw/packages/

    # Package and Deploy CloudFormation stack for the Function.
    # It will create the Function and a custom IAM role for it with permissions
    # to access required DynamoDB tables.
    aws cloudformation package --template-file $FUNCTIONDASHED.yaml \
        --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

    aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
        --stack-name $FUNCTIONDASHED --capabilities CAPABILITY_NAMED_IAM
done
If you change anything in the code or simply want to redeploy the Lambdas, repeat the packaging and deployment steps above.
Upload Essentials Configurations¶
sosw-managed Lambdas (and the Essentials themselves) automatically try to read their configuration from the DynamoDB table config. Each Lambda looks for the document with a range_key config_name = 'LAMBDA_NAME_config' (e.g. 'sosw_orchestrator_config'). The config_value should contain a JSON that will be recursively merged into the DEFAULT_CONFIG of each Lambda.
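For illustration, here is a minimal sketch of what such a config item might look like when written to the config table with boto3. The hash key name (env) and the exact JSON content are assumptions for this example; check the table definition created by the CloudFormation templates for the real schema.
import json
import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
config_table = dynamodb.Table('config')

# Hypothetical override that will be recursively merged into the Orchestrator's DEFAULT_CONFIG.
config_table.put_item(Item={
    'env':          'production',                  # assumed hash key of the table
    'config_name':  'sosw_orchestrator_config',    # range key: '<LAMBDA_NAME>_config'
    'config_value': json.dumps({'some_setting': 'some_value'}),
})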
We have provided some very basic examples of configuring the Essentials. The config files have some values that are dependent on your AWS Account ID, so we shall substitute it and then upload these configs to DynamoDB. It is much easier to do this in Python, so we shall call a Python script for that. The script uses some sosw features for working with DynamoDB, so we shall have to install sosw first.
cd /var/app/sosw
pipenv run pip install sosw
cd /var/app/sosw/examples/
pipenv run python3 config_updater.py
### Or alternatively, use the old one:
# cd /var/app/sosw/examples/essentials/.config
# python3 config_uploader.py
# cd /var/app/sosw
Please take your time to read more about Config Source and find advanced examples in the guidelines of Orchestrator, Scavenger and Scheduler.
Create Scheduled Rules¶
The usual implementation expects the Orchestrator and Scavenger to run every minute, while the Scheduler and WorkerAssistant are executed per request. The Scheduler may, of course, have any number of cronned Business Tasks with any desired periodicity.
The following script will create an AWS CloudWatch Events Scheduled Rule that will invoke the Orchestrator and Scavenger every minute.
Note
Make sure not to leave this rule enabled after you finish your tutorial: once you exceed the AWS Lambda free tier it might cause unexpected charges.
# Set parameters:
BUCKETNAME=sosw-s3-$ACCOUNT
PREFIX=/var/app/sosw/examples/yaml
FILENAME=sosw-dev-scheduled-rules.yaml
STACK=sosw-dev-scheduled-rules
aws cloudformation package --template-file $PREFIX/$FILENAME \
--output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME
aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
--stack-name $STACK --capabilities CAPABILITY_NAMED_IAM
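When you are done experimenting, you can disable the Scheduled Rules instead of waiting to delete the whole stack. A minimal sketch with boto3 is shown below; the rule names are hypothetical, so first list the rules created by the sosw-dev-scheduled-rules stack and substitute the real names.
import boto3

events = boto3.client('events', region_name='us-west-2')

# List the rules to find the ones created by the stack (names below are placeholders).
for rule in events.list_rules()['Rules']:
    print(rule['Name'], rule['State'])

events.disable_rule(Name='NAME_OF_THE_ORCHESTRATOR_RULE')
events.disable_rule(Name='NAME_OF_THE_SCAVENGER_RULE')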
Essentials¶
Worker¶
class sosw.worker.Worker(custom_config=None, **kwargs)[source]¶
We recommend that you inherit your core Processor from this class in Lambdas that are orchestrated by sosw.
The __call__ method is supposed to accept the event of the Lambda invocation. This is a dictionary with the payload received in the lambda_handler during invocation. Worker has all the common methods of Processor and tries to mark the task as completed if it received a task_id in the event. Worker creates a payload with stats and result (if they exist) and invokes the Worker Assistant lambda.
Example¶
Please find the following elementary example of Worker Lambda.
import logging

from sosw import Worker
from sosw.app import LambdaGlobals, get_lambda_handler

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class Processor(Worker):

    DEFAULT_CONFIG = {
        'init_clients':     ['dynamo_db'],
        'dynamo_db_config': {
            'row_mapper':      {
                'hash_col':  'S',  # String
                'range_col': 'N',  # Number
            },
            'required_fields': ['hash_col', 'range_col'],
            'table_name':      'autotest_dynamo_db',  # If a table is not specified, this table will be used.
        }
    }

    dynamo_db_client = None


    def __call__(self, event):

        # Example of your Worker logic
        row = event.get('row')
        self.put_to_db(row)

        # Do some basic cleaning and mark the `sosw` task as completed.
        super().__call__(event)


    def put_to_db(self, row):
        self.dynamo_db_client.put(row)


# Setting the entry point of the lambda.
global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)
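For reference, a rough sketch of the kind of event such a Worker might receive when invoked under sosw orchestration is shown below. The task_id is injected by the task workflow, while row is specific to the example above; the exact set of keys depends on your Scheduler and Orchestrator configuration.
# Hypothetical invocation event for the Worker above.
event = {
    'task_id': '3171d0f8-example-task-id',
    'row':     {'hash_col': 'cat', 'range_col': 42},
}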
Orchestrator¶
Orchestrator does the … Orchestration.
You can use the class in your Lambda as is; just configure some settings using one of the ways supported by Config.
The following diagram represents the basic Task Workflow initiated by the Orchestrator.

Workers Invocation Workflow
TASKS_TABLE_CONFIG = {
    'row_mapper':       {
        'task_id':             'S',
        'labourer_id':         'S',
        'greenfield':          'N',
        'attempts':            'N',
        'closed_at':           'N',
        'completed_at':        'N',
        'desired_launch_time': 'N',
        'arn':                 'S',
        'payload':             'S'
    },
    'required_fields':  ['task_id', 'labourer_id', 'created_at', 'greenfield'],
    'table_name':       'sosw_tasks',
    'index_greenfield': 'sosw_tasks_greenfield',
    'field_names':      {
        'task_id':     'task_id',
        'labourer_id': 'labourer_id',
        'greenfield':  'greenfield',
    }
}

TASK_CLIENT_CONFIG = {
    'dynamo_db_config':                  TASKS_TABLE_CONFIG,
    'sosw_closed_tasks_table':           'sosw_closed_tasks',
    'sosw_retry_tasks_table':            'sosw_retry_tasks',
    'sosw_retry_tasks_greenfield_index': 'labourer_id_greenfield',
    'ecology_config':                    {},
    'labourers':                         {
        'some_function': {
            'arn':                          'arn:aws:lambda:us-west-2:737060422660:function:some_function',
            'max_simultaneous_invocations': 10,
            'health_metrics':               {
                'SomeDBCPU': {
                    'details': {
                        'Name':       'CPUUtilization',
                        'Namespace':  'AWS/RDS',
                        'Period':     60,
                        'Statistics': ['Average'],
                        'Dimensions': [
                            {
                                'Name':  'DBInstanceIdentifier',
                                'Value': 'YOUR-DB'
                            },
                        ],
                    },

                    # This is the mapping of how the Labourer should "feel" about this metric.
                    # See EcologyManager.ECO_STATUSES.
                    # This is just a mapping ``ECO_STATUS: value`` using ``feeling_comparison_operator``.
                    'feelings': {
                        3: 50,
                        4: 25,
                    },
                    'feeling_comparison_operator': '<='
                },
            },
        },
    },
}

ORCHESTRATOR_CONFIG = {
    'task_config': TASK_CLIENT_CONFIG,
}
Example CloudFormation template for Orchestrator
See also Greenfield
class sosw.orchestrator.Orchestrator(custom_config=None, **kwargs)[source]¶
Orchestrator class. Iterates the pre-configured Labourers and invokes the appropriate number of Tasks for each one.
get_desired_invocation_number_for_labourer(labourer: sosw.labourer.Labourer) → int[source]¶
Decides the desired maximum number of simultaneous invocations for a specific Labourer. The decision is based on the ecology status of the Labourer and the configs.
Returns: Number of invocations
-
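Since the Orchestrator can be used as is, the entry point of its Lambda follows the same pattern as the Worker example above. A minimal sketch, assuming the configuration is delivered through the config DynamoDB table or a custom_config:
import logging

from sosw.orchestrator import Orchestrator
from sosw.app import LambdaGlobals, get_lambda_handler

logger = logging.getLogger()
logger.setLevel(logging.INFO)

global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Orchestrator, global_vars)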
Scheduler¶
Scheduler is the public interface of sosw for any applications that want to invoke some orchestrated Lambdas. Its main role is to transform some business job into the actual payload of Lambda invocations. It respects the configurable chunking rules specific to different Workers.

Scheduler Workflow
TASKS_TABLE_CONFIG = {
    'row_mapper':       {
        'task_id':             'S',
        'labourer_id':         'S',
        'greenfield':          'N',
        'attempts':            'N',
        'closed_at':           'N',
        'completed_at':        'N',
        'desired_launch_time': 'N',
        'arn':                 'S',
        'payload':             'S'
    },
    'required_fields':  ['task_id', 'labourer_id', 'created_at', 'greenfield'],
    'table_name':       'sosw_tasks',
    'index_greenfield': 'sosw_tasks_greenfield',
    'field_names':      {
        'task_id':     'task_id',
        'labourer_id': 'labourer_id',
        'greenfield':  'greenfield',
    }
}

TASK_CLIENT_CONFIG = {
    'dynamo_db_config':                  TASKS_TABLE_CONFIG,
    'sosw_closed_tasks_table':           'sosw_closed_tasks',
    'sosw_retry_tasks_table':            'sosw_retry_tasks',
    'sosw_retry_tasks_greenfield_index': 'labourer_id_greenfield',
    'ecology_config':                    {},
    'labourers':                         {
        'some_function': {
            'arn':                          'arn:aws:lambda:us-west-2:0000000000:function:some_function',
            'max_simultaneous_invocations': 10,
        },
    },
}

SCHEDULER_CONFIG = {
    'queue_bucket': 'some-bucket',
    'task_config':  TASK_CLIENT_CONFIG,
    'job_schema':   {
        'chunkable_attrs': [
        ]
    }
}
class sosw.scheduler.Scheduler(*args, **kwargs)[source]¶
Scheduler converts business jobs to one or multiple Worker tasks.
A Job supports a lot of dynamic settings that coordinate the chunking.
Parameters:
max_YOURATTRs_per_batch: int
This is applicable only to the lowest level of chunking. If isolation of this parameter is not required, and all values of this parameter are simple strings/integers, the Scheduler shall chunk them in batches of the given size. By default it will chunk up to 1,000,000 objects in a list.
-
chunk_dates
(job: Dict, skeleton: Dict = None) → List[Dict][source]¶ There is support for chunking multiple non-nested parameters. Dates are one very specific case of them.
-
chunk_job
(job: dict, skeleton: Dict = None, attr: str = None) → List[Dict][source]¶ Recursively parses a job, validates everything and chunks into simple tasks whatever should be chunked. The scenario of chunking and isolation is worth another story, so a link should be put here once it is ready.
-
construct_job_data
(job: Dict, skeleton: Dict = None) → List[Dict][source]¶ Chunks the job to tasks using several layers. Each layer is represented with a chunker method. All chunkers should accept job and optional skeleton for tasks and return a list of tasks. If there is nothing to chunk for some chunker, return same job (with injected skeleton) wrapped in a list.
Default chunkers:
- Date list chunking
- Recursive chunking for chunkable_attrs
-
get_and_lock_queue_file
() → str[source]¶ Either takes a new (recently created) file in the local /tmp/, or downloads the current version of the queue file from S3. We move the file in S3 to a locked_-prefixed state, or simply upload the new one there already in the locked_ state.
Returns: Local path to the file.
-
static
get_index_from_list
(attr, data)[source]¶ Finds the index ignoring the ‘s’ at the end of attribute.
-
last_week
(pattern: str = 'last_week') → List[str][source]¶ Returns a list of dates (YYYY-MM-DD) as strings for last week (Sunday - Saturday).
-
needs_chunking
(attr: str, data: Dict) → bool[source]¶ Recursively analyses the data and identifies whether the current level of data should be chunked. This happens if there is an isolate_attr marker either in the current scope or recursively in any of the sub-elements.
Parameters: - attr – Name of attribute you want to check for chunking.
- data – Input dictionary to analyse.
-
parse_job_to_file
(job: Dict)[source]¶ Splits the Job to multiple tasks and writes them down in self.local_queue_file.
Parameters: job (dict) – Payload from Scheduled Rule. Should be already parsed from whatever payload to dict and contain the raw job
-
static
pop_rows_from_file
(file_name: str, rows: Optional[int] = 1) → List[str][source]¶ Reads rows from the top of the file and removes them from the original file along the way.
Parameters: - file_name (str) – File to read.
- rows (int) – Number of rows to read. Default: 1
Returns: List of strings read from file top.
-
previous_x_days
(pattern: str) → List[str][source]¶ Returns a list of string dates (YYYY-MM-DD) for the x days that precede the most recent x days, i.e. from (today - 2*x) up to, but not including, (today - x).
For example, consider today's date as 2019-04-30. Calling previous_x_days(pattern='previous_2_days') will return a list of string dates equal to: ['2019-04-26', '2019-04-27']
-
process_file
()[source]¶ Processes a file, creating tasks from it, and then uploads it to S3. If the execution time reaches its limit, it spawns a new sibling to continue the processing.
-
remote_queue_file
¶ Full S3 Key of file with queue of tasks not yet in DynamoDB.
-
remote_queue_locked_file
¶ Full S3 Key of file with queue of tasks not yet in DynamoDB in the locked state. Concurrent processes should not touch it.
-
set_queue_file
(name: str = None)[source]¶ Initialize a unique file_name to store the queue of tasks to write.
-
sufficient_execution_time_left
¶ Return if there is a sufficient execution time for processing (‘shutdown period’ is in seconds).
-
today
(pattern: str = 'today') → List[str][source]¶ Returns list with one datetime string (YYYY-MM-DD) equal to today’s date.
-
upload_and_unlock_queue_file
()[source]¶ Upload the local queue file to S3 and remove the locked_ by prefix copy if it exists.
-
validate_list_of_vals
(data: Union[list, set, tuple, Dict]) → list[source]¶ Supported resulting values: str, int, float.
Expects a simple iterable of supported values or a dictionary with values = None. The keys then are treated as resulting values and if they validate, are returned as a list.
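A small illustration of the behaviour described above, assuming scheduler is an already initialised Scheduler instance; the calls are hypothetical and show only the accepted inputs and the resulting lists:
scheduler.validate_list_of_vals(['a', 'b', 42])           # -> ['a', 'b', 42]
scheduler.validate_list_of_vals({'a': None, 'b': None})   # -> ['a', 'b']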
-
Scavenger¶
The main roles of Scavenger are:
- Find Completed tasks and archive them
- Find Expired tasks (ones invoked, but not successfully completed by Workers) and either retry or mark them as failed.
-
class sosw.scavenger.Scavenger(custom_config=None, **kwargs)[source]¶
The Scavenger main class performs the following operations:
- archive_tasks(labourer)
- handle_expired_tasks(labourer)
- retry_tasks(labourer)
-
archive_tasks
(labourer: sosw.labourer.Labourer)[source]¶ Reads from sosw_tasks the tasks successfully marked as completed by Workers and archives them.
-
get_db_field_name
(key: str) → str[source]¶ Could be useful if you overwrite field names with your own ones (e.g. for tests).
Worker Assistant¶
class sosw.worker_assistant.WorkerAssistant(custom_config=None, **kwargs)[source]¶
Worker Assistant is the interface Worker Lambdas should call to mark their tasks as completed.
Future versions will also accept the remaining workload to process, in order to call Siblings in case the Worker is about to time out and wants to finish in a healthy manner.
This Essential is supposed to be called synchronously by the Worker Lambdas. The caller should pass the action and task_id attributes in the payload of the call. See the example of the usage in Worker.
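If you need to call the Worker Assistant directly (instead of inheriting from Worker), the call is a plain synchronous Lambda invocation. A minimal sketch with boto3 is below; the function name and the action value are assumptions based on the default setup, so verify them against your deployed Essential and its config.
import json
import boto3

lambda_client = boto3.client('lambda', region_name='us-west-2')

lambda_client.invoke(
    FunctionName='sosw_worker_assistant',        # assumed name of the deployed Essential
    InvocationType='RequestResponse',            # the call is synchronous
    Payload=json.dumps({
        'action':  'mark_task_as_completed',     # assumed action name
        'task_id': 'some-task-id-received-in-the-event',
    }),
)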
Processor¶
Core class for other components to inherit.
class sosw.app.Processor(custom_config=None, **kwargs)[source]¶
Core Processor class template. All the main components (Worker, Orchestrator and Scheduler) inherit from this one. You can also use this class as a parent for some of your standalone Lambdas, but we strongly encourage you to use the Worker class in case you are running functions under sosw orchestration.
-
die
(message='Unknown Failure')[source]¶ Logs current Processor stats and message. Then raises RuntimeError with message.
If there is access to publish SNS messages, the method will also try to publish to the topic configured as dead_sns_topic or ‘SoswWorkerErrors’.
Parameters: message (str) – Description of failure.
-
static
get_config
(name)[source]¶ Returns config by name from SSM. Override this to provide your config handling method.
Parameters: name – Name of the config Return type: dict
-
get_stats
(recursive: bool = True)[source]¶ Return statistics of operations performed by current instance of the Class.
Statistics of custom clients existing in the Processor are also aggregated by default. Clients must be initialized as self.some_client, ending with the _client suffix (e.g. self.dynamo_client). Clients must also have their own get_stats() methods implemented.
Be careful about circular get_stats() calls from child classes. If required overwrite get_stats() with recursive = False.
Parameters: recursive – Merge stats from self.***_client. Return type: dict Returns: Statistics counter of current Processor instance.
-
reset_stats
(recursive: bool = True)[source]¶ Cleans statistics other than those specified as preserved for the lifetime of the Processor. All the parameters with the prefix 'total_' are also preserved.
The function makes sense if your Processor lives outside the scope of lambda_handler.
Be careful about circular get_stats() calls from child classes. If required overwrite reset_stats() with recursive = False.
Parameters: recursive – Reset stats from self.***_client.
-
-
class sosw.app.LambdaGlobals[source]¶
Global placeholder for the global_vars that we want to preserve during the lifetime of the Lambda Container, e.g. once the given Processor is initialised, we keep it alive in the container to minimize warm-run time.
This namespace also contains the lambda_context which should be reset by get_lambda_handler method. See Worker examples in documentation for more info.
sosw.app.get_lambda_handler(processor_class, global_vars=None, custom_config=None)[source]¶
Return a reference to the entry point of the lambda function.
Parameters: - processor_class – Callable processor class.
- global_vars – Lambda’s global variables (processor, context).
- custom_config – Custom configuration to pass to the processor constructor.
Returns: Function reference for the lambda handler.
Components¶
Config Source¶
Config manager component. Has methods for getting configuration for Lambdas. Use the Config class to get the correct methods we use for each action. Using SSMConfig or DynamoConfig directly is discouraged. Especially SSMConfig, because SSM throttles and has limits we reached in the past. Using these methods requires the Role to have permissions to access SSM and/or Dynamo for the requested resources.
class sosw.components.config.ConfigSource(test=False, sources=None, config=None)[source]¶
A strategy adapter for config. Returns config from the selected config source. You can implement your own functions using these clients, and they can even call different configurations.
Parameters: - sources (str) – Config clients to initialize. Supported: SSM and Dynamo (default). Could be both, comma-separated. The first one then becomes default.
- config (dict) – Custom configurations for clients. Should be in ssm_config, dynamo_config, etc. Don’t be confused, but sometimes configs also need their own configs. :)
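A minimal sketch of constructing a ConfigSource with both backends. The get_config call at the end is an assumption about the reading method's name, and the dynamo_config contents are illustrative; verify both against the component's source before relying on them.
from sosw.components.config import ConfigSource

# Initialize with both supported clients; the first one (SSM) becomes the default.
config_source = ConfigSource(sources='SSM,Dynamo',
                             config={'dynamo_config': {'table_name': 'config'}})

# Assumed reading method; the document name follows the '<LAMBDA_NAME>_config' convention.
orchestrator_config = config_source.get_config('sosw_orchestrator_config')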
DynamoDB Client¶
class sosw.components.dynamo_db.DynamoDbClient(config)[source]¶
Has default methods for different types of DynamoDB tables.
The current implementation supports only one fixed table during initialization, but you are free to initialize multiple simultaneous dynamo_clients in your Lambda with different configs.
Config should have a mapping for the field types and required fields. Config example:
{
    'row_mapper':      {
        'col_name_1': 'N',  # Number
        'col_name_2': 'S',  # String
    },
    'required_fields': ['col_name_1'],
    'table_name': 'some_table_name',  # If a table is not specified, this table will be used.
    'hash_key': 'the_hash_key',
    'dont_json_loads_results': True  # Use this if you don't want to convert json strings into json
}
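A short usage sketch based on the config example above and the methods documented below; the table and column names are illustrative only:
from sosw.components.dynamo_db import DynamoDbClient

config = {
    'row_mapper':      {'col_name_1': 'N', 'col_name_2': 'S'},
    'required_fields': ['col_name_1'],
    'table_name':      'some_table_name',
    'hash_key':        'col_name_1',
}

dynamo_db_client = DynamoDbClient(config)

# Write a row (key is column name, value is value).
dynamo_db_client.put({'col_name_1': 42, 'col_name_2': 'cat'})

# Fetch rows by the hash key; indexes, comparisons and filters are documented below.
rows = dynamo_db_client.get_by_query({'col_name_1': 42})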
-
batch_get_items_one_table
(keys_list, table_name=None, max_retries=0, retry_wait_base_time=0.2, strict=None, fetch_all_fields=None)[source]¶ Gets a batch of items from a single dynamo table. Only accepts keys, can’t query by other columns.
Parameters: keys_list (list) – A list of the keys of the items we want to get. Gets the items that match the given keys. If some key doesn't exist, it just skips it and gets the others. e.g. [{'hash_col': 1, 'range_col': 2}, {'hash_col': 3}] - will get a row where hash_col is 1 and range_col is 2, and also all rows where hash_col is 3. Optional
Parameters: - table_name (str) –
- max_retries (int) – If failed to get some items, retry this many times. Waiting between retries is multiplied by 2 after each retry, so retries shouldn’t be a big number. Default is 1.
- retry_wait_base_time (int) – Wait this much time after first retry. Will wait twice longer in each retry.
- strict (bool) – DEPRECATED.
- fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
Returns: List of items from the table
Return type: list
-
delete
(keys: Dict, table_name: Optional[str] = None)[source]¶ Parameters: - keys (dict) – Keys and values of the row we delete.
- table_name –
-
dict_to_dynamo
(row_dict, add_prefix=None, strict=True)[source]¶ Convert the row from regular dictionary to the ugly DynamoDB syntax. Takes settings from row_mapper.
e.g. {‘key1’: ‘value1’, ‘key2’: ‘value2’} will convert to: {‘key1’: {‘Type1’: ‘value1’}, ‘key2’: {‘Type2’: ‘value2’}}
Parameters: - row_dict (dict) – A row we want to convert to dynamo syntax.
- add_prefix (str) – A string prefix to add to the key in the result dict. Useful for queries like update.
- strict (bool) – If False, will get the type from the value in the dict (this works for numbers and strings). If True, won’t add them if they’re not in the required_fields, and if they are, will raise an error.
Returns: DynamoDB Task item
Return type: dict
-
dynamo_to_dict
(dynamo_row: Dict, strict: bool = None, fetch_all_fields: Optional[bool] = None) → Dict[source]¶ Convert the ugly DynamoDB syntax of the row to a regular dictionary. We currently support only String or Numeric values. The latter are converted to int or float. Takes settings from row_mapper.
e.g.: {‘key1’: {‘N’: ‘3’}, ‘key2’: {‘S’: ‘value2’}} will convert to: {‘key1’: 3, ‘key2’: ‘value2’}
Parameters: - dynamo_row (dict) – DynamoDB row item
- strict (bool) – DEPRECATED.
- fetch_all_fields (bool) – If False only row_mapper fields will be extracted from dynamo_row, else, all fields will be extracted from dynamo_row.
Returns: The row in a key-value format
Return type: dict
-
get_by_query
(keys: Dict, table_name: Optional[str] = None, index_name: Optional[str] = None, comparisons: Optional[Dict] = None, max_items: Optional[int] = None, filter_expression: Optional[str] = None, strict: bool = None, return_count: bool = False, desc: bool = False, fetch_all_fields: bool = None, expr_attrs_names: list = None) → Union[List[Dict], int][source]¶ Get an item from a table, by some keys. Can specify an index. If an index is not specified, will query the table. IMPORTANT: You must specify the rows you expect to be converted in row mapper in config, otherwise you won’t get them in the result. If you want to get items from dynamo by non-key attributes, this method is not for you.
Parameters: keys (dict) – Keys and values of the items we get. You must specify the hash key, and can optionally also add the range key. Example, in a table where the hash key is ‘hk’ and the range key is ‘rk’: * {‘hk’: ‘cat’, ‘rk’: ‘123’} * {‘hk’: ‘cat’} Optional
Parameters: - table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
- index_name (str) – Name of the secondary index in the table. If not specified, will query the table itself.
- comparisons (dict) – Type of comparison for each key. If a key is not mentioned, comparison type will be =. Valid values: =, <, <=, >, >=, begins_with. Comparisons only work for the range key. Example: if keys={‘hk’: ‘cat’, ‘rk’: 100} and comparisons={‘rk’: ‘<=’} -> will get items where rk <= 100
- max_items (int) – Limit the number of items to fetch.
- filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. ‘key <= 42’, ‘name = marta’, ‘foo between 10 and 20’, etc.
- strict (bool) – DEPRECATED.
- return_count (bool) – If True, will return the number of items in the result instead of the items themselves
- desc (bool) – By default (False) the values will be sorted ascending by the SortKey. To reverse the order set the argument desc = True.
- fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
- expr_attrs_names (list) – List of attributes names, in case if an attribute name begins with a number or contains a space, a special character, or a reserved word, you must use an expression attribute name to replace that attribute’s name in the expression. Example, if the list [‘session’, ‘key’] is received, then a new dict will be assigned to ExpressionAttributeNames: {‘#session’: ‘session’, ‘#key’: ‘key’}
Returns: List of items from the table, each item in key-value format OR the count if return_count is True
-
get_by_scan
(attrs=None, table_name=None, strict=None, fetch_all_fields=None)[source]¶ Scans a table. Don’t use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful - don’t make queries of too many items, this could run for a long time.
Optional:
Parameters: - attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.
- table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
- strict (bool) – DEPRECATED.
- fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
Returns: List of items from the table, each item in key-value format
Return type: list
-
get_by_scan_generator
(attrs=None, table_name=None, strict=None, fetch_all_fields=None)[source]¶ Scans a table. Don’t use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful - don’t make queries of too many items, this could run for a long time. Same as get_by_scan, but yields parts of the results.
Optional:
Parameters: - attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.
- table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
- strict (bool) – DEPRECATED.
- fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
Returns: List of items from the table, each item in key-value format
Return type: list
-
get_capacity
(table_name=None)[source]¶ Fetches capacity for data tables.
Keyword Arguments: table_name (str) – Name of the DynamoDB table (default: None)
Returns: dict – read/write capacity for the requested table
-
get_stats
()[source]¶ Return statistics of operations performed by current instance of the Class.
Returns: - dict - key: int statistics.
-
get_table_indexes
(table_name: Optional[str] = None) → Dict[source]¶ Returns active indexes of the table: their hash key, range key, and projection type.
{
    'index_1_name': {
        'projection_type': 'ALL',  # One of: 'ALL'|'KEYS_ONLY'|'INCLUDE'
        'hash_key': 'the_hash_key_column_name',
        'range_key': 'the_range_key_column_name',  # Can be None if the index has no range key
        'provisioned_throughput': {
            'write_capacity': 5,
            'read_capacity': 10
        }
    },
    'index_2_name': ...
}
-
get_table_keys
(table_name: Optional[str] = None) → Tuple[str, Optional[str]][source]¶ Returns table’s hash key name and range key name
Parameters: table_name – Returns: hash key and range key names
-
identify_dynamo_capacity
(table_name=None)[source]¶ Identify and store the table capacity for a given table on the object.
Arguments: table_name {str} – short name of the dynamo db table to analyze
-
patch
(keys: Dict, attributes_to_update: Optional[Dict] = None, attributes_to_increment: Optional[Dict] = None, table_name: Optional[str] = None, attributes_to_remove: Optional[List[str]] = None)[source]¶ Updates an item in DynamoDB. Will fail if an item with these keys does not exist.
-
put
(row, table_name=None, overwrite_existing=True)[source]¶ Adds a row to the database
Parameters: - row (dict) – The row to add to the table. key is column name, value is value.
- table_name (string) – Name of the dynamo table to add the row to.
- overwrite_existing (bool) – Overwrite the existing row if True, otherwise will raise an exception if exists.
-
sleep_db
(last_action_time: datetime.datetime, action: str)[source]¶ Sleeps between calls to dynamodb (if it needs to). Uses the table’s capacity to decide how long it needs to sleep.
Parameters: - last_action_time – Last time when we did this action (read/write) to this dynamo table
- action – “read” or “write”
-
transact_write
(*transactions)[source]¶ Executes many write transactions. Can execute operations on different tables. Will split transactions into chunks, because transact_write_items accepts up to 10 actions. WARNING: If you're expecting a transaction on more than 10 operations - AWS DynamoDB doesn't support it.
dynamo_db_client = DynamoDbClient(config)
t1 = dynamo_db_client.make_put_transaction_item(row, table_name='table1')
t2 = dynamo_db_client.make_delete_transaction_item(row, table_name='table2')
dynamo_db_client.transact_write(t1, t2)
-
update
(keys: Dict, attributes_to_update: Optional[Dict] = None, attributes_to_increment: Optional[Dict] = None, table_name: Optional[str] = None, condition_expression: Optional[str] = None, attributes_to_remove: Optional[List[str]] = None)[source]¶ Updates an item in DynamoDB. Will create a new item if it doesn't exist. IMPORTANT - If you want to make sure the item already exists, use the patch method.
Parameters: - keys (dict) – Keys and values of the row we update. Example, in a table where the hash key is 'hk' and the range key is 'rk': {'hk': 'cat', 'rk': '123'}
- attributes_to_update (dict) – Dict of the attributes to be updated. Can contain both existing attributes and new attributes. Will update existing, and create new attributes. Example: {‘col_name’: ‘some_value’}
- attributes_to_increment (dict) – Attribute names to increment, and the value to increment by. If the attribute doesn’t exist, will create it. Example: {‘some_counter’: ‘3’}
- attributes_to_remove (list) – Will remove these attributes from the record
- condition_expression (str) – Condition Expression that must be fulfilled on the object to update.
- table_name (str) – Name of the table
-
-
sosw.components.dynamo_db.
clean_dynamo_table
(table_name='autotest_dynamo_db', keys=('hash_col', 'range_col'), filter_expression=None)[source]¶ Cleans the DynamoDB Table. Only for autotest tables.
Parameters: - table_name (str) – name of the table
- keys (tuple) – the keys of the table
- filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. ‘key <= 42’, ‘name = marta’, ‘foo between 10 and 20’, etc.
Warning
There are some reserved words that would not work with the Filter Expression in case they are attribute names. Fix this one day.
Helpers¶
Static helper methods which you can use in any Lambdas. Must be completely independent with no specific requirements.
-
sosw.components.helpers.validate_account_to_dashed(account)[source]¶
Validates that the provided string is in a valid AdWords account format and converts it to the dashed format.
Parameters: account (str) – AdWords Account Return type: str Returns: Dashed format
-
sosw.components.helpers.validate_account_to_int(account)[source]¶
Validates that the provided string is in a valid AdWords account format and converts it to the integer format.
Parameters: account (str, int) – AdWords Account Returns: Account ID as integer
-
sosw.components.helpers.validate_list_of_numbers_from_csv(data)[source]¶
Converts a comma separated string of numeric values to a list of sorted unique integers. The values that do not match are skipped.
Parameters: data (str, iterable) – str | iterable
Returns: list(int)
-
sosw.components.helpers.
camel_case_to_underscore
(name)[source]¶ Converts attribute to string and formats it as underscored.
Parameters: name – - str - CamelCase string (or something convertible to CamelCase with the __str__() method).
Returns: - str - underscore_formatted_value
-
sosw.components.helpers.
validate_uuid4
(uuid_string)[source]¶ Validate that a UUID string is in fact a valid uuid4. Happily, the uuid module does the actual checking for us. It is vital that the ‘version’ kwarg be passed to the UUID() call, otherwise any 32-character hex string is considered valid.
-
sosw.components.helpers.
rstrip_all
(input, patterns)[source]¶ Strips all of the patterns from the right of the input. Order and spaces do not matter.
Parameters: - input –
- str - String to modify
- patterns –
- list|set|tuple|str - Pattern[-s] to remove.
Returns: - str
-
sosw.components.helpers.
get_one_or_none_from_dict
(input, name, vtype=None)[source]¶ Extracts object by ‘name’ from the ‘input’. Tries also plural name in case not found by single ‘name’. In case found an iterable by plural name, validates that it has one or zero values in it. If vtype is specified, tries to convert result to it.
Parameters: - input (dict) – Input dictionary. Event of Lambda for example.
- name (str) – Name of attribute (in singular form).
- vtype (type) – Type to be converted to. Must be callable. Tested types: str, int, float
Returns: - instance of vtype | something else | None
Raises: ValueError – In all cases something is wrong.
-
sosw.components.helpers.
get_one_from_dict
(input, name, vtype=None)[source]¶ Extracts object by ‘name’ from the ‘input’. Tries also plural name in case not found by single ‘name’. In case found an iterable by plural name, validates that it has exactly one value in it. If vtype is specified, tries to convert result to it.
Parameters: - input –
- dict - Input dictionary. Event of Lambda for example.
- name –
- str - Name of attribute (in singular form).
- vtype –
- type - Type to be converted to. Must be callable. Tested types: str, int, float
Returns: - instance of vtype | something else | None
Raises: ValueError –
- In all cases something is wrong.
-
sosw.components.helpers.
get_list_of_multiple_or_one_or_empty_from_dict
(input, name, vtype=None)[source]¶ Extracts objects by ‘name’ from the ‘input’ and returns as a list. Tries both plural and singular names from the input. If vtype is specified, tries to convert each of the elements in the result to this type.
Parameters: - input –
- dict - Input dictionary. Event of Lambda for example.
- name –
- str - Name of attribute (in plural form).
- vtype –
- type - Type to be converted to. Must be callable. Tested types: str, int, float
Returns: - list - List of vtypes, or list of whatever was in input, or empty list.
Raises: ValueError – In all cases something is wrong.
-
sosw.components.helpers.
validate_date_list_from_event_or_days_back
(input, days_back=0, key_name='date_list')[source]¶ Takes the date_list from the input, validates it and converts to datetime.date. The input should have date_list as a list of strings or a comma-separated string.
- Format:
YYYY-MM-DD
- Examples:
['2018-01-01', '2018-02-01'] '2018-01-01, 2018-02-01'
Parameters: - input (dict) – This is supposed to be your whole Lambda event.
- days_back (int) – Optional Number of days to take back from today. Ex: days_back=1 is yesterday. Default: today.
- key_name (str) – Optional custom name of key to extract from ‘input’.
Returns: list(datetime.date)
- Format:
-
sosw.components.helpers.
validate_date_from_something
(d)[source]¶ Convert valid input to datetime.date() or raise either AttributeError or ValueError.
Parameters: d – Some input. Supported types: * datetime.datetime * datetime.date * int - Epoch or Epoch milliseconds * float - Epoch or Epoch milliseconds * str (YYYY-MM-DD) * str (YYYY-MM-DD HH:MM:SS) Returns: Transformed d Return type: datetime.date Raises: ValueError
-
sosw.components.helpers.
validate_datetime_from_something
(d)[source]¶ Converts the input d to datetime.datetime.
Parameters: d – Some input. Supported types: * datetime.datetime * datetime.date * int - Epoch or Epoch milliseconds * float - Epoch or Epoch milliseconds * str (YYYY-MM-DD) * str (YYYY-MM-DD HH:MM:SS) Returns: Transformed d Return type: datetime.datetime Raises: ValueError
-
sosw.components.helpers.
validate_string_matches_datetime_format
(date_str, date_format, field_name='date')[source]¶ Validate string, make sure it’s of the given datetime format
Parameters: - date_str (str) – a date or time or both, Example: ‘2018/09/16’
- date_format (str) – datetime format, that is acceptable for datetime.strptime. Example: ‘%Y/%m/%d’ (https://docs.python.org/3.6/library/datetime.html#strftime-and-strptime-behavior)
- field_name (str) – name of the field (for the error)
Raises: ValueError
-
sosw.components.helpers.
is_valid_date
(date_str, date_formats)[source]¶ Validate string to be at least one of the given datetime formats.
Parameters: - date_str (str) – a date or time or both, Example: ‘2018/09/16’
- date_formats (list) – List of datetime format, that is acceptable for datetime.strptime. Example: ‘%Y/%m/%d’
Return type: bool
Returns: True if the date string is valid for any of the datetime formats, False otherwise.
-
sosw.components.helpers.
recursive_matches_soft
(src, key, val, **kwargs)[source]¶ Searches the ‘src’ recursively for nested elements provided in ‘key’ with dot notation. In case some levels are iterable (list, tuple) it checks every element. In case the full path is inaccessible returns False. If any of the elements addressed by ‘key’ matches the ‘val’ - Bingo! Return True.
You might also be interested in recursive_exists_strict() helper.
Parameters: - src (dict) – Input dictionary. Can contain nested dictionaries and lists.
- key (str) – Path to search with dot notation.
- val (any) – Value to match in some elements specified by path.
In order to check not just that some element exists, but to check for duplicates, you might want to use the optional 'exclude' attributes. If attributes are specified and the last level element following the path (dot notation) has such a key-value, the check for the main key-value will be skipped. See the unittests to understand the behaviour better.
Parameters: - exclude_key (str) – Key to check in last level element to exclude.
- exclude_val (str) – Value to match in last level element to exclude.
Return type: bool
-
sosw.components.helpers.
recursive_matches_strict
(src, key, val, **kwargs)[source]¶ Searches the ‘input’ recursively for nested elements provided in ‘key’ with dot notation. In case some levels are iterable (list, tuple) it checks every element. In case the full path is inaccessible raises AttributeError or KeyError.
Parameters: - src (dict) – Input dictionary. Can contain nested dictionaries and lists.
- key (str) – Path to search with dot notation.
- val (any) – Value to match in some elements specified by path.
Return type: bool
-
sosw.components.helpers.
recursive_matches_extract
(src, key, separator=None, **kwargs)[source]¶ Searches the ‘src’ recursively for nested elements provided in ‘key’ with dot notation. In case some levels are iterable (list, tuple) it checks every element in it till finds it.
Returns the first found element or None. In case the full path is inaccessible also returns None.
If you are just checking if some elements exist, you might be interested in recursive_exists_strict() or recursive_exists_soft() helpers.
Parameters: - src (dict) – Input dictionary. Can contain nested dictionaries and lists.
- key (str) – Path to search with dot notation.
- separator (str) – Custom separator for recursive extraction. Default: ‘.’
In order to filter out some specific elements, you might want to use the optional 'exclude' attributes. If attributes are specified and the last level element following the path (dot notation) has such a key-value, the check for the main key-value will be skipped. See the unittests to understand the behaviour better.
Parameters: - exclude_key (str) – Key to check in last level element to exclude.
- exclude_val (str) – Value to match in last level element to exclude.
Returns: Value from structure extracted by specified path
-
sosw.components.helpers.
dunder_to_dict
(data: dict, separator=None)[source]¶ Converts the flat dict with keys using dunder notation for nesting elements to regular nested dictionary.
E.g.:
data = {'a': 'v1', 'b__c': 'v2', 'b__d__e': 'v3'} result = dunder_to_dict(data) # result: { 'a': 'v1', 'b': { 'c': 'v2', 'd': {'e': 'v3'} } }
Parameters: - data – A dictionary that is converted to Nested.
- separator (str) – Custom separator for recursive extraction. Default: ‘.’
-
sosw.components.helpers.
nested_dict_from_keys
(keys: List, value: Optional = None) → Dict[source]¶ Constructs a nested dictionary using a list of keys to embed recursively. If value is provided it is assigned to the last subkey.
Examples:
nested_dict_from_keys(['a', 'b', 'c']) == {'a': {'b': {'c': None}}} nested_dict_from_keys(['a', 'b', 'c'], value=42) == {'a': {'b': {'c': 42}}}
Parameters: - keys – List of keys to embed.
- value – Optional value to set to lowest level
-
sosw.components.helpers.
convert_string_to_words
(string)[source]¶ Convert string to comma separated words.
Parameters: string (str) – String to convert into words. Return type: str Returns: Comma separated words.
-
sosw.components.helpers.
construct_dates_from_event
(event: dict) → tuple[source]¶ Processes given event dictionary for start and end points of time. Otherwise takes the default settings.
The end date of the period may be specified as en_date in the event. The default value is today.
The event should also have either the st_date or the days_back numeric parameter. If days_back is provided, it will be subtracted from the end date.
Both st_date and en_date might be either date, datetime or string (‘YYYY-MM-DD’) types. In case of datetime, the hours/minutes/etc are ignored.
Parameters: event (dict) – Lambda payload. Returns: start_date, end_date as datetime.date
-
sosw.components.helpers.
validate_list_of_words_from_csv_or_list
(data: (str, list)) → list[source]¶ Splits a CSV string into a list of stripped words. In case the data is already a list of strings, splits its elements and flattens the result.
All resulting elements must be single words, if any of the elements contains spaces (i.e. multiple words) the validation fails with ValueError.
Parameters: data – CSV string of list of strings (possibly CSV themselves) Returns: List of stripped and split words
-
sosw.components.helpers.
first_or_none
(items: Iterable, condition: Callable = None)[source]¶ Return first element in iterable to match condition or None
-
sosw.components.helpers.
recursive_update
(d: Dict, u: Mapping) → Dict[source]¶ Recursively updates the dictionary d with another one u. Values of u overwrite in case of type conflict.
List, set and tuple values of d and u are merged, preserving only unique values. Returned as List.
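A tiny illustration of the merge behaviour described above (the ordering of merged list values may differ):
from sosw.components.helpers import recursive_update

d = {'a': {'b': 1}, 'tags': ['x']}
u = {'a': {'c': 2}, 'tags': ['x', 'y']}

recursive_update(d, u)
# -> {'a': {'b': 1, 'c': 2}, 'tags': ['x', 'y']}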
-
sosw.components.helpers.
trim_arn_to_name
(arn: str) → str[source]¶ Extract just the name of function from full ARN. Supports versions, aliases or raw name (without ARN).
More information about ARN Format: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns
-
sosw.components.helpers.
trim_arn_to_account
(arn: str) → str[source]¶ Extract just the ACCOUNT_ID from full ARN. Supports versions, aliases or raw name (without ARN).
More information about ARN Format: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns
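A couple of illustrative calls for the two ARN helpers above (the ARN itself is a made-up example):
from sosw.components.helpers import trim_arn_to_name, trim_arn_to_account

arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function:$LATEST'

trim_arn_to_name(arn)      # -> 'some_function'
trim_arn_to_account(arn)   # -> '000000000000'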
-
sosw.components.helpers.
make_hash
(o)[source]¶ Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Original idea from this user: https://stackoverflow.com/users/660554/jomido
Plus some upgrades to work with sets and dicts having different types of keys appropriately. See source unittests of this function for some more details.
Siblings¶
Warning
This component has performance issues unless running with force=True. It can cause CloudWatch request throttling. Requires some refactoring.
SiblingsManager provides Lambda executions with an option of a "healthy" shutdown. They may pass the remaining payload to another execution automatically. See the example:
import logging
import time

from sosw import Processor as SoswProcessor
from sosw.app import LambdaGlobals, get_lambda_handler
from sosw.components.siblings import SiblingsManager

logger = logging.getLogger()
logger.setLevel(logging.INFO)


class Processor(SoswProcessor):

    DEFAULT_CONFIG = {
        'init_clients':    ['Siblings'],  # Automatically initialize Siblings Manager
        'shutdown_period': 10,            # Some time to shutdown in a healthy manner.
    }

    siblings_client: SiblingsManager = None


    def __call__(self, event):

        cursor = event.get('cursor', 0)

        while self.sufficient_execution_time_left:
            self.process_data(cursor)
            cursor += 1
            if cursor == 20:
                return f"Reached the end of data"

        else:
            # Spawning another sibling to continue the processing
            payload = {'cursor': cursor}
            self.siblings_client.spawn_sibling(global_vars.lambda_context, payload=payload, force=True)
            self.stats['siblings_spawned'] += 1


    def process_data(self, cursor):
        """ Your custom logic respecting current cursor. """
        logger.info(f"Processing data at cursor: {cursor}")
        time.sleep(1)


    @property
    def sufficient_execution_time_left(self) -> bool:
        """ Return whether there is a sufficient execution time for processing ('shutdown period' is in seconds). """
        return global_vars.lambda_context.get_remaining_time_in_millis() > self.config['shutdown_period'] * 1000


global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)
The example above is a use-case where you could also store the remaining payload, for example in S3, and call the sibling with a pointer to it.
class sosw.components.siblings.SiblingsManager(custom_config=None, **kwargs)[source]¶
This set of helpers can be used by Lambdas that want to invoke some siblings of themselves. Very useful for Lambdas processing queues and running out of time.
The Role of your Lambda must have the following extra permissions to run correctly. Please note that we hardcode the Arn in the policy to avoid circular dependency when parsing YAML. This dependency is absolutely valid, but CloudFormation doesn’t know how to parse it.
Policies:
  - PolicyName: "YOUR_FUNCTION_NAME"
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Action: "cloudwatch:GetMetricStatistics"
          Resource: "*"
        - Effect: "Allow"
          Action: "lambda:InvokeFunction"
          Resource: "arn:aws:lambda:us-west-2:737060422660:function:YOUR_FUNCTION_NAME"
-
any_events_rules_enabled
(lambda_context)[source]¶ Checks the Status of CloudWatch Events Rules. It is very important to use this checker before launching siblings. Otherwise, you can create an infinite autorespawning loop and waste A LOT of money.
Parameters: lambda_context – Context object from your lambda_handler. Return type: bool Raises: ResourceNotFoundException – If Rule with the given name doesn’t exist.
-
get_approximate_concurrent_executions
(minutes_back=5, name=None)[source]¶ Get approximate concurrent executions from CloudWatch Metrics. The value is very approximate and calculated as count of invocations during minutes_back divided by average duration in same period. Return value is rounded to integer using ceil.
We assume that the Role has permissions to read CloudWatch.
Parameters: - minutes_back (int) – Aggregate statistics for this number of minutes.
- name (str) – Name of the function to check. Default: currently running lambda.
Return type: int
Returns: Approximate number of concurrent executions.
-
spawn_sibling
(lambda_context, payload=None, force=False)[source]¶ Asynchronously invokes a copy of same function to continue working. Should be called if there is still work left to do (ex: messages in the queue).
Can optionally send some payload for example remaining unprocessed rows of something. Should be formatted as dictionary.
Parameters: - lambda_context – Context object from your lambda_handler.
- payload (dict) – The payload to be put to event.
- force (bool) – If specified True it will ignore the checks of enabled Events Rules.
-
SNS Manager¶
class sosw.components.sns.SnsManager(**kwargs)[source]¶
AWS Simple Notification System Manager helper. Requires the Role to have permissions to access SSM for the requested resources.
Must have a recipient specified either during initialization or later using the set_recipient method. Messages received by send_message() are batched and will actually be sent to SNS only during the call of commit() or during the destruction of the class.
The method doesn’t yet support batching messages for multiple recipients, so in case you try to change the recipient it automatically sends all the currently batched messages to the previous recipient.
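A minimal usage sketch based on the behaviour described above; the topic ARN is a placeholder, and the way the recipient and subject are initially configured depends on your setup:
from sosw.components.sns import SnsManager

sns = SnsManager()
sns.set_recipient('arn:aws:sns:us-west-2:000000000000:some_topic')

# Messages are batched per recipient/subject and only pushed on commit().
sns.send_message('First line of the report', subject='Daily report')
sns.send_message('Second line of the report')
sns.commit()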
-
commit
()[source]¶ Combines messages from self.queue and pushes them to self.recipient. Cleans the queue after that.
-
create_subscription
(topic_arn, protocol, endpoint)[source]¶ Create a subscription to the topic
Parameters: - topic_arn (str) – ARN of a topic
- protocol (str) – The type of endpoint to subscribe
- endpoint (str) – Endpoint that can receive notifications from Amazon SNS
-
create_topic
(topic_name)[source]¶ Create a new topic name
Parameters: topic_name (str) – New topic name to create Returns: New topic ARN Return type: str
-
send_message
(message, subject=None, forse_commit=False)[source]¶ If the subject is not yet set (for example during __init__() of the class), then we require the subject to be set. Otherwise we accept a None subject and simply append messages to the queue. Once the subject changes, the queue is committed automatically.
Parameters: - message (str) – Message to be sent in the body of the SNS message. Queued.
- subject (str) – Optional. Custom subject for message.
-
tasks_api_client_for_workers¶
Warning
This component is not generalized for broad usage. Requires refactoring.
Managers¶
TaskManager¶
View Licence Agreement-
class
sosw.managers.task.
TaskManager
(custom_config=None, **kwargs)[source]¶ TaskManager is the core class used by most sosw Lambdas. It handles all the operations with tasks, so the configuration of this Manager is essential for your sosw implementation.
The default version of TaskManager works with DynamoDB tables to store and analyze the state of Tasks. This could be upgraded in future versions to work with other persistent storage or DBs.
The very important concept to understand about Task workflow is greenfield. Read more.
-
construct_payload_for_task
(**kwargs) → str[source]¶ Combines the remaining kwargs into a single JSON payload.
-
create_task
(labourer: sosw.labourer.Labourer, strict: bool = True, **kwargs)[source]¶ Schedule a new task.
Parameters: - labourer – Labourer object of Lambda to execute the task.
- strict (bool) – By default (True) prohibits specifying in the task (kwargs) the fields that are supposed to be autogenerated; such fields only pass if they match the autogenerated values. You can override this and pass custom task properties by setting strict = False.
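A sketch of scheduling a custom task, assuming the TaskManager is already configured and that the extra kwargs (here payload) become custom task properties:

labourer = task_manager.register_labourers()[0]
task_manager.create_task(labourer, payload={'date': '2019-01-01', 'words': ['toyota']})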
-
get_completed_tasks_for_labourer
(labourer: sosw.labourer.Labourer) → List[Dict][source]¶ Return a list of tasks of the Labourer marked as completed. Scavenger is supposed to archive them all so no special filtering is required here.
In order to be able to use the already existing index_greenfield, we sort tasks only in invoked stages (greenfield > now()). This number is supposed to be small, so filtering by an un-indexed field will be fast.
-
get_count_of_running_tasks_for_labourer
(labourer: sosw.labourer.Labourer) → int[source]¶ Returns a number of tasks we assume to be still running. Theoretically they can be dead with Exception, but not yet expired.
-
get_db_field_name
(key: str) → str[source]¶ Could be useful if you overwrite field names with your own ones (e.g. for tests).
-
get_expired_tasks_for_labourer
(labourer: sosw.labourer.Labourer) → List[Dict][source]¶ Return a list of tasks of Labourer previously invoked, and expired without being closed.
-
get_invoked_tasks_for_labourer
(labourer: sosw.labourer.Labourer, completed: Optional[bool] = None) → List[Dict][source]¶ Return a list of tasks of current Labourer invoked during the current run of the Orchestrator.
If completed is provided: * True - filter completed ones * False - filter NOT completed ones * None (default) - do not care about completed status.
-
get_labourers
() → List[sosw.labourer.Labourer][source]¶ Return configured Labourers. Config of the TaskManager expects ‘labourers’ as a dict ‘name_of_lambda’: {‘some_setting’: ‘value1’}
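The expected shape of this config, as described above (the inner setting names are placeholders, not real keys):

config = {
    'labourers': {
        'some_worker_lambda':    {'some_setting': 'value1'},
        'another_worker_lambda': {'some_setting': 'value2'},
    }
}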
-
get_length_of_queue_for_labourer
(labourer: sosw.labourer.Labourer) → int[source]¶ Approximate count of tasks still in queue for labourer. Tasks with greenfield <= now()
Parameters: labourer – Labourer to count tasks for. Returns: Approximate number of tasks in the queue.
-
get_newest_greenfield_for_labourer
(labourer: sosw.labourer.Labourer) → int[source]¶ Return value of the newest greenfield in queue. This means the end of the queue or latest added.
-
get_next_for_labourer
(labourer: sosw.labourer.Labourer, cnt: int = 1, only_ids: bool = False) → List[Union[str, Dict]][source]¶ Fetch the next task(s) from the queue for the Labourer.
Parameters: - labourer – Labourer to get next tasks for.
- cnt – Optional number of Tasks to fetch.
- only_ids – If explicitly set True, then returns only the IDs of tasks. This could save some transport if you are sending big batches of tasks between Lambdas.
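A rough Orchestrator-style sketch combining this method with invoke_task() (documented below); task_manager is assumed to be an already configured TaskManager instance:

for labourer in task_manager.register_labourers():
    for task in task_manager.get_next_for_labourer(labourer, cnt=2):
        task_manager.invoke_task(labourer, task=task)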
-
get_oldest_greenfield_for_labourer
(labourer: sosw.labourer.Labourer, reverse: bool = False) → int[source]¶ Return value of oldest greenfield in queue. This means the beginning of the queue if you need FIFO behaviour.
-
get_running_tasks_for_labourer
(labourer: sosw.labourer.Labourer, count: bool = False) → Union[List[Dict], int][source]¶ Return a list of tasks of Labourer previously invoked, but not yet closed or expired. We assume they are still running.
If count is specified as True will return just the number of tasks, not the items themselves. Much cheaper.
-
invoke_task
(labourer: sosw.labourer.Labourer, task_id: Optional[str] = None, task: Optional[Dict] = None)[source]¶ Invoke the Lambda Function execution for the task. Providing the task_id is more expensive, but safer against “task injection” attacks: the method prefetches the Task from the table before trying to invoke it.
Skips already running tasks with no exception, thus concurrent Orchestrators (or whoever else) should not duplicate invocations.
-
mark_task_invoked
(labourer: sosw.labourer.Labourer, task: Dict, check_running: Optional[bool] = True)[source]¶ Update the greenfield with the latest invocation timestamp + invocation_delta
By default updates with a conditional expression that fails if the current greenfield is already in the invoked state. If this check fails, the function raises a RuntimeError that should be handled by the Orchestrator. This is very important to help avoid duplicate invocations of the Worker by simultaneously running Orchestrators.
Parameters: - labourer – Labourer for the task
- task – Task dictionary
- check_running – If True (default), updates with a conditional expression.
Raises: RuntimeError – If the conditional check fails because the task is already invoked.
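A sketch of how a caller might handle this, assuming task_manager, labourer and task are already defined:

try:
    task_manager.mark_task_invoked(labourer, task)
except RuntimeError:
    # Another Orchestrator has already invoked this task; skip it.
    pass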
-
move_task_to_retry_table
(task: Dict, wanted_delay: int)[source]¶ Put the task to a Dynamo table sosw_retry_tasks, with the wanted delay: labourer.max_runtime * attempts. Delete it from sosw_tasks table.
-
register_labourers
() → List[sosw.labourer.Labourer][source]¶ Sets timestamps, health status and other custom attributes on Labourer objects passed for registration.
We also send a pointer to the TaskManager (aka self) to the Ecology Manager. The latter will have to make some queries, and we don’t want it to initialise another TaskManager for itself.
-
EcologyManager¶
View Licence Agreement-
class
sosw.managers.ecology.
EcologyManager
(*args, **kwargs)[source]¶ -
add_running_tasks_for_labourer
(labourer: sosw.labourer.Labourer, count: int = 1)[source]¶ Adds to the current counter of running tasks the given count. Invokes the getter first in case the original number was not yet calculated from DynamoDB.
-
count_running_tasks_for_labourer
(labourer: sosw.labourer.Labourer) → int[source]¶ TODO Refactor this to cache the value in the Labourer object itself. Should also update add_running_tasks_for_labourer() for that.
-
fetch_metric_stats
(metric: Dict) → List[Dict][source]¶ Fetches Datapoints of aggregated metric statistics from CloudWatch. The fields in metric are the attributes of get_metric_statistics. An additional parameter, MetricAggregationTimeSlice (in seconds), is used to calculate the StartTime and EndTime.
If some fields are missing in the metric, the defaults come from
config['default_metric_values']
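A sketch of such a metric dictionary: the keys mirror the boto3 CloudWatch get_metric_statistics parameters, plus MetricAggregationTimeSlice (seconds) used to derive StartTime and EndTime. The values are just examples.

metric = {
    'Namespace':                  'AWS/DynamoDB',
    'MetricName':                 'ConsumedWriteCapacityUnits',
    'Dimensions':                 [{'Name': 'TableName', 'Value': 'sosw_tasks'}],
    'Period':                     60,
    'Statistics':                 ['Average'],
    'MetricAggregationTimeSlice': 300,
}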
-
get_health
(value: Union[int, float], metric: Dict) → int[source]¶ Checks the value against the health_metric configuration.
-
get_labourer_average_duration
(labourer: sosw.labourer.Labourer) → int[source]¶ Calculates the average duration of labourer executions.
The operation consumes DynamoDB RCU. Normally this method is called only once per labourer, during the registration of Labourers. If you want to learn this value later, ask the Labourer object.
-
get_labourer_status
(labourer: sosw.labourer.Labourer) → int[source]¶ Get the worst (lowest) health status according to preconfigured health metrics of the Labourer.
Current ECO_STATUSES:
- (0, ‘Bad’)
- (1, ‘Poor’)
- (2, ‘Moderate’)
- (3, ‘Good’)
- (4, ‘High’)
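A sketch of using the status in an Orchestrator-like decision, assuming ecology_manager and labourer are already initialised:

status = ecology_manager.get_labourer_status(labourer)
if status < 2:
    # Health is 'Bad' or 'Poor': do not put more load on the Labourer's dependencies.
    desired_invocations = 0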
-
get_max_labourer_duration
(labourer: sosw.labourer.Labourer) → int[source]¶ Maximum duration of labourer executions.
-
get_stats
(recursive=False)[source]¶ Return statistics of operations performed by current instance of the Class.
Statistics of custom clients existing in the Processor are also aggregated by default. Clients must be initialized as self.some_client, ending with the _client suffix (e.g. self.dynamo_client). Clients must also have their own get_stats() methods implemented.
Be careful about circular get_stats() calls from child classes. If required overwrite get_stats() with recursive = False.
Parameters: recursive – Merge stats from self.***_client. Return type: dict Returns: Statistics counter of current Processor instance.
-
register_task_manager
(task_manager: sosw.managers.task.TaskManager)[source]¶ We will have to make some queries, and don’t want to initialise another TaskManager locally. Just receive the pointer to TaskManager from whoever needs.
This could be done in __init__, but I don’t want to update the initialization workflows for every function initialising me. They usually use the built-in core Processor mechanism register_clients().
-
reset_stats
(recursive=False)[source]¶ Cleans statistics, except those specified to be preserved for the lifetime of the processor. All the parameters with the prefix ‘total_’ are also preserved.
The function makes sense if your Processor lives outside the scope of lambda_handler.
Be careful about circular get_stats() calls from child classes. If required overwrite reset_stats() with recursive = False.
Parameters: recursive – Reset stats from self.***_client.
-
Greenfield¶
Greenfield is a numeric field of the task, used mainly by the TaskManager to identify the current state of the task. The values in most states represent timestamps. The TaskManager can easily identify the state by comparing the current time with the greenfield.
Possible states:
- Queued
- Invoked
- Completed
- Expired
- Running
The following diagram represents different states.

Greenfield Timeline
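A rough illustration only (not the actual TaskManager code): for a queued task the greenfield is a timestamp in the past, while invoking the task pushes the greenfield into the future by an invocation delta, so comparing it with the current time reveals the state.

import time

def looks_queued(task: dict) -> bool:
    return task['greenfield'] <= time.time()

def looks_invoked(task: dict) -> bool:
    return task['greenfield'] > time.time()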
Tutorials¶
Tutorial Pull Tweeter Hashtags¶
In this tutorial we are going to pull data about the popularity of several different topics in the hashtags of tweets. Imagine that we have already classified some popular hashtags into several specific groups. Now we want to pull the data about their usage in the last days.
Every day we want to read the data only for the last 2 days, so we add the chunking parameters:
"period": "last_2_days", "isolate_days": true
We shall pretend that pulling the data for every keyword takes several minutes.
In this case we add another parameter: “isolate_words”: true to make sure that each word shall be processed in a different Lambda execution.
The following payload shall become our Job. This means that we shall chunk it per specific word / day combination and create independent tasks for the Worker Lambda for each chunk.
{
"topics": {
"cars": {
"words": ["toyota", "mazda", "nissan", "racing", "automobile", "car"]
},
"food": {
"words": ["recipe", "cooking", "eating", "food", "meal", "tasty"]
},
"shows": {
"words": ["opera", "cinema", "movie", "concert", "show", "musical"]
}
},
"period": "last_2_days",
"isolate_days": true,
"isolate_words": true
}
Now the time has come to create the actual Lambda.
Register Twitter App¶
Package Lambda Code¶
Creating the Lambda is very similar to the way we deployed the sosw Essentials. We use the same scripts and deployment workflow. Feel free to use your own favourite method or contribute to upgrade this one.
# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
grep AccountId | awk -F "\"" '{print $4}'`
# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT
FUNCTION="sosw_tutorial_pull_tweeter_hashtags"
FUNCTIONDASHED=`echo $FUNCTION | sed s/_/-/g`
cd /var/app/sosw/examples/workers/$FUNCTION
# Install the sosw package locally. Its only dependency is boto3, but we already have it
# in the Lambda container. Skipping this dependency saves a lot of package size.
# Install other possible requirements directly into the package.
pip3 install sosw --no-dependencies --target .
pip3 install -r requirements.txt --target .
# Make a source package. TODO: skip 'dist-info' and 'test' paths.
zip -qr /tmp/$FUNCTION.zip *
# Upload the file to S3, so that AWS Lambda will be able to easily take it from there.
aws s3 cp /tmp/$FUNCTION.zip s3://$BUCKETNAME/sosw/packages/
# Package and Deploy CloudFormation stack for the Function.
# It will create the Function and a custom IAM role for it with permissions to
# access the required DynamoDB tables.
aws cloudformation package --template-file $FUNCTION.yaml \
--output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME
aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
--stack-name $FUNCTIONDASHED --capabilities CAPABILITY_NAMED_IAM
This pattern has created the IAM Role for the function, the Lambda function itself and a DynamoDB table to save data to. All these resources are still falling under the AWS free tier if you do not abuse them.
In case you later make any changes to the application and need to deploy a new version, you may use a script that validates the changes in the CloudFormation template and also publishes the new version of the Lambda code package.
Upload configs¶
In order for this function to be managed by sosw, we have to register it as a Labourer in the configs of the sosw Essentials. As you probably remember, the configs are stored in the config DynamoDB table.
Specifically for this tutorial we have a nice script to inject configs. It finds the JSON files of the worker in FUNCTION/config and “injects” the labourer.json contents into the existing configs of the Essentials. It will also create a config for the Worker Lambda itself out of the self.json. You shall add Twitter credentials in the placeholders there once you receive them and re-run the uploader.
cd /var/app/sosw/examples
pipenv run python3 config_updater.py sosw_tutorial_pull_tweeter_hashtags
After updating the configs we must reset the Essentials so that they read fresh configs from the DynamoDB. There is currently no special AWS API endpoint for this, so we just re-deploy the essentials.
# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
grep AccountId | awk -F "\"" '{print $4}'`
# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT
for name in `ls /var/app/sosw/examples/essentials`; do
echo "Deploying $name"
FUNCTIONDASHED=`echo $name | sed s/_/-/g`
cd /var/app/sosw/examples/essentials/$name
zip -qr /tmp/$name.zip *
aws lambda update-function-code --function-name $name --s3-bucket $BUCKETNAME \
--s3-key sosw/packages/$name.zip --publish
done
Schedule task¶
Now invoke the sosw_scheduler Lambda with the Job that we constructed at the very beginning. The payload for the Scheduler must have the labourer_id, which is the name of the Worker function, and the optional job. This JSON payload is also available in the file FUNCTION/config/task.json.
cd /var/app/sosw/examples
PAYLOAD=`cat workers/sosw_tutorial_pull_tweeter_hashtags/config/task.json`
aws lambda invoke --function-name sosw_scheduler \
--payload "$PAYLOAD" /tmp/output.txt && cat /tmp/output.txt
Cleanup after tutorials¶
Most of the elements in the tutorials were created following the Infrastructure as Code technique. Thus removing the AWS CloudFormation stacks will recursively remove all the resources that were created from them.
# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
grep AccountId | awk -F "\"" '{print $4}'`
# You can't remove non-empty S3 bucket, so first clean it.
BUCKETNAME=sosw-s3-$ACCOUNT
aws s3 rm s3://$BUCKETNAME --recursive
# Remove CloudFormation stacks
cd /var/app/sosw
python3 examples/cleanup.py
Note
In some cases you might need to run the script python3 examples/cleanup.py several times until all stacks are removed. This is because of ImportValue dependencies.
After you run this it is highly recommended to check manually that the resources were indeed removed:
- CloudFormation stacks
- S3 Bucket
- Lambda Functions
- IAM Roles
- DynamoDB Tables
- CloudWatch Events Scheduled Rules
Last thing - terminate your EC2 instance if you were running the tutorial from it.
Warning
Please be aware of different regions (the selector in the upper right corner of the web console). Most of the scripts are not region-specific, and resources were created in the same region you were running them from. But for this tutorial we recommended using the us-west-2 (Oregon) region, and some scripts might have this region hardcoded.
Contribution Guidelines¶
Documentation Convention¶
This document states the convention that we follow for writing Documentation, and especially docstrings for classes and functions of the sosw package. This convention is based on PEP8 (https://www.python.org/dev/peps/pep-0008/), with minor styling changes listed below.
Basics¶
- We use the sphinx package to compile documentation.
- We use the sphinx.autodoc extension to automatically parse the Python code and extract docstrings.
- .rst wins against .md
- Make docstrings readable both in the code and in HTML. Use new lines and tabs to beautify the source of docstrings even if they are not really required.
- Richard wins against Winnie.
Common Sense Boosters¶
- Document for humans. You will have to read this one day later.
- Read other good docs to notice some good practices.
- Do not be afraid to use some new markup features not yet used in our documentation.
- Create labels where required, if you feel that you will need it one day from other doc.
- Do not make pull requests if your docstrings do not compile in Sphinx without warnings.
Structure¶
index.rst of each Lambda should reside in ./docs/. It should have the basic Readme information and links to the documentation of external components used in the Lambda. At the end of the file, you should include the autodoc directives for each module of your function (app.py at the minimum).
If you add schemas or images (good practice), include them in ./docs/images/ and use them appropriately.
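For example, the autodoc directives mentioned above, placed at the end of index.rst, might look like this minimal sketch for a function whose only module is app.py:

.. automodule:: app
   :members: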
Example of Docstring¶
def hello(name, age, tags=None):
    """
    User friendly welcome function.
    Uses `name` and `age` to salute user.
    This is still same line of documentation.

    While this is a new paragraph.
    Note that `rst` is sensitive to empty lines and spaces.

    Some code Example:

    .. code-block:: python

        def hello():
            return "world"

    This is paragraph 3.

    * And some bullet list
    * With couple rows
    * Even 3 rows

    Now go parameters. PyCharm adds them automatically.

    :param str name:  User name of type string.
    :param tags:      Types are not required, but this is a good
                      practice to show what you expect.
    :param age:       You can also specify multiple types, with a
                      little different syntax.

                      Note that empty lines do not matter here for
                      `sphinx`, but good for code readability.
    :type age:        int | float

    :rtype:           dict
    :return:          You can specify type of return value in
                      `rtype` (if it is uncertain).
    """

    return f"Hello {'bro' if age > 10 else 'kid'}"
I hope this example above was useful. Note the indentation and spacing again. Now we are out of the code-block. Do not get frustrated with the 80 chars width that I used in the example. This is just to show this code-block nicely when displayed as code in rendered HTML. Our convention is 120 characters max width.
Here is the compiled version of the example docstring from above:
-
docs.hello.
hello
(name, age, tags=None)[source]¶ User friendly welcome function. Uses name and age to salute user. This is still same line of documentation.
While this is a new paragraph. Note that rst is sensitive to empty lines and spaces.
Some code Example:
def hello():
    return "world"
This is paragraph 3.
- And some bullet list
- With couple rows
- Even 3 rows
Now go parameters. PyCharm adds them automatically.
Parameters: - name (str) – User name of type string.
- tags – Types are not required, but this is a good practice to show what you expect.
- age (int | float) – You can also specify multiple types, with a little different syntax. Note that empty lines do not matter here for sphinx, but good for code readability.
Return type: dict
Returns: You can specify type of return value in rtype (if it is uncertain).
End of the compiled example.
Configuration¶
There are some bugs with compiling documentation for components. Sphinx recognises the module components correctly, but then it notices the same module during import from the autodoc of lambdas and fails to import it manually.
- One workaround is to create a symlink inside the lambdas (as init-lambda would normally do) and then include :automodule: for components directly in the Lambdas index.
- Another option is to rename components to something else, like components-tmp, and compile the documentation for it. But in this case you will have to take care of the links directly in the documentation of lambdas.
Sprinting¶
Welcome to the sosw sprint at PyCon. You are probably eager to contribute, so we shall try to make the introduction as short as possible.
Initialization¶
- Fork the repository: https://github.com/sosw/sosw
- Register Account in AWS: SignUp
- Create DynamoDB Tables:
  - You can find the CloudFormation template for the databases in the example.
  - If you are not familiar with CloudFormation, we highly recommend at least learning the basics from the tutorial.
- Create Sandbox Lambda with Scheduler
- Play with it.
- Read the Documentation Convention
Release cycle¶
- Master branch commits are automatically packaged and published to PyPI.
- Branches for staging versions follow the pattern: X_X_X
- Make your pull requests to the staging branch with the highest number.
- Latest documentation is compiled from the branch docme. It should be up to date with the latest staging branch, not the master. Make PRs with documentation changes directly to docme.
Initialization¶
- Fork the repository: https://github.com/sosw/sosw
- Register Account in AWS: SignUp
- Run pipenv sync --dev to set up your virtual environment and download the required dependencies.
- If you are not familiar with CloudFormation, we highly recommend at least learning the basics from the tutorial.
- Follow the Installation to setup your environment.
- Create some Sandbox Lambda.
- Play with it.
- Read the Documentation Convention
Building the docs¶
To build the docs locally, run: sphinx-build -ab html ./docs ./sosw-rtd
You can also use the built-in Python web server to view the HTML version directly from localhost in your preferred browser.
sphinx-build -ab html ./docs ./sosw-rtd; (cd sosw-rtd && python -m http.server)