Overview

The QPush Bundle relies on the Push Queue model of Message Queues to provide asynchronous processing in your Symfony application. This allows you to remove blocking processes from the immediate flow of your application and delegate them to another part of your application or, say, a cluster of workers.

This bundle allows you to easily consume and process messages by simply tagging your service or services and relying on Symfony’s event dispatcher - without needing to run a daemon or background process to continuously poll your queue.

Installation

The bundle should be installed through Composer:

composer require uecode/qpush-bundle

Update AppKernel.php of your Symfony Application

Add the UecodeQPushBundle to your kernel bootstrap sequence, in the $bundles array:

public function registerBundles()
{
    $bundles = array(
        // ...
        new Uecode\Bundle\QPushBundle\UecodeQPushBundle(),
    );

    return $bundles;
}

Configure the Bundle

The bundle allows you to specify different Message Queue providers. Amazon AWS and IronMQ are the only hosted services currently supported, alongside the file and custom drivers described below. Blocking, synchronous queues are also supported through the sync driver to aid development and debugging.

We are actively looking to add more and would be more than happy to accept contributions.

Providers

This bundle allows you to configure and use multiple supported providers within the same application. Each queue that you create is attached to one of your registered providers and can have its own configuration options.

Providers may have their own dependencies that should be added to your composer.json file.

For specific instructions on how to configure each provider, see the provider sections below.

AWS Provider

The AWS Provider uses SQS & SNS to create a Push Queue model. SNS is optional with this provider, and it’s possible to use just SQS by utilizing the provided Console Command (uecode:qpush:receive) to poll the queue.

Configuration

This provider relies on the AWS SDK PHP library, which needs to be required in your composer.json file.

This bundle supports both v2 and v3 of the AWS SDK. The example below requires v2; use "3.*" instead to require v3.

{
    "require": {
        "aws/aws-sdk-php": "2.*"
    }
}

From there, the rest of the configuration is simple. You need to provide your credentials in your configuration.

#app/config/config.yml

uecode_qpush:
    providers:
        my_provider:
            driver: aws
            key:    <aws key>
            secret: <aws secret>
            region: us-east-1
    queues:
        my_queue_name:
            provider: my_provider
            options:
                push_notifications: true
                subscribers:
                    - { endpoint: http://example.com/qpush, protocol: http }

You may omit the AWS key and secret if you are using an IAM role on EC2.

Using SNS

If you set push_notifications to true in your queue config, this provider will automatically create the SNS Topic, subscribe your SQS queue to it, and add each of your listed subscribers to the Topic.

This provider automatically handles Subscription Confirmations sent from SNS, as long as the HTTP endpoint you’ve listed is externally accessible and has the QPush Bundle properly installed and configured.

Overriding Queue Options

It’s possible to override the default queue options that are set in your config file when sending or receiving messages.

Publishing

The publish() method takes an array as a second argument. For the AWS Provider you are able to change the options listed below per publish.

If you disable push_notifications for a message, it will skip using SNS and only write the message to SQS. You will need to manually poll the SQS queue to fetch those messages.

Option | Description | Default Value
------ | ----------- | -------------
push_notifications | Whether or not to POST notifications to subscribers of a Queue | false
message_delay | Time in seconds before a published Message is available to be read in a Queue | 0

$message = ['foo' => 'bar'];

// Optional config to override default options
$options = [
    'push_notifications' => 0,
    'message_delay'      => 1
];

$this->get('uecode_qpush.my_queue_name')->publish($message, $options);

Receiving

The receive() method takes an optional array of options as its only argument. For the AWS Provider you are able to change the options listed below per attempt to receive messages.

Option | Description | Default Value
------ | ----------- | -------------
messages_to_receive | Maximum number of messages that can be received when polling the queue | 1
receive_wait_time | If supported, time in seconds to leave the polling request open - for long polling | 3

// Optional config to override default options
$options = [
    'messages_to_receive' => 3,
    'receive_wait_time'   => 10
];

$messages = $this->get('uecode_qpush.my_queue_name')->receive($options);

foreach ($messages as $message) {
    // The body is an array, so print_r() is used rather than echo
    print_r($message->getBody());
}

IronMQ Provider

The IronMQ Provider uses its Push Queues to notify subscribers of new queued messages without needing to continually poll the queue.

Using a Push Queue is optional with this provider, and it’s possible to use simple Pull queues by utilizing the provided Console Command (uecode:qpush:receive) to poll the queue.

Configuration

This provider relies on the Iron MQ classes and needs to have the library included in your composer.json file.

{
    "require": {
        "iron-io/iron_mq": "^4.0"
    }
}

Configuring the provider is very easy. It requires that you have already created an account and have a project id.

Iron.io provides free accounts for Development, which makes testing and using this service extremely easy.

Just include your OAuth token and project_id in the configuration and set your queue to use a provider using the ironmq driver.

#app/config/config.yml

uecode_qpush:
    providers:
        my_provider:
            driver:     ironmq
            token:      YOUR_TOKEN_HERE
            project_id: YOUR_PROJECT_ID_HERE
            host:       YOUR_OPTIONAL_HOST_HERE
            port:       YOUR_OPTIONAL_PORT_HERE
            version_id: YOUR_OPTIONAL_VERSION_HERE
    queues:
        my_queue_name:
            provider: my_provider
            options:
                push_notifications: true
                subscribers:
                    - { endpoint: http://example.com/qpush, protocol: http }

IronMQ Push Queues

If you set push_notifications to true in your queue config, this provider will automatically create your Queue as a Push Queue and loop over your list of subscribers, adding them to your Queue.

This provider only supports http and https subscribers. This provider also uses the multicast setting for its Push Queues, meaning that all subscribers are notified of the same new messages.

You can choose to have your IronMQ queues work as a Pull Queue by setting push_notifications to false. This would require you to use the uecode:qpush:receive Console Command to poll the queue.

Overriding Queue Options

It’s possible to override the default queue options that are set in your config file when sending or receiving messages.

Publishing

The publish() method takes an array as a second argument. For the IronMQ Provider you are able to change the options listed below per publish.

Option | Description | Default Value
------ | ----------- | -------------
message_delay | Time in seconds before a published Message is available to be read in a Queue | 0
message_timeout | Time in seconds a worker has to delete a Message before it is available to other workers | 30
message_expiration | Time in seconds that Messages may remain in the Queue before being removed | 604800

$message = ['foo' => 'bar'];

// Optional config to override default options
$options = [
    'message_delay'      => 1,
    'message_timeout'    => 1,
    'message_expiration' => 60
];

$this->get('uecode_qpush.my_queue_name')->publish($message, $options);

Receiving

The receive() method takes an optional array of options as its only argument. For the IronMQ Provider you are able to change the options listed below per attempt to receive messages.

Option | Description | Default Value
------ | ----------- | -------------
messages_to_receive | Maximum number of messages that can be received when polling the queue | 1
message_timeout | Time in seconds a worker has to delete a Message before it is available to other workers | 30

// Optional config to override default options
$options = [
    'messages_to_receive' => 3,
    'message_timeout'     => 10
];

$messages = $this->get('uecode_qpush.my_queue_name')->receive($options);

foreach ($messages as $message) {
    // The body is an array, so print_r() is used rather than echo
    print_r($message->getBody());
}

Sync Provider

The sync provider immediately dispatches and resolves queued events. It is not intended for production use but instead to support local development, debugging and testing of queue-based code paths.

Configuration

To designate a queue as synchronous, set the driver of its provider to sync. No further configuration is necessary.

#app/config/config_dev.yml

uecode_qpush:
    providers:
        in_band:
            driver: sync
    queues:
        my_queue_name:
            provider: in_band

File Provider

The file provider uses the filesystem to dispatch and resolve queued messages.

Configuration

To designate a queue as file, set the driver of its provider to file. You will need to configure a readable and writable path to store the messages.

#app/config/config_dev.yml

uecode_qpush:
    providers:
        file_based:
            driver: file
            path: [Path to store messages]
    queues:
        my_queue_name:
            provider: file_based

Custom Provider

The custom provider allows you to use your own provider. When using this provider, your implementation must implement Uecode\Bundle\QPushBundle\Provider\ProviderInterface.

Configuration

To designate a queue as custom, set the driver of its provider to custom, and the service to your service id.

#app/config/config_dev.yml

uecode_qpush:
    providers:
        custom_provider:
            driver: custom
            service: YOUR_CUSTOM_SERVICE_ID
    queues:
        my_queue_name:
            provider: custom_provider

Caching

Providers can leverage a caching layer to limit the number of calls to the Message Queue for basic lookup functionality. This is important for values such as AWS’s ARNs.

By default the library will attempt to use a file cache. However, you can pass your own cache service, as long as it’s an instance of Doctrine\Common\Cache\Cache.

The configuration parameter cache_service expects the container service id of a registered Cache service. See below.

#app/config/config.yml

services:
    my_cache_service:
        class: My\Caching\CacheService

uecode_qpush:
    cache_service: my_cache_service

Note: Though the Queue Providers will attempt to create queues if they do not exist when publishing or receiving messages, it is highly recommended that you run the included console command to build queues and warm cache from the CLI beforehand.

Queue Options

Each queue can have its own options that determine how messages are published or received. The options and their descriptions are listed below.

Option | Description | Default Value
------ | ----------- | -------------
queue_name | The name used to describe the queue on the Provider’s side | null
push_notifications | Whether or not to POST notifications to subscribers of a Queue | false
notification_retries | Number of times notifications are resent in case of errors - if supported | 3
message_delay | Time in seconds before a published Message is available to be read in a Queue | 0
message_timeout | Time in seconds a worker has to delete a Message before it is available to other workers | 30
message_expiration | Time in seconds that Messages may remain in the Queue before being removed | 604800
messages_to_receive | Maximum number of messages that can be received when polling the queue | 1
receive_wait_time | If supported, time in seconds to leave the polling request open - for long polling | 3
fifo | If supported (only aws), sets queue into FIFO mode | false
content_based_deduplication | If supported (only aws), turns on automatic deduplication id based on the message content | false
subscribers | An array of Subscribers, containing an endpoint and protocol | empty

Symfony Application as a Subscriber

The QPush Bundle uses a Request Listener which will capture and dispatch notifications from your queue providers for you. The specific route you use does not matter.

In most cases, it is recommended to just list the host or domain for your Symfony application as the endpoint of your subscriber. You do not need to create a new action for QPush to receive messages.

Logging with Monolog

By default, logging is enabled in the QPush Bundle and uses Monolog, configured via the MonologBundle. You can disable logging by setting logging_enabled to false.

Logs will output to your default Symfony environment logs using the ‘qpush’ channel.

Example Configuration

A working configuration would look like the following:

uecode_qpush:
    cache_service: null
    logging_enabled: true
    providers:
        aws:
            driver: aws #optional for providers named 'aws' or 'ironmq'
            key: YOUR_AWS_KEY_HERE
            secret: YOUR_AWS_SECRET_HERE
            region: YOUR_AWS_REGION_HERE
        another_aws_provider:
            driver: aws #required for providers not named 'aws' or 'ironmq'
            key: YOUR_AWS_KEY_HERE
            secret: YOUR_AWS_SECRET_HERE
            region: YOUR_AWS_REGION_HERE
        ironmq:
            driver: ironmq #optional for providers named 'aws' or 'ironmq'
            token: YOUR_IRONMQ_TOKEN_HERE
            project_id: YOUR_IRONMQ_PROJECT_ID_HERE
        in_band:
            driver: sync
        custom_provider:
            driver: custom
            service: YOUR_CUSTOM_SERVICE_ID
    queues:
        my_queue_key:
            provider: ironmq #or aws or in_band or another_aws_provider
            options:
                queue_name:                  my_actual_queue_name
                push_notifications:          true
                notification_retries:        3
                message_delay:               0
                message_timeout:             30
                message_expiration:          604800
                messages_to_receive:         1
                receive_wait_time:           3
                fifo:                        false
                content_based_deduplication: false
                subscribers:
                    - { endpoint: http://example1.com/, protocol: http }
                    - { endpoint: http://example2.com/, protocol: http }
        my_fifo_queue_key:
            provider: aws
            options:
                queue_name:                  my_actual_queue_name.fifo
                push_notifications:          false
                notification_retries:        3
                message_delay:               0
                message_timeout:             30
                message_expiration:          604800
                messages_to_receive:         1
                receive_wait_time:           3
                fifo:                        true
                content_based_deduplication: true
                subscribers:
                    - { endpoint: http://example1.com/, protocol: http }
                    - { endpoint: http://example2.com/, protocol: http }

Note that FIFO queues are not currently compatible with push_notifications. For more information, see: http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-subscribe-queue-sns-topic.html

Usage

Once configured, you can create messages and publish them to the queue. You may also create services that will automatically be fired as messages are pushed to your application.

For your convenience, a custom Provider service will be created and registered in the Container for each of your defined Queues. The container queue service id will be in the format of uecode_qpush.{your queue name}.

Publishing messages to your Queue

Publishing messages is simple - fetch your Provider service from the container and call the publish method on the respective queue, which accepts an array.

#src/My/Bundle/ExampleBundle/Controller/MyController.php

public function publishAction()
{
    $message = [
        'messages should be an array',
        'they can be flat arrays' => [
            'or multidimensional'
        ]
    ];

    $this->get('uecode_qpush.my_queue_name')->publish($message);
}

Working with messages from your Queue

Messages are either automatically received by your application and dispatched as events (setting push_notifications to true), or can be picked up by Cron jobs through an included command if you are not using a Message Queue provider that supports Push notifications.

When the notifications or messages are Pushed to your application, the QPush Bundle automatically catches the request and dispatches an event which can be easily hooked into.

MessageEvents

Once a message is received via POST from your Message Queue, a MessageEvent is dispatched which can be handled by your services. Each MessageEvent contains the name of the queue and a Uecode\Bundle\QPushBundle\Message\Message object, accessible through getters.

#src/My/Bundle/ExampleBundle/Service/ExampleService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;

public function onMessageReceived(MessageEvent $event)
{
    $queue_name = $event->getQueueName();
    $message    = $event->getMessage();
}

The Message objects contain the provider specific message id, a message body, and a collection of provider specific metadata.

These properties are accessible through simple getters.

The message body is an array matching your original message. The metadata property is an ArrayCollection of varying fields sent with your message from your Queue Provider.

#src/My/Bundle/ExampleBundle/Service/ExampleService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;
use Uecode\Bundle\QPushBundle\Message\Message;

public function onMessageReceived(MessageEvent $event)
{
    $id         = $event->getMessage()->getId();
    $body       = $event->getMessage()->getBody();
    $metadata   = $event->getMessage()->getMetadata();

    // do some processing
}

Tagging Your Services

For your Services to be called on QPush events, they must be tagged with the name uecode_qpush.event_listener. A complete tag is made up of the following properties:

Tag Property | Example | Description
------------ | ------- | -----------
name | uecode_qpush.event_listener | The QPush Event Listener Tag
event | {queue name}.message_received | The message_received event, prefixed with the Queue name
method | onMessageReceived | A publicly accessible method on your service
priority | 100 | Priority, 1-100, to control order of services. Higher priorities are called earlier

The priority is useful to chain services, ensuring that they fire in a certain order - the higher priorities fire earlier.

Each event fired by the QPush Bundle is prefixed with the name of your queue, e.g. my_queue_name.message_received.

This allows you to assign services to fire only on certain queues, based on the queue name. However, you may also have multiple tags on a single service, so that one service can handle events from multiple queues.

services:
    my_example_service:
        class: My\Example\ExampleService
        tags:
            - { name: uecode_qpush.event_listener, event: my_queue_name.message_received, method: onMessageReceived }

The method listed in the tag must be publicly available in your service and should take a single argument, an instance of Uecode\Bundle\QPushBundle\Event\MessageEvent.

#src/My/Bundle/ExampleBundle/Service/MyService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;

// ...

public function onMessageReceived(MessageEvent $event)
{
    $queueName  = $event->getQueueName();
    $message    = $event->getMessage();
    $metadata   = $message->getMetadata();

    // Process ...
}
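
Because a single service can carry tags for several queues, one way to keep such a listener tidy is to branch on the queue name returned by getQueueName(). The following is a minimal, self-contained sketch of that idea and is not part of the bundle: QueueRouter, the emails and reports queue names, and the handler bodies are all hypothetical. In a real listener, onMessageReceived would call route($event->getQueueName(), $event->getMessage()->getBody()).

```php
<?php
// Hypothetical router; the class and the queue names below are illustrative only.
final class QueueRouter
{
    /** @var array<string, callable> map of queue name => handler */
    private $handlers;

    public function __construct(array $handlers)
    {
        $this->handlers = $handlers;
    }

    /**
     * Passes the message body to the handler registered for the queue.
     *
     * @throws InvalidArgumentException when no handler matches the queue
     */
    public function route(string $queueName, array $body)
    {
        if (!isset($this->handlers[$queueName])) {
            throw new InvalidArgumentException("No handler for queue: $queueName");
        }

        return ($this->handlers[$queueName])($body);
    }
}

$router = new QueueRouter([
    'emails'  => function (array $body) { return 'email to ' . $body['to']; },
    'reports' => function (array $body) { return 'report #' . $body['id']; },
]);

echo $router->route('emails', ['to' => 'user@example.com']), PHP_EOL;
echo $router->route('reports', ['id' => 42]), PHP_EOL;
```

Throwing for an unknown queue is a deliberate choice in this sketch: as described under Cleaning Up the Queue, an exception means the message is not removed automatically.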

Cleaning Up the Queue

Once all other Event Listeners have been invoked on a MessageEvent, the QPush Bundle will automatically attempt to remove the Message from your Queue for you.

If an error or exception is thrown, or event propagation is stopped earlier in the chain, the Message will not be removed automatically and may be picked up by other workers.

If you would like to remove the message inside your service, you can do so by calling the delete method on your provider and passing it the message id. However, you must also stop the event propagation to prevent other services (including the Provider service) from firing on that MessageEvent.

#src/My/Bundle/ExampleBundle/Service/MyService.php

use Uecode\Bundle\QPushBundle\Event\MessageEvent;

// ...

public function onMessageReceived(MessageEvent $event)
{
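    $id = $event->getMessage()->getId();
    // Removes the message from the queue. $this->provider is assumed to hold
    // the injected queue service (e.g. uecode_qpush.my_queue_name).
    $this->provider->delete($id);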

    // Stops the event from propagating
    $event->stopPropagation();
}
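
The interplay of priorities, stopped propagation and the final cleanup step can be pictured with a small stand-alone model. This sketch is a simplification under stated assumptions and is not the bundle's or Symfony's actual dispatcher: SketchEvent and dispatchSketch are hypothetical names, and the cleanup listener merely stands in for the bundle's automatic removal step, assumed here to run last.

```php
<?php
// Simplified stand-in for an event; NOT the bundle's MessageEvent class.
final class SketchEvent
{
    public $stopped = false;
    public $log = [];
}

/**
 * Calls listeners from highest to lowest priority and halts as soon as
 * one of them marks the event as stopped.
 *
 * @param array<int, array{priority: int, callback: callable}> $listeners
 */
function dispatchSketch(array $listeners, SketchEvent $event): SketchEvent
{
    // Sort descending so that higher priorities run earlier
    usort($listeners, function (array $a, array $b) {
        return $b['priority'] <=> $a['priority'];
    });

    foreach ($listeners as $listener) {
        if ($event->stopped) {
            break;
        }
        $listener['callback']($event);
    }

    return $event;
}

$listeners = [
    // Stand-in for the automatic cleanup, given the lowest priority
    ['priority' => 0,   'callback' => function (SketchEvent $e) { $e->log[] = 'cleanup'; }],
    ['priority' => 100, 'callback' => function (SketchEvent $e) { $e->log[] = 'validate'; }],
    ['priority' => 50,  'callback' => function (SketchEvent $e) { $e->log[] = 'process'; }],
];

$event = dispatchSketch($listeners, new SketchEvent());
echo implode(', ', $event->log), PHP_EOL; // validate, process, cleanup

// A listener that stops propagation prevents everything after it, cleanup included
$listeners[] = ['priority' => 75, 'callback' => function (SketchEvent $e) { $e->stopped = true; }];
$stoppedEvent = dispatchSketch($listeners, new SketchEvent());
echo implode(', ', $stoppedEvent->log), PHP_EOL; // validate
```

An exception thrown by any callback would abort the loop in the same way, which mirrors why a failed listener leaves the message on the queue.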

Push Queues in Development

It is recommended to use your config_dev.yml file to disable the push_notifications settings on your queues. This will make the queue a simple Pull queue. You can then use the uecode:qpush:receive Console Command to receive messages from your Queue.

If you need to test the Push Queue functionality from a local stack or internal machine, it’s possible to use ngrok to tunnel to your development environment, so it’s reachable by your Queue Provider.

You would need to update your config_dev.yml configuration to use the ngrok url for your subscriber(s).

Console Commands

This bundle includes some Console Commands which can be used for building, destroying and polling your queues as well as sending simple messages.

Build Command

You can use the uecode:qpush:build command to create the queues on your providers. You can specify the name of a queue as an argument to build a single queue. This command also warms the cache, which avoids the need to query the provider’s API to ensure that the queue exists. Most queue providers’ create operations are idempotent, so running this multiple times is not an issue:

$ php app/console uecode:qpush:build my_queue_name

Note: By default, this bundle uses File Cache. If you clear cache, it is highly recommended you re-run the build command to warm the cache!

Destroy Command

You can use the uecode:qpush:destroy command to completely remove queues. You can specify the name of a queue as an argument to destroy a single queue. If you do not specify an argument, this will destroy all queues after confirmation:

$ php app/console uecode:qpush:destroy my_queue_name

Note: This will remove queues, even if there are still unreceived messages in the queue!

Receive Command

You can use the uecode:qpush:receive command to poll the specified queue. This command takes the name of a queue as an argument. Messages received from this command are dispatched through the EventDispatcher and can be handled by your tagged services the same as Push Notifications would be:

$ php app/console uecode:qpush:receive my_queue_name

Publish Command

You can use the uecode:qpush:publish command to send messages to your queue from the CLI. This command takes two arguments, the name of the queue and the message to publish. The message needs to be a JSON-encoded string:

$ php app/console uecode:qpush:publish my_queue_name '{"foo": "bar"}'
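
When the publish command is invoked from a script rather than typed by hand, the message has to be JSON-encoded and quoted for the shell. A minimal sketch, assuming PHP 7.3 or later for JSON_THROW_ON_ERROR; buildPublishCommand is a hypothetical helper and not part of the bundle:

```php
<?php
// Hypothetical helper: builds the command line for uecode:qpush:publish.
function buildPublishCommand(string $queueName, array $message): string
{
    // JSON_THROW_ON_ERROR turns encoding failures into exceptions
    $json = json_encode($message, JSON_THROW_ON_ERROR);

    // escapeshellarg() quotes each argument for safe use in a shell command
    return sprintf(
        'php app/console uecode:qpush:publish %s %s',
        escapeshellarg($queueName),
        escapeshellarg($json)
    );
}

echo buildPublishCommand('my_queue_name', ['foo' => 'bar']), PHP_EOL;
```

The exact quoting characters depend on the platform, since escapeshellarg() behaves differently on Windows than on Unix-like systems.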