Serverless Orchestrator of Serverless Workers


`sosw` - a set of tools for orchestrating asynchronous invocations of AWS Lambda functions. Essential components of `sosw` are implemented as AWS Lambda functions themselves.

Note

Please pronounce sosw correctly: /ˈsɔːsəʊ/

Essential Workflow Schema

Simple sosw workflows

Workers

The Worker functions themselves are your functions that require Orchestration. The `sosw` package has multiple components and tools that we encourage you to use in your Workers to keep the code DRY and stable: statistics aggregation, component initialization, automatic configuration assembly and more.

In order to be Orchestrated by `sosw`, the functions should be able to mark the tasks they receive as completed once the job is done. If you inherit the main classes of your Lambdas from Worker, this will be handled automatically. The default behaviour of Workers is not to touch the queue directly, but to call a Worker Assistant lambda to mark tasks as completed.

Read more: Worker


Three different Workflows can be defined.

Scheduling

Scheduling is the transformation of your business jobs into specific tasks for a Lambda Worker. One task = one invocation of a Worker. The Scheduler provides an interface for your Business Applications, other Lambdas, and Events (e.g. Scheduled Rules) to submit the Business Job.

Chunking of the Payload happens following the rules that you pre-configure. One of the built-in dimensions for chunking is the calendar.

Scheduled tasks go to a stateful queue to be invoked when the time comes.
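
As a hedged illustration (the attribute names below are made up for the example, and the exact chunking depends on the rules you configure), a Business Job handed to the Scheduler might look like this before it is split into individual Worker tasks:

job = {
    'lambda_name': 'some_function',      # the Worker (Labourer) to invoke
    'payload': {
        'period':   'last_2_days',       # calendar chunking dimension
        'products': ['books', 'movies']  # hypothetical custom chunkable attribute
    }
}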

Read more: Scheduler

Orchestration

The Orchestrator is called automatically every minute by Scheduled Events. It evaluates the status of Workers at the current moment and the health of external metrics the Workers may depend on (e.g. CPU load of some DB, IOPS, etc.), and invokes the appropriate number of new parallel invocations.

Read more: Orchestrator

Scavenger

The Scavenger is called automatically every minute by Scheduled Events. It collects the tasks marked as completed by the Workers and archives them.

If a task was not successfully accomplished, the Scavenger tries to re-invoke it with a configurable exponential delay. In case the task completely fails after several invocations, the Scavenger marks it as dead and removes it from the queue to avoid infinite retries. In this case an external alarm system (SNS or Lambda) may be triggered as well.

Read more: Scavenger

Installation


`sosw` requires you to implement/deploy several Lambda functions (Essentials) using the appropriate core classes. The deployment is described in detail below, but assumes that you are familiar with basic AWS Serverless products.

Another deployment requirement is to create several DynamoDB tables.

You can find the CloudFormation template for the databases in the example.
If you are not familiar with CloudFormation, we highly recommend at least learning the basics from the tutorial.

Once again, the detailed guide for initial setup can be found in the Installation.

Installation

Setup AWS Account

The sosw implementation currently supports only AWS infrastructure. If you are running production operations on AWS, we highly recommend setting up a standalone account for your first experiments with sosw. AWS Organisations now provide an easy way to set up sub-accounts from the primary one.

To setup a completely isolated new account, follow the AWS Documentation

We shall require several services, but they are all supposed to fit in the AWS Free Tier. Because the resources are created using CloudFormation, once you delete the stacks the related resources will also be deleted automatically to avoid unnecessary charges.

See Cleanup after tutorials instructions in the Tutorials section.

Provision Required AWS Resources

This document shall guide you through the setup process for sosw Essentials and the different resources required for them. All the resources are created following the Infrastructure as Code concept and can be easily cleaned up if no longer required.

Warning

The following Guide assumes that you are running these commands from an EC2 machine using either a Key or Role with permissions to control IAM, CloudFormation, Lambda, CloudWatch, DynamoDB, S3 (and probably something else).

If you are running this in the test account - feel free to grant the IAM role of your EC2 instance the policy arn:aws:iam::aws:policy/AdministratorAccess, but never do this in Production.

If you plan to run tutorials after this, we recommend setting this up in us-west-2 (Oregon) Region. Some scripts in the tutorials guidelines may have the region hardcoded.

Now we assume that you have created a fresh Amazon Linux 2 machine with some IAM Role having permissions listed above. You may follow this tutorial if feeling uncertain, just create a new IAM Role on Step 3 of the instance setup Wizard.

Warning

Do not run this in Production AWS Account unless you completely understand what is going on!

The following commands are tested on a fresh EC2 instance of type t2.micro running on default Amazon Linux 2 AMI 64-bit.

# Install required system packages for SAM, AWS CLI and Python.
sudo yum update -y
sudo yum install zlib-devel build-essential python3.7 python3-devel git docker -y

# Update pip and ensure you have required Python packages locally for the user.
# You might not need all of them at first, but they are useful if you would
# like to test `sosw` or play with it and run tests.
sudo pip3 install -U pip pipenv boto3

sudo mkdir /var/app
sudo chown ec2-user:ec2-user /var/app

cd /var/app
git clone https://github.com/sosw/sosw.git
cd sosw

# Need to configure your AWS CLI environment.
# Assuming you are using a new machine, we shall just copy the config with default
# region `us-west-2` to $HOME. You should not keep credentials in the profile.
# The correct secure way is to use IAM roles if running from the AWS infrastructure.
# Feel free to change or skip this step if your environment is configured.
cp -nr .aws ~/

Now you are ready to start creating AWS resources. First let us provision some shared resources that both sosw Essentials and sosw-managed Lambdas will use.

# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
    grep AccountId | awk -F "\"" '{print $4}'`

# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT

PREFIX=/var/app/sosw/examples/yaml/initial

# Create new CloudFormation stacks
for filename in `ls $PREFIX`; do
    STACK=`echo $filename | sed s/.yaml//`

    aws cloudformation package --template-file $PREFIX/$filename \
        --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

    aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
        --stack-name $STACK --capabilities CAPABILITY_NAMED_IAM
done

Note

Now, take a break and wait for these resources to be created. You may observe the changes in the CloudFormation web-console (Services -> CloudFormation).

Warning

DO NOT continue until all stacks reach the CREATE_COMPLETE status.

If you make any changes to these files in the future (after the initial deployment), use the following script and it will update the CloudFormation stacks. There is no harm in running it an extra time. CloudFormation is smart enough not to take any action if there are no actual changes in the templates.

Show script

Provision Lambda Functions for Essentials

In this tutorial we were first going to use AWS SAM for provisioning Lambdas, but eventually gave it up. Too much black magic is required, and you eventually lose control over the Lambda. The example of deploying Essentials uses raw bash/python scripts, AWS CLI and CloudFormation templates. If you want to contribute examples using SAM, you are welcome to. Some sandboxes can be found in examples/sam/ in the repository.

# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
    grep AccountId | awk -F "\"" '{print $4}'`

# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT

for name in `ls /var/app/sosw/examples/essentials`; do
    echo "Deploying $name"

    FUNCTION=$name
    FUNCTIONDASHED=`echo $name | sed s/_/-/g`

    cd /var/app/sosw/examples/essentials/$FUNCTION

    # Install sosw package locally.
    pip3 install -r requirements.txt --no-dependencies --target .

    # Make a source package.
    zip -qr /tmp/$FUNCTION.zip *

    # Upload the file to S3, so that AWS Lambda will be able to easily take it from there.
    aws s3 cp /tmp/$FUNCTION.zip s3://$BUCKETNAME/sosw/packages/

    # Package and Deploy CloudFormation stack for the Function.
    # It will create the Function and a custom IAM role for it with permissions
    # to access required DynamoDB tables.
    aws cloudformation package --template-file $FUNCTIONDASHED.yaml \
        --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

    aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
        --stack-name $FUNCTIONDASHED --capabilities CAPABILITY_NAMED_IAM
done

If you change anything in the code, or simply want to redeploy it, use the following:

Show script

Upload Essentials Configurations

sosw-managed Lambdas (and Essentials themselves) will automatically try to read their configuration from the DynamoDB table config. Each Lambda looks for the document with a range_key config_name = 'LAMBDA_NAME_config' (e.g. 'sosw_orchestrator_config').

The config_value should contain a JSON that will be recursively merged to the DEFAULT_CONFIG of each Lambda.
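
Below is a minimal sketch of how such a recursive merge behaves, using the recursive_update helper documented in the Helpers section. Whether the Processor uses exactly this helper internally is an implementation detail; the table and key names are illustrative.

from sosw.components.helpers import recursive_update

DEFAULT_CONFIG = {
    'init_clients':     ['dynamo_db'],
    'dynamo_db_config': {'table_name': 'autotest_dynamo_db', 'required_fields': ['hash_col']},
}

# Hypothetical `config_value` stored for this Lambda in the `config` table.
config_value = {
    'dynamo_db_config': {'table_name': 'production_table'},
}

config = recursive_update(DEFAULT_CONFIG, config_value)
# config['dynamo_db_config'] now has 'table_name' overridden by the stored config,
# while 'required_fields' from DEFAULT_CONFIG is preserved.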

We have provided some very basic examples of configuring Essentials. The config files have some values that are dependent on your AWS Account ID, so we shall substitute it and then upload these configs to DynamoDB. It is much easier to do this in Python, so we shall call a Python script for that. The script uses some sosw features for working with DynamoDB, so we shall have to install sosw first.

cd /var/app/sosw
pipenv run pip install sosw
cd /var/app/sosw/examples/
pipenv run python3 config_updater.py

### Or alternatively use the old one:
# cd /var/app/sosw/examples/essentials/.config
# python3 config_uploader.py
# cd /var/app/sosw

Please take your time to read more about Config Source and find advanced examples in the guidelines of Orchestrator, Scavenger and Scheduler.

Create Scheduled Rules

The usual implementation expects the Orchestrator and Scavenger to run every minute, while the Scheduler and WorkerAssistant are executed per request. Of course, the Scheduler may have any number of cronned Business Tasks with any desired periodicity.

The following script will create an AWS CloudWatch Events Scheduled Rule that will invoke the Orchestrator and Scavenger every minute.

Note

Make sure not to leave this rule enabled after you finish your tutorial: once you exceed the AWS Free Tier for Lambda it might cause unexpected charges.

# Set parameters:
BUCKETNAME=sosw-s3-$ACCOUNT
PREFIX=/var/app/sosw/examples/yaml
FILENAME=sosw-dev-scheduled-rules.yaml
STACK=sosw-dev-scheduled-rules

aws cloudformation package --template-file $PREFIX/$FILENAME \
    --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
    --stack-name $STACK --capabilities CAPABILITY_NAMED_IAM
Manual creation of rules

Essentials

Worker

View Licence Agreement

class sosw.worker.Worker(custom_config=None, **kwargs)[source]

We recommend that you inherit your core Processor from this class in Lambdas that are orchestrated by sosw.

The __call__ method is supposed to accept the event of the Lambda invocation. This is a dictionary with the payload received in the lambda_handler during invocation.

Worker has all the common methods of Processor and tries to mark the task as completed if it received a task_id in the event. Worker creates a payload with stats and result (if they exist) and invokes the Worker Assistant lambda.

mark_task_as_completed(task_id: str)[source]

Calls the Worker Assistant lambda and tells it to close the task.

Example

Please find the following elementary example of Worker Lambda.

import logging
from sosw import Worker
from sosw.app import LambdaGlobals, get_lambda_handler

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Processor(Worker):

    DEFAULT_CONFIG = {
        'init_clients':     ['dynamo_db'],
        'dynamo_db_config': {
            'row_mapper':      {
                'hash_col':  'S',  # String
                'range_col': 'N',  # Number
            },
            'required_fields': ['hash_col', 'range_col'],
            'table_name':      'autotest_dynamo_db',  # If a table is not specified, this table will be used.
        }
    }

    dynamo_db_client = None


    def __call__(self, event):

        # Example of your Worker logic
        row = event.get('row')
        self.put_to_db(row)

        # Do some basic cleaning and marking `sosw` task as completed.
        super().__call__(event)


    def put_to_db(self, row):

        self.dynamo_db_client.put(row)


# Setting the entry point of the lambda (at module level, outside the class).
global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)

Orchestrator

Orchestrator does the … Orchestration.

You can use the class in your Lambda as is; just configure some settings using one of the supported ways described in Config.

The following diagram represents the basic Task Workflow initiated by the Orchestrator.

Invocation Process

Workers Invocation Workflow

TASKS_TABLE_CONFIG = {
    'row_mapper':       {
        'task_id':             'S',
        'labourer_id':         'S',
        'greenfield':          'N',
        'attempts':            'N',
        'closed_at':           'N',
        'completed_at':        'N',
        'desired_launch_time': 'N',
        'arn':                 'S',
        'payload':             'S'
    },
    'required_fields':  ['task_id', 'labourer_id', 'created_at', 'greenfield'],
    'table_name':       'sosw_tasks',
    'index_greenfield': 'sosw_tasks_greenfield',
    'field_names':      {
        'task_id':     'task_id',
        'labourer_id': 'labourer_id',
        'greenfield':  'greenfield',
    }
}

TASK_CLIENT_CONFIG = {
    'dynamo_db_config':                  TASKS_TABLE_CONFIG,
    'sosw_closed_tasks_table':           'sosw_closed_tasks',
    'sosw_retry_tasks_table':            'sosw_retry_tasks',
    'sosw_retry_tasks_greenfield_index': 'labourer_id_greenfield',
    'ecology_config':             {},
    'labourers':                         {
        'some_function': {
            'arn':                          f"arn:aws:lambda:us-west-2:737060422660:function:some_function",
            'max_simultaneous_invocations': 10,
            'health_metrics':               {
                'SomeDBCPU': {
                    'details':                     {
                        'Name':       'CPUUtilization',
                        'Namespace':  'AWS/RDS',
                        'Period':     60,
                        'Statistics': ['Average'],
                        'Dimensions': [
                            {
                                'Name':  'DBInstanceIdentifier',
                                'Value': 'YOUR-DB'
                            },
                        ],
                    },

                    # This is the mapping of how the Labourer should "feel" about this metric.
                    # See EcologyManager.ECO_STATUSES.
                    # This is just a mapping ``ECO_STATUS: value`` using ``feeling_comparison_operator``.
                    'feelings':                    {
                        3: 50,
                        4: 25,
                    },
                    'feeling_comparison_operator': '<='
                },
            },
        },
    },
}

ORCHESTRATOR_CONFIG = {
    'task_config': TASK_CLIENT_CONFIG,
}

Example CloudFormation template for Orchestrator

See also Greenfield

View Licence Agreement

class sosw.orchestrator.Orchestrator(custom_config=None, **kwargs)[source]
Orchestrator class.
Iterates the pre-configured Labourers and invokes appropriate number of Tasks for each one.
get_desired_invocation_number_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Decides the desired maximum number of simultaneous invocations for a specific Labourer. The decision is based on the ecology status of the Labourer and the configs.

Returns:Number of invocations
get_labourers() → List[sosw.labourer.Labourer][source]

Gets a list of pre-configured Labourers from TaskManager.

Returns:
invoke_for_labourer(labourer: sosw.labourer.Labourer)[source]

Invokes required queued tasks for labourer.

Scheduler

Scheduler is the public interface of sosw for any applications that want to invoke some orchestrated Lambdas. Its main role is to transform a business job into the actual payload of Lambda invocations. It respects the configurable chunking rules specific to different workers.

sosw Scheduler Workflow

Scheduler Workflow

TASKS_TABLE_CONFIG = {
    'row_mapper':       {
        'task_id':             'S',
        'labourer_id':         'S',
        'greenfield':          'N',
        'attempts':            'N',
        'closed_at':           'N',
        'completed_at':        'N',
        'desired_launch_time': 'N',
        'arn':                 'S',
        'payload':             'S'
    },
    'required_fields':  ['task_id', 'labourer_id', 'created_at', 'greenfield'],
    'table_name':       'sosw_tasks',
    'index_greenfield': 'sosw_tasks_greenfield',
    'field_names':      {
        'task_id':     'task_id',
        'labourer_id': 'labourer_id',
        'greenfield':  'greenfield',
    }
}

TASK_CLIENT_CONFIG = {
    'dynamo_db_config':                  TASKS_TABLE_CONFIG,
    'sosw_closed_tasks_table':           'sosw_closed_tasks',
    'sosw_retry_tasks_table':            'sosw_retry_tasks',
    'sosw_retry_tasks_greenfield_index': 'labourer_id_greenfield',
    'ecology_config':             {},
    'labourers':                         {
        'some_function': {
            'arn':                          'arn:aws:lambda:us-west-2:0000000000:function:some_function',
            'max_simultaneous_invocations': 10,
        },
    },
}

SCHEDULER_CONFIG = {

    'queue_bucket':       'some-bucket',

    'task_config': TASK_CLIENT_CONFIG,
    'job_schema':         {
        'chunkable_attrs': [
        ]
    }
}
View Licence Agreement

class sosw.scheduler.Scheduler(*args, **kwargs)[source]

Scheduler converts business jobs to one or multiple Worker tasks.

Job supports a lot of dynamic settings that will coordinate the chunking.

Parameters:

max_YOURATTRs_per_batch: int

This is applicable only to the lowest level of chunking. If isolation of this parameter is not required, and all values of this parameter are simple strings/integers, the Scheduler shall chunk them in batches of the given size. By default it will chunk to 1,000,000 objects per list. A hedged illustration follows below.
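
The sketch below assumes 'products' is the lowest-level chunkable attribute configured for the Labourer; both the attribute name and the resulting split are illustrative, not taken from the library:

job = {
    'lambda_name':            'some_function',
    'max_products_per_batch': 2,
    'products':               ['a', 'b', 'c', 'd', 'e'],
}

# Conceptually the Scheduler would produce tasks roughly like:
# [{'products': ['a', 'b']}, {'products': ['c', 'd']}, {'products': ['e']}]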

chunk_dates(job: Dict, skeleton: Dict = None) → List[Dict][source]

There is support for chunking by multiple non-nested parameters. Dates are one very specific example of them.

chunk_job(job: dict, skeleton: Dict = None, attr: str = None) → List[Dict][source]

Recursively parses a job, validates everything and chunks into simple tasks whatever should be chunked. The scenario of chunking and isolation is worth another story, so you should put a link here once it is ready.

construct_job_data(job: Dict, skeleton: Dict = None) → List[Dict][source]

Chunks the job into tasks using several layers. Each layer is represented by a chunker method. All chunkers should accept a job and an optional skeleton for tasks and return a list of tasks. If there is nothing to chunk for some chunker, it returns the same job (with injected skeleton) wrapped in a list.

Default chunkers:

  • Date list chunking
  • Recursive chunking for chunkable_attrs
extract_job_from_payload(event: Dict)[source]

Parses and performs basic validation of the job from the event.

get_and_lock_queue_file() → str[source]

Either takes a new (recently created) file in local /tmp/, or downloads the version of the queue file from S3. We move the file in S3 to a locked_ prefixed state, or simply upload the new one there already in the locked_ state.

Returns:Local path to the file.
static get_index_from_list(attr, data)[source]

Finds the index ignoring the ‘s’ at the end of attribute.

get_next_chunkable_attr(attr)[source]

Return the next by order after attr chunkable attribute.

last_week(pattern: str = 'last_week') → List[str][source]

Returns a list of dates (YYYY-MM-DD) as strings for last week (Sunday - Saturday).

last_x_days(pattern: str) → List[str][source]

Constructs the list of date strings for chunking.

needs_chunking(attr: str, data: Dict) → bool[source]

Recursively analyses the data and identifies whether the current level of data should be chunked. This could happen if the isolate_attr marker is set either in the current scope or recursively in any of the sub-elements.

Parameters:
  • attr – Name of attribute you want to check for chunking.
  • data – Input dictionary to analyse.
parse_job_to_file(job: Dict)[source]

Splits the Job to multiple tasks and writes them down in self.local_queue_file.

Parameters:job (dict) – Payload from Scheduled Rule. Should be already parsed from whatever payload to dict and contain the raw job
static pop_rows_from_file(file_name: str, rows: Optional[int] = 1) → List[str][source]

Reads the rows from the top of file. Along the way removes them from original file.

Parameters:
  • file_name (str) – File to read.
  • rows (int) – Number of rows to read. Default: 1
Returns:

List of strings read from file top.

previous_x_days(pattern: str) → List[str][source]

Returns a list of string dates for the x days preceding the last x days (i.e. from today - 2x up to, but not including, today - x).

For example, consider today's date as 2019-04-30. Calling previous_x_days(pattern='previous_2_days') will return a list of string dates: ['2019-04-26', '2019-04-27'].
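
A small sketch of the date arithmetic described above (not the Scheduler implementation itself), pinned to the example date for reproducibility:

from datetime import date, timedelta

def previous_x_days_sketch(x: int, today: date = date(2019, 4, 30)) -> list:
    """The x days preceding the last x days: from today - 2x up to today - x (exclusive)."""
    return [(today - timedelta(days=d)).isoformat() for d in range(2 * x, x, -1)]

assert previous_x_days_sketch(2) == ['2019-04-26', '2019-04-27']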

process_file()[source]

Processes a file to create tasks and then uploads it to S3. If the execution time reaches its limit, spawns a new sibling to continue the processing.

remote_queue_file

Full S3 Key of file with queue of tasks not yet in DynamoDB.

remote_queue_locked_file

Full S3 Key of file with queue of tasks not yet in DynamoDB in the locked state. Concurrent processes should not touch it.

set_queue_file(name: str = None)[source]

Initialize a unique file_name to store the queue of tasks to write.

sufficient_execution_time_left

Returns whether there is sufficient execution time left for processing ('shutdown period' is in seconds).

today(pattern: str = 'today') → List[str][source]

Returns list with one datetime string (YYYY-MM-DD) equal to today’s date.

upload_and_unlock_queue_file()[source]

Uploads the local queue file to S3 and removes the locked_ prefixed copy if it exists.

validate_list_of_vals(data: Union[list, set, tuple, Dict]) → list[source]

Supported resulting values: str, int, float.

Expects a simple iterable of supported values or a dictionary with values = None. The keys then are treated as resulting values and if they validate, are returned as a list.

x_days_back(pattern: str) → List[str][source]

Finds the exact date X days back from now. Returns it as a str in a list following the interface requirements of chunk_dates.

e.g. 1_days_back - yesterday, 7_days_back - same day as today last week

yesterday(pattern: str = 'yesterday') → List[str][source]

Simple wrapper for x_days_back() to return yesterday’s date.

Scavenger

The main roles of Scavenger are:

  • Find Completed tasks and archive them
  • Find Expired tasks (ones invoked, but not successfully completed by Workers) and either retry or mark them as failed.
View Licence Agreement

class sosw.scavenger.Scavenger(custom_config=None, **kwargs)[source]

The Scavenger main class performs the following operations:

  • archive_tasks(labourer)
  • handle_expired_tasks(labourer)
  • retry_tasks(labourer)
archive_tasks(labourer: sosw.labourer.Labourer)[source]

Read from sosw_tasks the ones successfully marked as completed by Workers and archive them.

get_db_field_name(key: str) → str[source]

Could be useful if you overwrite field names with your own ones (e.g. for tests).

move_task_to_retry_table(task: Dict, labourer: sosw.labourer.Labourer)[source]

Put the task to a Dynamo table sosw_retry_tasks, with the wanted delay: labourer.max_runtime * attempts. Delete it from sosw_tasks table.
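
A quick arithmetic illustration of the wanted delay described above (the values are illustrative):

max_runtime  = 300                       # labourer.max_runtime in seconds (illustrative)
attempts     = 3                         # how many times the task has already been attempted
wanted_delay = max_runtime * attempts    # 900 seconds before the task becomes eligible again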

retry_tasks(labourer: sosw.labourer.Labourer)[source]

Read from dynamo table sosw_retry_tasks, get tasks with retry_time <= now, and put them to sosw_tasks in the beginning of the queue.

Worker Assistant

View Licence Agreement

class sosw.worker_assistant.WorkerAssistant(custom_config=None, **kwargs)[source]

Worker Assistant is the interface Worker Lambdas should call to mark their tasks completed.

Future versions will also accept the remaining workload to process and call Siblings in case the worker is about to time out and wants to finish healthily.

This Essential is supposed to be called synchronously by the Worker Lambdas. The call should pass the action and task_id attributes in the payload.
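
A hedged sketch of such a payload; the exact action name should be confirmed against the WorkerAssistant source and configuration, here it simply mirrors the mark_task_as_completed method of Worker:

payload = {
    'action':  'mark_task_as_completed',   # assumed action name, see WorkerAssistant source
    'task_id': 'some_task_id',
}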

See example of the usage in Worker.

Processor

Core class for other components to inherit.

View Licence Agreement

class sosw.app.Processor(custom_config=None, **kwargs)[source]

Core Processor class template. All the main components (Worker, Orchestrator and Scheduler) inherit from this one. You can also use this class as a parent for some of your standalone Lambdas, but we strongly encourage you to use the Worker class in case you are running functions under sosw orchestration.

die(message='Unknown Failure')[source]

Logs current Processor stats and message. Then raises RuntimeError with message.

If there is access to publish SNS messages, the method will also try to publish to the topic configured as dead_sns_topic or ‘SoswWorkerErrors’.

Parameters:message (str) – Description of failure.
static get_config(name)[source]

Returns config by name from SSM. Override this to provide your config handling method.

Parameters:name – Name of the config
Return type:dict
get_stats(recursive: bool = True)[source]

Return statistics of operations performed by current instance of the Class.

Statistics of custom clients existing in the Processor are also aggregated by default. Clients must be initialized as self.some_client ending with the _client suffix (e.g. self.dynamo_client). Clients must also have their own get_stats() methods implemented.

Be careful about circular get_stats() calls from child classes. If required overwrite get_stats() with recursive = False.

Parameters:recursive – Merge stats from self.***_client.
Return type:dict
Returns:Statistics counter of current Processor instance.
reset_stats(recursive: bool = True)[source]

Cleans statistics other than those specified to persist for the lifetime of the processor. All the parameters with the prefix 'total_' are also preserved.

The function makes sense if your Processor lives outside the scope of lambda_handler.

Be careful about circular get_stats() calls from child classes. If required overwrite reset_stats() with recursive = False.

Parameters:recursive – Reset stats from self.***_client.
class sosw.app.LambdaGlobals[source]

Global placeholder for global_vars that we want to preserve in the lifetime of the Lambda Container, e.g. once the given Processor is initialised, we keep it alive in the container to minimize warm-run time.

This namespace also contains the lambda_context which should be reset by get_lambda_handler method. See Worker examples in documentation for more info.

sosw.app.get_lambda_handler(processor_class, global_vars=None, custom_config=None)[source]

Return a reference to the entry point of the lambda function.

Parameters:
  • processor_class – Callable processor class.
  • global_vars – Lambda’s global variables (processor, context).
  • custom_config – Custom configuration to pass the processor constructor.
Returns:

Function reference for the lambda handler.

Components

Config Source

View Licence Agreement

Config manager component. Has methods for getting configuration for lambdas. Use Config class to get the correct methods we use for each action. Using SSMConfig or DynamoConfig directly is discouraged. Especially SSMConfig, because SSM throttles and has limits we reached in the past. Using these methods requires the Role to have permissions to access SSM and/or Dynamo for requested resources.

class sosw.components.config.ConfigSource(test=False, sources=None, config=None)[source]

A strategy adapter for config. Returns config from the selected config source. You can implement your own functions using these clients, and they can even call different configurations.

Parameters:
  • sources (str) – Config clients to initialize. Supported: SSM and Dynamo (default). Could be both, comma-separated. The first one then becomes default.
  • config (dict) – Custom configurations for clients. Should be in ssm_config, dynamo_config, etc. Don’t be confused, but sometimes configs also need their own configs. :)

DynamoDB Client

View Licence Agreement

class sosw.components.dynamo_db.DynamoDbClient(config)[source]

Has default methods for different types of DynamoDB tables.

The current implementation supports only one fixed table during initialization, but you are free to initialize multiple simultaneous dynamo_clients in your Lambda with different configs.

Config should have a mapping for the field types and required fields. Config example:

{
    'row_mapper':     {
        'col_name_1':      'N', # Number
        'col_name_2':      'S', # String
    },
    'required_fields': ['col_name_1'],
    'table_name': 'some_table_name',  # If a table is not specified, this table will be used.
    'hash_key': 'the_hash_key',
    'dont_json_loads_results': True  # Use this if you don't want to convert json strings into json
}
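
A minimal usage sketch built only from the methods documented below; the table and column names are illustrative:

from sosw.components.dynamo_db import DynamoDbClient

config = {
    'row_mapper':      {'hash_col': 'S', 'range_col': 'N'},
    'required_fields': ['hash_col'],
    'table_name':      'autotest_dynamo_db',
}

dynamo_db_client = DynamoDbClient(config)
dynamo_db_client.put({'hash_col': 'cat', 'range_col': 123})
rows = dynamo_db_client.get_by_query(keys={'hash_col': 'cat'})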
batch_get_items_one_table(keys_list, table_name=None, max_retries=0, retry_wait_base_time=0.2, strict=None, fetch_all_fields=None)[source]

Gets a batch of items from a single dynamo table. Only accepts keys, can’t query by other columns.

Parameters:keys_list (list) – A list of the keys of the items we want to get. Gets the items that match the given keys. If some key doesn't exist - it just skips it and gets the others. e.g. [{'hash_col': 1, 'range_col': 2}, {'hash_col': 3}] - will get the row where hash_col is 1 and range_col is 2, and also all rows where hash_col is 3.

Optional

Parameters:
  • table_name (str) –
  • max_retries (int) – If failed to get some items, retry this many times. Waiting between retries is multiplied by 2 after each retry, so retries shouldn’t be a big number. Default is 1.
  • retry_wait_base_time (int) – Wait this much time after first retry. Will wait twice longer in each retry.
  • strict (bool) – DEPRECATED.
  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
Returns:

List of items from the table

Return type:

list

delete(keys: Dict, table_name: Optional[str] = None)[source]
Parameters:
  • keys (dict) – Keys and values of the row we delete.
  • table_name
dict_to_dynamo(row_dict, add_prefix=None, strict=True)[source]

Convert the row from regular dictionary to the ugly DynamoDB syntax. Takes settings from row_mapper.

e.g. {‘key1’: ‘value1’, ‘key2’: ‘value2’} will convert to: {‘key1’: {‘Type1’: ‘value1’}, ‘key2’: {‘Type2’: ‘value2’}}

Parameters:
  • row_dict (dict) – A row we want to convert to dynamo syntax.
  • add_prefix (str) – A string prefix to add to the key in the result dict. Useful for queries like update.
  • strict (bool) – If False, will get the type from the value in the dict (this works for numbers and strings). If True, won’t add them if they’re not in the required_fields, and if they are, will raise an error.
Returns:

DynamoDB Task item

Return type:

dict

dynamo_to_dict(dynamo_row: Dict, strict: bool = None, fetch_all_fields: Optional[bool] = None) → Dict[source]

Convert the ugly DynamoDB syntax of the row to a regular dictionary. We currently support only String or Numeric values. The latter are converted to int or float. Takes settings from row_mapper.

e.g.: {‘key1’: {‘N’: ‘3’}, ‘key2’: {‘S’: ‘value2’}} will convert to: {‘key1’: 3, ‘key2’: ‘value2’}

Parameters:
  • dynamo_row (dict) – DynamoDB row item
  • strict (bool) – DEPRECATED.
  • fetch_all_fields (bool) – If False only row_mapper fields will be extracted from dynamo_row, else, all fields will be extracted from dynamo_row.
Returns:

The row in a key-value format

Return type:

dict

get_by_query(keys: Dict, table_name: Optional[str] = None, index_name: Optional[str] = None, comparisons: Optional[Dict] = None, max_items: Optional[int] = None, filter_expression: Optional[str] = None, strict: bool = None, return_count: bool = False, desc: bool = False, fetch_all_fields: bool = None, expr_attrs_names: list = None) → Union[List[Dict], int][source]

Get an item from a table, by some keys. Can specify an index. If an index is not specified, will query the table. IMPORTANT: You must specify the rows you expect to be converted in row mapper in config, otherwise you won’t get them in the result. If you want to get items from dynamo by non-key attributes, this method is not for you.

Parameters:keys (dict) – Keys and values of the items we get. You must specify the hash key, and can optionally also add the range key. Example, in a table where the hash key is ‘hk’ and the range key is ‘rk’: * {‘hk’: ‘cat’, ‘rk’: ‘123’} * {‘hk’: ‘cat’}

Optional

Parameters:
  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
  • index_name (str) – Name of the secondary index in the table. If not specified, will query the table itself.
  • comparisons (dict) – Type of comparison for each key. If a key is not mentioned, comparison type will be =. Valid values: =, <, <=, >, >=, begins_with. Comparisons only work for the range key. Example: if keys={‘hk’: ‘cat’, ‘rk’: 100} and comparisons={‘rk’: ‘<=’} -> will get items where rk <= 100
  • max_items (int) – Limit the number of items to fetch.
  • filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. ‘key <= 42’, ‘name = marta’, ‘foo between 10 and 20’, etc.
  • strict (bool) – DEPRECATED.
  • return_count (bool) – If True, will return the number of items in the result instead of the items themselves
  • desc (bool) – By default (False) the values will be sorted ascending by the SortKey. To reverse the order set the argument desc = True.
  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
  • expr_attrs_names (list) – List of attributes names, in case if an attribute name begins with a number or contains a space, a special character, or a reserved word, you must use an expression attribute name to replace that attribute’s name in the expression. Example, if the list [‘session’, ‘key’] is received, then a new dict will be assigned to ExpressionAttributeNames: {‘#session’: ‘session’, ‘#key’: ‘key’}
Returns:

List of items from the table, each item in key-value format OR the count if return_count is True

get_by_scan(attrs=None, table_name=None, strict=None, fetch_all_fields=None)[source]

Scans a table. Don’t use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful - don’t make queries of too many items, this could run for a long time.

Optional:

Parameters:
  • attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.
  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
  • strict (bool) – DEPRECATED.
  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is False.
Returns:

List of items from the table, each item in key-value format

Return type:

list

get_by_scan_generator(attrs=None, table_name=None, strict=None, fetch_all_fields=None)[source]

Scans a table. Don’t use this method if you want to select by keys. It is SLOW compared to get_by_query. Careful - don’t make queries of too many items, this could run for a long time. Same as get_by_scan, but yields parts of the results.

Optional:

Parameters:
  • attrs (dict) – Attribute names and values of the items we get. Can be empty to get the whole table.
  • table_name (str) – Name of the dynamo table. If not specified, will use table_name from the config.
  • strict (bool) – DEPRECATED.
  • fetch_all_fields (bool) – If False, will only get the attributes specified in the row mapper. If True, will get all attributes. Default is True.
Returns:

List of items from the table, each item in key-value format

Return type:

list

get_capacity(table_name=None)[source]

Fetches capacity for data tables

Keyword Arguments:
table_name {str} – DynamoDB table name (default: {None})
Returns:
dict – read/write capacity for the table requested
get_stats()[source]

Return statistics of operations performed by current instance of the Class.

Returns:
  • dict - key: int statistics.
get_table_indexes(table_name: Optional[str] = None) → Dict[source]

Returns active indexes of the table: their hash key, range key, and projection type.

{
    'index_1_name': {
        'projection_type': 'ALL',  # One of: 'ALL'|'KEYS_ONLY'|'INCLUDE'
        'hash_key': 'the_hash_key_column_name',
        'range_key': 'the_range_key_column_name',  # Can be None if the index has no range key
        'provisioned_throughput': {
            'write_capacity': 5,
            'read_capacity': 10
        }
    },
    'index_2_name': ...
}
get_table_keys(table_name: Optional[str] = None) → Tuple[str, Optional[str]][source]

Returns table’s hash key name and range key name

Parameters:table_name
Returns:hash key and range key names
identify_dynamo_capacity(table_name=None)[source]

Identify and store the table capacity for a given table on the object

Arguments:
table_name {str} – short name of the dynamo db table to analyze
patch(keys: Dict, attributes_to_update: Optional[Dict] = None, attributes_to_increment: Optional[Dict] = None, table_name: Optional[str] = None, attributes_to_remove: Optional[List[str]] = None)[source]

Updates an item in DynamoDB. Will fail if an item with these keys does not exist.

put(row, table_name=None, overwrite_existing=True)[source]

Adds a row to the database

Parameters:
  • row (dict) – The row to add to the table. key is column name, value is value.
  • table_name (string) – Name of the dynamo table to add the row to.
  • overwrite_existing (bool) – Overwrite the existing row if True, otherwise will raise an exception if exists.
reset_stats()[source]

Cleans statistics.

sleep_db(last_action_time: datetime.datetime, action: str)[source]

Sleeps between calls to dynamodb (if it needs to). Uses the table’s capacity to decide how long it needs to sleep.

Parameters:
  • last_action_time – Last time when we did this action (read/write) to this dynamo table
  • action – “read” or “write”
transact_write(*transactions)[source]

Executes multiple write transactions. Can execute operations on different tables. Will split the transactions into chunks, because transact_write_items accepts up to 10 actions. WARNING: if you are expecting a single atomic transaction of more than 10 operations - AWS DynamoDB doesn't support it.

dynamo_db_client = DynamoDbClient(config)
t1 = dynamo_db_client.make_put_transaction_item(row, table_name='table1')
t2 = dynamo_db_client.make_delete_transaction_item(row, table_name='table2')
dynamo_db_client.transact_write(t1, t2)
update(keys: Dict, attributes_to_update: Optional[Dict] = None, attributes_to_increment: Optional[Dict] = None, table_name: Optional[str] = None, condition_expression: Optional[str] = None, attributes_to_remove: Optional[List[str]] = None)[source]

Updates an item in DynamoDB. Will create a new item if it doesn't exist. IMPORTANT: if you want to make sure the item already exists, use the patch method.

Parameters:
  • keys (dict) – Keys and values of the row we update. Example, in a table where the hash key is ‘hk’ and the range key is ‘rk’: {‘hk’: ‘cat’, ‘rk’: ‘123’}
  • attributes_to_update (dict) – Dict of the attributes to be updated. Can contain both existing attributes and new attributes. Will update existing, and create new attributes. Example: {‘col_name’: ‘some_value’}
  • attributes_to_increment (dict) – Attribute names to increment, and the value to increment by. If the attribute doesn’t exist, will create it. Example: {‘some_counter’: ‘3’}
  • attributes_to_remove (list) – Will remove these attributes from the record
  • condition_expression (str) – Condition Expression that must be fulfilled on the object to update.
  • table_name (str) – Name of the table
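
Continuing the hedged sketch from the config example above, a call with the parameters documented here might look like this (attribute names are illustrative):

dynamo_db_client.update(
    keys={'hash_col': 'cat', 'range_col': 123},
    attributes_to_update={'status': 'done'},
    attributes_to_increment={'attempts': '1'},
)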
sosw.components.dynamo_db.clean_dynamo_table(table_name='autotest_dynamo_db', keys=('hash_col', 'range_col'), filter_expression=None)[source]

Cleans the DynamoDB Table. Only for autotest tables.

Parameters:
  • table_name (str) – name of the table
  • keys (tuple) – the keys of the table
  • filter_expression (str) – Supports regular comparisons and between. Input must be a regular human string e.g. ‘key <= 42’, ‘name = marta’, ‘foo between 10 and 20’, etc.

Warning

There are some reserved words that would not work with Filter Expression in case they are attribute names. Fix this one day.

Helpers

View Licence Agreement

Static helper methods which you can use in any Lambdas. Must be completely independent with no specific requirements.

sosw.components.helpers.validate_account_to_dashed(account)[source]

Validates that the provided string is in valid AdWords account format and converts it to dashed format.

Parameters:account (str) – AdWords Account
Return type:str
Returns:Dashed format
sosw.components.helpers.validate_account_to_int(account)[source]

Validates that the provided string is in valid AdWords account format and converts it to integer format.

Parameters:account (str, int) – AdWords Account
Returns:Account ID as integer
sosw.components.helpers.validate_list_of_numbers_from_csv(data)[source]

Converts a comma separated string of numeric values to a list of sorted unique integers. The values that do not match are skipped.

Parameters:data (str, iterable) – CSV string or iterable of numeric values.
Returns:
  • list(int)
sosw.components.helpers.camel_case_to_underscore(name)[source]

Converts attribute to string and formats it as underscored.

Parameters:name
  • str - CamelCase string (or something convertible to a CamelCase string with the __str__() method).
Returns:
  • str - underscore_formatted_value
sosw.components.helpers.chunks(l, n)[source]

Yield successive n-sized chunks from l.

sosw.components.helpers.validate_uuid4(uuid_string)[source]

Validate that a UUID string is in fact a valid uuid4. Happily, the uuid module does the actual checking for us. It is vital that the ‘version’ kwarg be passed to the UUID() call, otherwise any 32-character hex string is considered valid.

sosw.components.helpers.rstrip_all(input, patterns)[source]

Strips all of the patterns from the right of the input. Order and spaces do not matter.

Parameters:
  • input
    • str - String to modify
  • patterns
    • list|set|tuple|str - Pattern[-s] to remove.
Returns:

  • str

sosw.components.helpers.get_one_or_none_from_dict(input, name, vtype=None)[source]

Extracts object by ‘name’ from the ‘input’. Tries also plural name in case not found by single ‘name’. In case found an iterable by plural name, validates that it has one or zero values in it. If vtype is specified, tries to convert result to it.

Parameters:
  • input (dict) – Input dictionary. Event of Lambda for example.
  • name (str) – Name of attribute (in singular form).
  • vtype (type) – Type to be converted to. Must be callable. Tested types: str, int, float
Returns:

  • instance of vtype | something else | None

Raises:

ValueError – If something is wrong.

sosw.components.helpers.get_one_from_dict(input, name, vtype=None)[source]

Extracts object by ‘name’ from the ‘input’. Tries also plural name in case not found by single ‘name’. In case found an iterable by plural name, validates that it has exactly one value in it. If vtype is specified, tries to convert result to it.

Parameters:
  • input
    • dict - Input dictionary. Event of Lambda for example.
  • name
    • str - Name of attribute (in singular form).
  • vtype
    • type - Type to be converted to. Must be callable. Tested types: str, int, float
Returns:

  • instance of vtype | something else | None

Raises:

ValueError

  • If something is wrong.

sosw.components.helpers.get_list_of_multiple_or_one_or_empty_from_dict(input, name, vtype=None)[source]

Extracts objects by ‘name’ from the ‘input’ and returns as a list. Tries both plural and singular names from the input. If vtype is specified, tries to convert each of the elements in the result to this type.

Parameters:
  • input
    • dict - Input dictionary. Event of Lambda for example.
  • name
    • str - Name of attribute (in plural form).
  • vtype
    • type - Type to be converted to. Must be callable. Tested types: str, int, float
Returns:

  • list - List of vtypes, or list of whatever was in input, or empty list.

Raises:

ValueError – If something is wrong.

sosw.components.helpers.validate_date_list_from_event_or_days_back(input, days_back=0, key_name='date_list')[source]

Extracts date_list from the input, validates it and converts the values to datetime.date. The input should have date_list as a list of strings or a comma-separated string.

  • Format: YYYY-MM-DD
  • Examples:
['2018-01-01', '2018-02-01']
'2018-01-01, 2018-02-01'
Parameters:
  • input (dict) – This is supposed to be your whole Lambda event.
  • days_back (int) – Optional Number of days to take back from today. Ex: days_back=1 is yesterday. Default: today.
  • key_name (str) – Optional custom name of key to extract from ‘input’.
Returns:

list(datetime.date)

sosw.components.helpers.validate_date_from_something(d)[source]

Convert valid input to datetime.date() or raise either AttributeError or ValueError.

Parameters:d – Some input. Supported types: * datetime.datetime * datetime.date * int - Epoch or Epoch milliseconds * float - Epoch or Epoch milliseconds * str (YYYY-MM-DD) * str (YYYY-MM-DD HH:MM:SS)
Returns:Transformed d
Return type:datetime.date
Raises:ValueError
sosw.components.helpers.validate_datetime_from_something(d)[source]

Converts the input d to datetime.datetime.

Parameters:d – Some input. Supported types: * datetime.datetime * datetime.date * int - Epoch or Epoch milliseconds * float - Epoch or Epoch milliseconds * str (YYYY-MM-DD) * str (YYYY-MM-DD HH:MM:SS)
Returns:Transformed d
Return type:datetime.datetime
Raises:ValueError
sosw.components.helpers.validate_string_matches_datetime_format(date_str, date_format, field_name='date')[source]

Validate string, make sure it’s of the given datetime format

Parameters:
Raises:

ValueError

sosw.components.helpers.is_valid_date(date_str, date_formats)[source]

Validate string to be at least one of the given datetime formats.

Parameters:
  • date_str (str) – a date or time or both, Example: ‘2018/09/16’
  • date_formats (list) – List of datetime formats that are acceptable for datetime.strptime. Example: '%Y/%m/%d'
Return type:

bool

Returns:

True if the date string is valid for any of the datetime formats, False otherwise.

sosw.components.helpers.recursive_matches_soft(src, key, val, **kwargs)[source]

Searches the ‘src’ recursively for nested elements provided in ‘key’ with dot notation. In case some levels are iterable (list, tuple) it checks every element. In case the full path is inaccessible returns False. If any of the elements addressed by ‘key’ matches the ‘val’ - Bingo! Return True.

You might also be interested in recursive_exists_strict() helper.

Parameters:
  • src (dict) – Input dictionary. Can contain nested dictionaries and lists.
  • key (str) – Path to search with dot notation.
  • val (any) – Value to match in some elements specified by path.

In order to check not just that some element exists, but also to check for duplicates, you might want to use the optional 'exclude' attributes. If the attributes are specified and the last level element following the path (dot notation) has that key-value, the check for the main key-value will be skipped. See the unittests to understand the behaviour better.

Parameters:
  • exclude_key (str) – Key to check in last level element to exclude.
  • exclude_val (str) – Value to match in last level element to exclude.
Return type:

bool

sosw.components.helpers.recursive_matches_strict(src, key, val, **kwargs)[source]

Searches the ‘input’ recursively for nested elements provided in ‘key’ with dot notation. In case some levels are iterable (list, tuple) it checks every element. In case the full path is inaccessible raises AttributeError or KeyError.

Parameters:
  • src (dict) – Input dictionary. Can contain nested dictionaries and lists.
  • key (str) – Path to search with dot notation.
  • val (any) – Value to match in some elements specified by path.
Return type:

bool

sosw.components.helpers.recursive_matches_extract(src, key, separator=None, **kwargs)[source]

Searches the 'src' recursively for nested elements provided in 'key' with dot notation. In case some levels are iterable (list, tuple) it checks every element in them until it finds a match.

Returns the first found element or None. In case the full path is inaccessible also returns None.

If you are just checking if some elements exist, you might be interested in recursive_exists_strict() or recursive_exists_soft() helpers.

Parameters:
  • src (dict) – Input dictionary. Can contain nested dictionaries and lists.
  • key (str) – Path to search with dot notation.
  • separator (str) – Custom separator for recursive extraction. Default: ‘.’

In order to filter out some specific elements, you might want to use the optional 'exclude' attributes. If the attributes are specified and the last level element following the path (dot notation) has that key-value, the check for the main key-value will be skipped. See the unittests to understand the behaviour better.

Parameters:
  • exclude_key (str) – Key to check in last level element to exclude.
  • exclude_val (str) – Value to match in last level element to exclude.
Returns:

Value from structure extracted by specified path

sosw.components.helpers.dunder_to_dict(data: dict, separator=None)[source]

Converts the flat dict with keys using dunder notation for nesting elements to regular nested dictionary.

E.g.:

data = {'a': 'v1', 'b__c': 'v2', 'b__d__e': 'v3'}
result = dunder_to_dict(data)

# result:

{
    'a': 'v1',
    'b': {
        'c': 'v2',
        'd': {'e': 'v3'}
    }
}
Parameters:
  • data – A dictionary that is converted to Nested.
  • separator (str) – Custom separator for recursive extraction. Default: ‘.’
sosw.components.helpers.nested_dict_from_keys(keys: List, value: Optional = None) → Dict[source]

Constructs a nested dictionary using a list of keys to embed recursively. If value is provided it is assigned to the last subkey.

Examples:

nested_dict_from_keys(['a', 'b', 'c']) == {'a': {'b': {'c': None}}}
nested_dict_from_keys(['a', 'b', 'c'], value=42) == {'a': {'b': {'c': 42}}}
Parameters:
  • keys – List of keys to embed.
  • value – Optional value to set to lowest level
sosw.components.helpers.convert_string_to_words(string)[source]

Convert string to comma separated words.

Parameters:string (str) – String to convert into words.
Return type:str
Returns:Comma separated words.
sosw.components.helpers.construct_dates_from_event(event: dict) → tuple[source]

Processes given event dictionary for start and end points of time. Otherwise takes the default settings.

The end date of the period may be specified as en_date in the event. The default value is today.

Also the event should have either an st_date or a days_back numeric parameter. If days_back is provided, it will be subtracted from the end date.

Both st_date and en_date might be either date, datetime or string (‘YYYY-MM-DD’) types. In case of datetime, the hours/minutes/etc are ignored.

Parameters:event (dict) – Lambda payload.
Returns:start_date, end_date as datetime.date
sosw.components.helpers.validate_list_of_words_from_csv_or_list(data: (str, list)) → list[source]

Splits a CSV string into a list of stripped words. In case the data is already a list of strings - splits its elements and flattens the result.

All resulting elements must be single words, if any of the elements contains spaces (i.e. multiple words) the validation fails with ValueError.

Parameters:data – CSV string of list of strings (possibly CSV themselves)
Returns:List of stripped and split words
sosw.components.helpers.first_or_none(items: Iterable, condition: Callable = None)[source]

Return first element in iterable to match condition or None

sosw.components.helpers.recursive_update(d: Dict, u: Mapping) → Dict[source]

Recursively updates the dictionary d with another one u. Values of u overwrite in case of type conflict.

List, set and tuple values of d and u are merged, preserving only unique values. Returned as List.

sosw.components.helpers.trim_arn_to_name(arn: str) → str[source]

Extract just the name of function from full ARN. Supports versions, aliases or raw name (without ARN).

More information about ARN Format: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns

sosw.components.helpers.trim_arn_to_account(arn: str) → str[source]

Extract just the ACCOUNT_ID from full ARN. Supports versions, aliases or raw name (without ARN).

More information about ARN Format: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns
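
A short illustration of both helpers on a made-up ARN (the expected outputs in the comments follow the docstrings above):

from sosw.components.helpers import trim_arn_to_name, trim_arn_to_account

arn = 'arn:aws:lambda:us-west-2:000000000000:function:some_function:7'
trim_arn_to_name(arn)      # 'some_function' (version/alias stripped)
trim_arn_to_account(arn)   # '000000000000'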

sosw.components.helpers.make_hash(o)[source]

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Original idea from this user: https://stackoverflow.com/users/660554/jomido

Plus some upgrades to work with sets and dicts having different types of keys appropriately. See source unittests of this function for some more details.
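
A hedged usage sketch; exact hash values are implementation-specific, the point is merely that structurally equal containers are expected to hash equally:

from sosw.components.helpers import make_hash

a = {'x': [1, 2], 'y': {'z': 3}}
b = {'y': {'z': 3}, 'x': [1, 2]}   # same content, different key order
make_hash(a) == make_hash(b)       # expected to be True for structurally equal dicts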

sosw.components.helpers.get_message_dict_from_sns_event(event)[source]

Extract SNS event message and return it loaded as a dict.

Parameters:event (dict) – Lambda SNS event (payload). Must be a JSON document.

Return type:dict
Returns:The SNS message, converted to dict

sosw.components.helpers.is_event_from_sns(event)[source]

Check if the lambda invocation was by SNS.

Parameters:event (dict) – Lambda Event (payload)
Return type:bool

Siblings

Warning

This component has performance issues unless running with force=True. It can cause CloudWatch request throttling. Requires some refactoring.

SiblingsManager provides Lambda executions with the option of a "healthy" shutdown. They may pass the remaining payload to another execution automatically. See the example:

import logging
import time
from sosw import Processor as SoswProcessor
from sosw.app import LambdaGlobals, get_lambda_handler
from sosw.components.siblings import SiblingsManager

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Processor(SoswProcessor):

    DEFAULT_CONFIG = {
        'init_clients':    ['Siblings'],    # Automatically initialize Siblings Manager
        'shutdown_period': 10,  # Some time to shutdown in a healthy manner.
    }

    siblings_client: SiblingsManager = None


    def __call__(self, event):

        cursor = event.get('cursor', 0)

        while self.sufficient_execution_time_left:
            self.process_data(cursor)
            cursor += 1
            if cursor == 20:
                return f"Reached the end of data"

        else:
            # Spawning another sibling to continue the processing
            payload = {'cursor': cursor}

            self.siblings_client.spawn_sibling(global_vars.lambda_context, payload=payload, force=True)
            self.stats['siblings_spawned'] += 1


    def process_data(self, cursor):
        """ Your custom logic respecting current cursor. """
        logger.info(f"Processing data at cursor: {cursor}")
        time.sleep(1)


    @property
    def sufficient_execution_time_left(self) -> bool:
        """ Return whether there is a sufficient execution time for processing ('shutdown period' is in seconds). """
        return global_vars.lambda_context.get_remaining_time_in_millis() > self.config['shutdown_period'] * 1000


global_vars = LambdaGlobals()
lambda_handler = get_lambda_handler(Processor, global_vars)

Another example use-case: store the remaining payload somewhere (e.g. in S3) and call the sibling with just a pointer to it.
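
A minimal sketch of that idea, assuming a boto3 S3 client; the bucket name and key layout are hypothetical:

import json

import boto3


def spawn_sibling_with_s3_pointer(siblings_client, lambda_context, remaining_rows):
    """ Store the unprocessed payload in S3 and pass only a pointer to the sibling. """

    s3 = boto3.client('s3')
    key = f"sosw/payloads/{lambda_context.aws_request_id}.json"

    # 'your-payload-bucket' is a placeholder - use a bucket your Role can write to.
    s3.put_object(Bucket='your-payload-bucket', Key=key, Body=json.dumps(remaining_rows))

    siblings_client.spawn_sibling(lambda_context, payload={'payload_s3_key': key})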

View Licence Agreement

class sosw.components.siblings.SiblingsManager(custom_config=None, **kwargs)[source]

This set of helpers can be used for Lambdas that want to invoke some siblings of self. Very useful for Lambdas processing queues and running out of time.

The Role of your Lambda must have the following extra permissions to run correctly. Please note that we hardcode the ARN in the policy to avoid a circular dependency when parsing the YAML. The dependency itself is perfectly valid, but CloudFormation does not know how to resolve it.

Policies:
  - PolicyName: "YOUR_FUNCTION_NAME"
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Action: "cloudwatch:GetMetricStatistics"
          Resource: "*"
        - Effect: "Allow"
          Action: "lambda:InvokeFunction"
          Resource: "arn:aws:lambda:us-west-2:737060422660:function:YOUR_FUNCTION_NAME"
any_events_rules_enabled(lambda_context)[source]

Checks the Status of CloudWatch Events Rules. It is very important to use this checker before launching siblings. Otherwise, you can create an infinite autorespawning loop and waste A LOT of money.

Parameters:lambda_context – Context object from your lambda_handler.
Return type:bool
Raises:ResourceNotFoundException – If Rule with the given name doesn’t exist.
get_approximate_concurrent_executions(minutes_back=5, name=None)[source]

Get approximate concurrent executions from CloudWatch Metrics. The value is very approximate and calculated as count of invocations during minutes_back divided by average duration in same period. Return value is rounded to integer using ceil.

We assume that the Role has permissions to read CloudWatch.

Parameters:
  • minutes_back (int) – Aggregate statistics for this number of minutes.
  • name (str) – Name of the function to check. Default: currently running lambda.
Return type:int
Returns:Approximate number of concurrent executions.

spawn_sibling(lambda_context, payload=None, force=False)[source]

Asynchronously invokes a copy of the same function to continue working. Should be called if there is still work left to do (e.g. messages in the queue).

Can optionally send some payload, for example the remaining unprocessed rows. The payload should be a dictionary.

Parameters:
  • lambda_context – Context object from your lambda_handler.
  • payload (dict) – The payload to be put to event.
  • force (bool) – If True, skips the check that the Events Rules are enabled.

SNS Manager

View Licence Agreement

class sosw.components.sns.SnsManager(**kwargs)[source]

AWS Simple Notification Service Manager helper. Requires the Role to have permissions to publish to the SNS resources you use (e.g. sns:Publish on the relevant topics).

Must have a recipient specified either during initialization or later using the set_recipient method. Messages received by send_message() are batched and will actually be sent to SNS only during the call of commit() or during the destruction of the instance.

The manager does not yet support batching messages for multiple recipients, so if you try to change the recipient, all the currently batched messages are automatically sent to the previous recipient first.
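
A minimal usage sketch; the initialization kwargs and the topic ARN are assumptions, adjust them to your setup:

from sosw.components.sns import SnsManager

sns = SnsManager()  # a custom config may be passed via kwargs
sns.set_recipient('arn:aws:sns:us-west-2:737060422660:sosw_alarms')  # hypothetical topic ARN

# Messages are only queued locally here...
sns.send_message('Task 123 moved to the retry table', subject='sosw notification')
sns.send_message('Task 124 moved to the retry table')

# ...and pushed to SNS in one batch on commit() (or when the instance is destroyed).
sns.commit()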

commit()[source]

Combines messages from self.queue and pushes them to self.recipient. Cleans the queue after that.

create_subscription(topic_arn, protocol, endpoint)[source]

Create a subscription to the topic

Parameters:
  • topic_arn (str) – ARN of a topic
  • protocol (str) – The type of endpoint to subscribe
  • endpoint (str) – Endpoint that can receive notifications from Amazon SNS
create_topic(topic_name)[source]

Create a new topic with the given name.

Parameters:topic_name (str) – New topic name to create
Returns:New topic ARN
Return type:str
send_message(message, subject=None, forse_commit=False)[source]

If the subject is not yet set (for example during __init__() of the class), then the subject is required. Otherwise we accept a None subject and simply append the message to the queue. Once the subject changes, the queue is committed automatically.

Parameters:
  • message (str) – Message to be sent in the body of the SNS message. Queued.
  • subject (str) – Optional. Custom subject for message.
set_client_attr(name, value)[source]

Sets the given _name_ attribute to _value_. Commits the self.queue if there are any messages in it.

set_separator(separator)[source]

Set custom separator for messages from the queue

tasks_api_client_for_workers

Warning

This component is not generalized for broad usage. Requires refactoring.

Managers

TaskManager

View Licence Agreement

class sosw.managers.task.TaskManager(custom_config=None, **kwargs)[source]

TaskManager is the core class used by most sosw Lambdas. It handles all the operations with tasks, thus the configuration of this Manager is essential for your sosw implementation.

The default version of TaskManager works with DynamoDB tables to store and analyze the state of Tasks. This could be upgraded in future versions to work with other persistent storage or DBs.

A very important concept to understand about the Task workflow is greenfield. Read more: Greenfield.

construct_payload_for_task(**kwargs) → str[source]

Combines the remaining kwargs into a single JSON payload.

create_task(labourer: sosw.labourer.Labourer, strict: bool = True, **kwargs)[source]

Schedule a new task.

Parameters:
  • labourer – Labourer object of Lambda to execute the task.
  • strict (bool) – By default (True) it is prohibited to specify in the task (kwargs) fields that are supposed to be autogenerated; they pass only if they match the autogenerated values. Set strict=False to override this and pass custom task properties (see the sketch below).
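
A hedged sketch of scheduling a task, assuming an initialized TaskManager and that the extra kwargs become the task payload (as construct_payload_for_task() describes):

from sosw.labourer import Labourer
from sosw.managers.task import TaskManager


def schedule_example_task(task_manager: TaskManager, labourer: Labourer):
    """ Hypothetical helper: the extra kwargs are combined into the task payload. """

    task_manager.create_task(labourer, words=['toyota', 'mazda'], period='last_2_days')

    # With strict=False you may also set fields that are normally autogenerated:
    # task_manager.create_task(labourer, task_id='my-custom-id', strict=False)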
get_completed_tasks_for_labourer(labourer: sosw.labourer.Labourer) → List[Dict][source]

Return a list of tasks of the Labourer marked as completed. Scavenger is supposed to archive them all so no special filtering is required here.

In order to be able to use the already existing index_greenfield, we sort tasks only in invoked stages (greenfield > now()). This number is supposed to be small, so filtering by an un-indexed field will be fast.

get_count_of_running_tasks_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Returns the number of tasks we assume to be still running. Theoretically some of them may have already died with an Exception, but are not yet expired.

get_db_field_name(key: str) → str[source]

Could be useful if you overwrite field names with your own ones (e.g. for tests).

get_expired_tasks_for_labourer(labourer: sosw.labourer.Labourer) → List[Dict][source]

Return a list of tasks of Labourer previously invoked, and expired without being closed.

get_invoked_tasks_for_labourer(labourer: sosw.labourer.Labourer, completed: Optional[bool] = None) → List[Dict][source]

Return a list of tasks of current Labourer invoked during the current run of the Orchestrator.

If completed is provided:

  • True – filter only completed tasks
  • False – filter only NOT completed tasks
  • None (default) – do not care about the completed status

get_labourers() → List[sosw.labourer.Labourer][source]

Return the configured Labourers. The config of the TaskManager expects 'labourers' as a dict of 'name_of_lambda': {'some_setting': 'value1'}, as illustrated below.
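
A hedged illustration of that config shape (the setting names are examples, not a complete reference):

TASK_MANAGER_CONFIG_FRAGMENT = {
    'labourers': {
        'sosw_tutorial_pull_tweeter_hashtags': {
            'arn': 'arn:aws:lambda:us-west-2:737060422660:function:'
                   'sosw_tutorial_pull_tweeter_hashtags',
            'max_simultaneous_invocations': 10,
        },
    },
}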

get_length_of_queue_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Approximate count of tasks still in the queue for the labourer (tasks with greenfield <= now()).

Parameters:labourer – The Labourer to count the queued tasks for.
Returns:Number of tasks in the queue.
get_newest_greenfield_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

Return value of the newest greenfield in queue. This means the end of the queue or latest added.

get_next_for_labourer(labourer: sosw.labourer.Labourer, cnt: int = 1, only_ids: bool = False) → List[Union[str, Dict]][source]

Fetch the next task(s) from the queue for the Labourer.

Parameters:
  • labourer – Labourer to get next tasks for.
  • cnt – Optional number of Tasks to fetch.
  • only_ids – If explicitly set True, then returns only the IDs of tasks. This could save some transport if you are sending big batches of tasks between Lambdas.
get_oldest_greenfield_for_labourer(labourer: sosw.labourer.Labourer, reverse: bool = False) → int[source]

Return value of oldest greenfield in queue. This means the beginning of the queue if you need FIFO behaviour.

get_running_tasks_for_labourer(labourer: sosw.labourer.Labourer, count: bool = False) → Union[List[Dict], int][source]

Return a list of tasks of Labourer previously invoked, but not yet closed or expired. We assume they are still running.

If count is specified as True, returns just the number of tasks, not the items themselves. Much cheaper.

get_task_by_id(task_id: str) → Dict[source]

Fetches the full data of the Task.

invoke_task(labourer: sosw.labourer.Labourer, task_id: Optional[str] = None, task: Optional[Dict] = None)[source]

Invoke the Lambda Function execution for the task. Providing only the task_id is more expensive, but safer against “task injection” attacks: the method then prefetches the Task from the table before trying to invoke it.

Already running tasks are skipped without raising an exception, so concurrent Orchestrators (or anyone else) will not duplicate invocations.

is_valid_task(task: Dict) → bool[source]

Simple validation for required fields.

mark_task_invoked(labourer: sosw.labourer.Labourer, task: Dict, check_running: Optional[bool] = True)[source]

Update the greenfield with the latest invocation timestamp + invocation_delta

By default, updates with a conditional expression that fails if the current greenfield is already in the invoked state. If this check fails, the function raises RuntimeError, which should be handled by the Orchestrator. This is very important to help avoid duplicate invocations of the Worker by simultaneously running Orchestrators.

Parameters:
  • labourer – Labourer for the task
  • task – Task dictionary
  • check_running – If True (default) updates with conditional expression.

Raises:RuntimeError – If the conditional update fails (the task is already in the invoked state).
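
A minimal sketch of handling that RuntimeError in calling code (the helper name is hypothetical):

from sosw.labourer import Labourer
from sosw.managers.task import TaskManager


def mark_invoked_safely(task_manager: TaskManager, labourer: Labourer, task: dict) -> bool:
    """ Hypothetical wrapper: returns False if another Orchestrator already invoked this task. """

    try:
        task_manager.mark_task_invoked(labourer, task)
        return True
    except RuntimeError:
        return False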

move_task_to_retry_table(task: Dict, wanted_delay: int)[source]

Put the task into the DynamoDB table sosw_retry_tasks with the wanted delay (labourer.max_runtime * attempts) and delete it from the sosw_tasks table.

register_labourers() → List[sosw.labourer.Labourer][source]

Sets timestamps, health status and other custom attributes on Labourer objects passed for registration.

We also send a pointer to the TaskManager (aka self) to the Ecology Manager. The latter will have to make some queries, and we do not want it to initialise another TaskManager for itself.

retry_tasks(labourer: sosw.labourer.Labourer, tasks: List[Dict])[source]

Move tasks back to the tasks table, to the beginning of the queue (with the greenfield of the task that will be invoked next). All tasks must belong to the same labourer.

EcologyManager

View Licence Agreement

class sosw.managers.ecology.EcologyManager(*args, **kwargs)[source]
add_running_tasks_for_labourer(labourer: sosw.labourer.Labourer, count: int = 1)[source]

Adds the given count to the current counter of running tasks. Invokes the getter first in case the original number has not yet been calculated from DynamoDB.

count_running_tasks_for_labourer(labourer: sosw.labourer.Labourer) → int[source]

TODO Refactor this to cache the value in the Labourer object itself. Should also update add_running_tasks_for_labourer() for that.

fetch_metric_stats(metric: Dict) → List[Dict][source]

Fetches Datapoints of aggregated metric statistics from CloudWatch. The fields in metric are the attributes of get_metric_statistics. An additional parameter MetricAggregationTimeSlice (in seconds) is used to calculate the StartTime and EndTime.

If some fields are missing in the metric, the defaults come from config['default_metric_values'].
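
A hedged illustration of such a metric definition; the field names mirror the boto3 CloudWatch get_metric_statistics parameters, and the actual values are examples:

metric = {
    'Namespace':                  'AWS/DynamoDB',
    'MetricName':                 'ConsumedWriteCapacityUnits',
    'Dimensions':                 [{'Name': 'TableName', 'Value': 'sosw_tasks'}],
    'Statistics':                 ['Average'],
    'Period':                     60,
    'MetricAggregationTimeSlice': 300,  # extra sosw parameter: aggregation window in seconds
}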

get_health(value: Union[int, float], metric: Dict) → int[source]

Checks the value against the health_metric configuration.

get_labourer_average_duration(labourer: sosw.labourer.Labourer) → int[source]

Calculates the average duration of labourer executions.

The operation consumes DynamoDB RCUs. Normally this method is called only once per labourer, during the registration of Labourers. If you need this value later, ask the Labourer object instead.

get_labourer_status(labourer: sosw.labourer.Labourer) → int[source]

Get the worst (lowest) health status according to preconfigured health metrics of the Labourer.

Current ECO_STATUSES:

  • (0, ‘Bad’)
  • (1, ‘Poor’)
  • (2, ‘Moderate’)
  • (3, ‘Good’)
  • (4, ‘High’)
get_max_labourer_duration(labourer: sosw.labourer.Labourer) → int[source]

Maximum duration of labourer executions.

get_stats(recursive=False)[source]

Return statistics of the operations performed by the current instance of the class.

Statistics of custom clients existing in the Processor are also aggregated by default. Clients must be initialized as self.some_client ending with the _client suffix (e.g. self.dynamo_client). Clients must also have their own get_stats() methods implemented.

Be careful about circular get_stats() calls from child classes. If required, overwrite get_stats() with recursive=False.

Parameters:recursive – Merge stats from self.***_client.
Return type:dict
Returns:Statistics counter of current Processor instance.
register_task_manager(task_manager: sosw.managers.task.TaskManager)[source]

We will have to make some queries and do not want to initialise another TaskManager locally, so we simply receive a pointer to an existing TaskManager from whoever registers us.

This could be done in __init__, but we do not want to update the initialization workflow of every function that initialises this Manager. They usually use the built-in core Processor mechanism register_clients().

reset_stats(recursive=False)[source]

Cleans all statistics except those specified as preserved for the lifetime of the Processor. All the counters with the prefix 'total_' are also preserved.

The function makes sense if your Processor lives outside the scope of lambda_handler.

Be careful about circular reset_stats() calls from child classes. If required, overwrite reset_stats() with recursive=False.

Parameters:recursive – Reset stats from self.***_client.

Greenfield

Greenfield is a numeric field of the task used mainly by TaskManager to identify the current state of the task. The values in most states represent timestamps. TaskManager can easily identify the state by comparing the current time with the greenfield.

Possible states:

  • Queued
  • Invoked
    • Completed
    • Expired
    • Running

The following diagram represents different states.

Greenfield Timeline (diagram)
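
A minimal sketch of how the greenfield relates to these states, based on the queries described for TaskManager (queued tasks have greenfield <= now(), invoked tasks are pushed into the future by an invocation delta); the completed_at field name is an assumption for illustration only:

import time


def describe_task_state(task: dict) -> str:
    """ Hypothetical illustration of reading the state from the greenfield timestamp. """

    now = time.time()

    if task['greenfield'] <= now:
        return 'queued'                      # still waiting to be picked up
    if task.get('completed_at'):
        return 'completed'                   # invoked and marked completed by the Worker
    return 'invoked (running or expired)'    # expiry depends on the Labourer maximum duration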

Tutorials

Tutorial Pull Tweeter Hashtags

In this tutorial we are going to pull data about the popularity of several different topics in the hashtags of tweets. Imagine that we have already classified some popular hashtags into several specific groups. Now we want to pull the data about their usage over the last days.

Every day we would want to read the data only for the last 2 days, so we add the chunking parameters: "period": "last_2_days", "isolate_days": true.

We shall pretend that pulling the data for every keyword takes several minutes. In this case we add another parameter, "isolate_words": true, to make sure that each word is processed in a different Lambda execution.

The following payload shall become our Job. This means that we shall chunk it per specific word / day combination and create independent tasks for the Worker Lambda for each chunk.

{
  "topics": {
    "cars": {
      "words": ["toyota", "mazda", "nissan", "racing", "automobile", "car"]
    },
    "food": {
      "words": ["recipe", "cooking", "eating", "food", "meal", "tasty"]
    },
    "shows": {
      "words": ["opera", "cinema", "movie", "concert", "show", "musical"]
    }
  },
  "period": "last_2_days",
  "isolate_days": true,
  "isolate_words": true
}

Now the time has come to create the actual Lambda.

Register Twitter App
First of all you will have to register your own Twitter API credentials: https://developer.twitter.com/
Submitting the application takes 2-3 minutes, but after that you have to wait several hours (or even days). You can submit the application now and add the credentials to the config later. The Lambda shall handle the missing credentials.
Package Lambda Code

Creating the Lambda is very similar to the way we deployed sosw Essentials. We use the same scripts and deployment workflow. Feel free to use your own favourite method or contribute to upgrade this one.

# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
    grep AccountId | awk -F "\"" '{print $4}'`

# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT

FUNCTION="sosw_tutorial_pull_tweeter_hashtags"
FUNCTIONDASHED=`echo $FUNCTION | sed s/_/-/g`

cd /var/app/sosw/examples/workers/$FUNCTION

# Install the sosw package locally. Its only dependency is boto3, which is already
# available in the Lambda container, so we skip it to keep the package small.
# Install other requirements directly into the package.
pip3 install sosw --no-dependencies --target .
pip3 install -r requirements.txt --target .

# Make a source package. TODO: skip 'dist-info' and 'test' paths.
zip -qr /tmp/$FUNCTION.zip *

# Upload the file to S3, so that AWS Lambda will be able to easily take it from there.
aws s3 cp /tmp/$FUNCTION.zip s3://$BUCKETNAME/sosw/packages/

# Package and Deploy CloudFormation stack for the Function.
# It will create the Function and a custom IAM role for it with permissions to
# access the required DynamoDB tables.
aws cloudformation package --template-file $FUNCTION.yaml \
    --output-template-file /tmp/deployment-output.yaml --s3-bucket $BUCKETNAME

aws cloudformation deploy --template-file /tmp/deployment-output.yaml \
    --stack-name $FUNCTIONDASHED --capabilities CAPABILITY_NAMED_IAM

This pattern has created the IAM Role for the function, the Lambda function itself and a DynamoDB table to save data to. All these resources still fall under the AWS Free Tier if you do not abuse them.

If you later make any changes to the application and need to deploy a new version, you may use a similar script that validates the changes in the CloudFormation template and also publishes the new version of the Lambda code package.


Upload configs

In order for this function to be managed by sosw, we have to register it as a Labourer in the configs of the sosw Essentials. As you probably remember, the configs are in the config DynamoDB table.

Specifically for this tutorial we have a script to inject the configs. It finds the JSON files of the worker in FUNCTION/config and “injects” the labourer.json contents into the existing configs of the Essentials. It also creates a config for the Worker Lambda itself out of self.json. You should add the Twitter credentials to the placeholders there once you receive them and re-run the uploader.

cd /var/app/sosw/examples
pipenv run python3 config_updater.py sosw_tutorial_pull_tweeter_hashtags

After updating the configs we must reset the Essentials so that they read the fresh configs from DynamoDB. There is currently no special AWS API endpoint for this, so we simply re-deploy the Essentials.

# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
    grep AccountId | awk -F "\"" '{print $4}'`

# Set your bucket name
BUCKETNAME=sosw-s3-$ACCOUNT

for name in `ls /var/app/sosw/examples/essentials`; do
    echo "Deploying $name"

    cd /var/app/sosw/examples/essentials/$name
    zip -qr /tmp/$name.zip *

    # Upload the fresh package and point the function to it.
    aws s3 cp /tmp/$name.zip s3://$BUCKETNAME/sosw/packages/
    aws lambda update-function-code --function-name $name --s3-bucket $BUCKETNAME \
        --s3-key sosw/packages/$name.zip --publish
done

Schedule task

Congratulations! You are ready to run the tutorial. You just need to call the sosw_scheduler Lambda with the Job that we constructed at the very beginning. The payload for the Scheduler must have the labourer_id, which is the name of the Worker function, and an optional job.

This JSON payload is also available in the file FUNCTION/config/task.json.

cd /var/app/sosw/examples
PAYLOAD=`cat workers/sosw_tutorial_pull_tweeter_hashtags/config/task.json`
aws lambda invoke --function-name sosw_scheduler \
    --payload "$PAYLOAD" /tmp/output.txt && cat /tmp/output.txt

Cleanup after tutorials

Most of the elements in the tutorials were created following the Infrastructure as Code approach, thus removing the AWS CloudFormation stacks will recursively remove all the resources that were created from them.

# Get your AccountId from EC2 metadata. Assuming you run this on EC2.
ACCOUNT=`curl http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info/ | \
    grep AccountId | awk -F "\"" '{print $4}'`

# You can't remove a non-empty S3 bucket, so clean it first.
BUCKETNAME=sosw-s3-$ACCOUNT
aws s3 rm s3://$BUCKETNAME --recursive

# Remove CloudFormation stacks
cd /var/app/sosw
python3 examples/cleanup.py

Note

In some cases you might need to run the script python3 examples/cleanup.py several times until all stacks are removed. This is because of ImportValue dependencies.

After you run this it is highly recommended to check manually that the resources were indeed removed:

  • CloudFormation stacks
  • S3 Bucket
  • Lambda Functions
  • IAM Roles
  • DynamoDB Tables
  • CloudWatch Events Scheduled Rules

Last thing - terminate your EC2 instance if you were running the tutorial from it.

Warning

Please be aware of different regions (the selector in the upper right corner of the web console). Most of the scripts are region-agnostic: resources were created in the same region that you were running them from. But for this tutorial we recommended using the us-west-2 (Oregon) region, and some scripts might have this region hardcoded.

Contribution Guidelines

Documentation Convention

This document states the convention that we follow for writing documentation, and especially docstrings for classes and functions, in the sosw package. This convention is based on PEP 8 (https://www.python.org/dev/peps/pep-0008/), with the minor styling changes listed below.

Basics
  • We use the sphinx package to compile the documentation.
  • We use the sphinx.autodoc extension to automatically parse the Python code and extract docstrings.
  • .rst wins against .md
  • Make docstrings readable both in the code and in HTML. Use new lines and tabs to beautify the source of docstrings even if they are not really required.
  • Richard wins against Winnie.
Common Sense Boosters
  • Document for humans. You will have to read this yourself one day.
  • Read other good docs to notice some good practices.
  • Do not be afraid to use some new markup features not yet used in our documentation.
  • Create labels where required, if you feel that you will need to reference this place one day from another doc.
  • Do not make pull requests if your docstrings do not compile in Sphinx without warnings.
Structure

index.rst of each Lambda should reside in ./docs/. It should have the basic Readme information and links to the documentation of external components used in the Lambda. At the end of the file, include the autodoc directives for each module of your function (at minimum app.py).

If you add schemas or images (a good practice), include them in ./docs/images/ and reference them appropriately.

Example of Docstring
def hello(name, age, tags=None):
    """
    User friendly welcome function.
    Uses `name` and `age` to salute user.
    This is still same line of documentation.

    While this is a new paragraph.
    Note that `rst` is sensitive to empty lines and spaces.

    Some code Example:

    .. code-block:: python

        def hello():
            return "world"

    This is paragraph 3.

    * And some bullet list
    * With couple rows

    Now go parameters. PyCharm adds them automatically.

    :param str name:    User name of type string.
    :param tags:        Types are not required, but this is a good
                        practice to show what you expect.

    :param age:         You can also specify multiple types, with a
                        little different syntax.
                        Note that empty lines do not matter here for
                        `sphinx`, but good for code readability.
    :type age:          int | float

    :rtype:             dict
    :return:            You can specify type of return value in
                        `rtype` (if it is uncertain).
    """

    return f"Hello {'bro' if age > 10 else 'kid'}"

I hope the example above was useful. Note the indentation and spacing again. Now we are out of the code-block. Do not get frustrated by the 80-character width used in the example: it is just to display this code-block nicely in the rendered HTML. Our convention is 120 characters max width.


Here is the compiled version of the example docstring from above:

docs.hello.hello(name, age, tags=None)[source]

User friendly welcome function. Uses name and age to salute user. This is still same line of documentation.

While this is a new paragraph. Note that rst is sensitive to empty lines and spaces.

Some code Example:

def hello():
    return "world"

This is paragraph 3.

  • And some bullet list
  • With couple rows
  • Even 3 rows

Now go parameters. PyCharm adds them automatically.

Parameters:
  • name (str) – User name of type string.
  • tags – Types are not required, but this is a good practice to show what you expect.
  • age (int | float) – You can also specify multiple types, with a little different syntax. Note that empty lines do not matter here for sphinx, but good for code readability.
Return type:

dict

Returns:

You can specify type of return value in rtype (if it is uncertain).


End of the compiled example.

Configuration

There are some bugs with compiling documentation for components. Sphinx recognises the components module correctly, but then it notices the same module again while importing it from the autodoc of the Lambdas and fails to import it.

  • One workaround is to create a symlink inside the lambdas (as init-lambda would normally do) and then include :automodule: for components directly in the Lambda's index.
  • Another option is to rename components to something else like components-tmp and compile the documentation for it. In that case you will have to take care of the links directly in the documentation of the Lambdas.

Sprinting

Welcome to the sosw sprint at PyCon. You are probably eager to contribute, so we will keep the introduction as short as possible.

Release cycle

  • Master branch commits are automatically packaged and published to PyPI.
  • Branches for staging versions follow the pattern: X_X_X
  • Make your pull requests to the staging branch with the highest number.
  • The latest documentation is compiled from the docme branch. It should be up to date with the latest staging branch, not with master. Make PRs with documentation changes directly to docme.

Code formatting

Follow PEP 8, except that both classes and functions are padded with 2 empty lines.

Initialization

  • Fork the repository: https://github.com/sosw/sosw
  • Register Account in AWS: SignUp
  • Run pipenv sync --dev to setup your virtual environment and download the required dependencies
  • If you are not familiar with CloudFormation, we highly recommend at least learning the basics from the tutorial.
  • Follow the Installation to setup your environment.
  • Create some Sandbox Lambda.
  • Play with it.
  • Read the Documentation Convention

Building the docs

To build the docs locally, run: sphinx-build -ab html ./docs ./sosw-rtd

You can also use the built-in Python web server to view the HTML version directly from localhost in your preferred browser.

sphinx-build -ab html ./docs ./sosw-rtd; (cd sosw-rtd && python -m http.server)
