How to let RabbitMQ do the work in your Symfony2 application
Many of the high-performance websites we build need to perform tasks that take a relatively long time, such as sending a user an email after registration, creating a thumbnail from an avatar, or processing a large XML or CSV import file.
Some of these tasks require a lot of memory; others simply take time to complete. It would be great to avoid a delay for the user who has just registered and is waiting for us to send them an email, or to have a nice, scalable architecture for parsing big files efficiently.
In this article we will focus on the second case and look at ways to reduce the impact of processing large import files.
Processing large files is painful for performance, even if we have access to really good hardware. It becomes even more painful when we need to process data that contains relations to itself, such as a list of people or products which are related to each other. One tool that we use to solve this type of problem at Inviqa is RabbitMQ, a message queue tool that will save both your time and your hair!
In our examples we will use Symfony2 and the RabbitMqBundle to go with it, but even if you are not familiar with this framework, the examples shown are intended to be understandable to any PHP developer.
What is RabbitMQ?
RabbitMQ is a message broker system which allows you to write messages to an exchange with one process, called the producer, and then read back from the queue with another process, called the consumer. This seems like a good time to explain what those exchanges, queues and the bindings between them actually are.
- exchange: exchanges are the entry point for all messages published by producers.
- queue: you can think about a queue as a container where messages are stored.
- binding: bindings are rule-sets to connect exchanges with queues, dictating which messages received by the exchange should be added to which queue.
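To make these three terms concrete, here is a toy in-memory model of a direct exchange written in plain PHP. It is only a sketch: the ToyDirectExchange class and its method names are made up for illustration and are not RabbitMQ's API. It assumes that a message whose routing key matches no binding is simply dropped.

```php
<?php
// Toy in-memory model of a direct exchange. This is NOT RabbitMQ's API;
// the class and method names are hypothetical and exist only for illustration.
class ToyDirectExchange
{
    private $bindings = array(); // routing key => list of queue names
    private $queues = array();   // queue name  => list of message bodies

    // A binding is a rule: messages with $routingKey go to $queueName.
    public function bind($queueName, $routingKey)
    {
        if (!isset($this->queues[$queueName])) {
            $this->queues[$queueName] = array();
        }
        $this->bindings[$routingKey][] = $queueName;
    }

    // The producer side: hand a message to the exchange, which copies it
    // into every queue bound with a matching routing key.
    public function publish($body, $routingKey)
    {
        if (!isset($this->bindings[$routingKey])) {
            return; // no matching binding: the message is dropped
        }
        foreach ($this->bindings[$routingKey] as $queueName) {
            $this->queues[$queueName][] = $body;
        }
    }

    // The consumer side: take the oldest message from a queue, or null.
    public function consume($queueName)
    {
        if (empty($this->queues[$queueName])) {
            return null;
        }
        return array_shift($this->queues[$queueName]);
    }
}

$exchange = new ToyDirectExchange();
$exchange->bind('file_nodes', 'item');
$exchange->publish('<item><title>First Blood</title></item>', 'item');
$exchange->publish('ignored', 'unknown-key');

echo $exchange->consume('file_nodes'), PHP_EOL;
```

The producer only ever talks to the exchange and the consumer only ever talks to the queue; the binding is the one place that connects the two.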
Messages in queues are stored independently from one another. Each message contains everything that is needed to process itself. A message might be a serialised object, an email to send, the path to an image which needs thumbnailing, or even the actual image itself - it can be anything. The only limit here is your imagination!
In our case we will have one queue where one message will be a single node of an XML file, in a format like <item>...</item>.
In each system there can be an unlimited number of queues where messages are stored. It is also possible to have multiple processes reading from one queue, to enable faster processing of messages, or the processing of many different types of messages.
Furthermore, consumer and producer processes can be spread across many servers, so you can have a cloud of servers to process the messages in a queue. For our example we will work with a single message queue and one type of message.
A representation of the processing of one message in RabbitMQ:
Advantages of using RabbitMQ
So far, we've learned what RabbitMQ is, but using it in our application gives us some great benefits:
- Scalability: a consumer process reads one message at a time from a queue, but nothing limits you to a single consumer. You could, for example, process the queue using as many consumers as you need, perhaps also using the cloud for extra scale and flexibility.
- Clustering: several RabbitMQ servers on a local network can be clustered together, forming a single logical broker.
- Highly Available Queues: queues can be mirrored across several machines in a cluster, ensuring that even in the event of hardware failure your messages are safe.
- More responsive applications: some actions are time-consuming as we mentioned, such as sending a registration email or creating thumbnails. You can avoid a delay to the user by delegating the expensive task to a specialised consumer which will process it asynchronously.
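To picture the scalability point, the sketch below imitates how messages on one queue could be shared out in turn between several consumers. It is a plain PHP simulation, not a connection to a real broker, and the dispatchRoundRobin function is a hypothetical name used only here. It assumes strict turn-taking, which is how RabbitMQ's tutorials describe the default dispatching.

```php
<?php
// Toy simulation only: it imitates round-robin dispatch in plain PHP
// and does not talk to a real broker. The function name is hypothetical.
function dispatchRoundRobin(array $messages, $consumerCount)
{
    // One empty "inbox" per consumer.
    $handled = array_fill(0, $consumerCount, array());

    foreach (array_values($messages) as $i => $message) {
        // Message number i goes to consumer number (i mod consumer count).
        $handled[$i % $consumerCount][] = $message;
    }

    return $handled;
}

$nodes = array('node 1', 'node 2', 'node 3', 'node 4', 'node 5');

foreach (dispatchRoundRobin($nodes, 2) as $consumer => $messages) {
    echo 'consumer ', $consumer, ': ', implode(', ', $messages), PHP_EOL;
}
```

Adding a third consumer is just a matter of changing the second argument, which is the appeal of the real thing too: you start another consumer process and the work is shared without touching the producer.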
RabbitMQ has many, many more great features. You can find out more in the RabbitMQ documentation and read more about the product itself on the RabbitMQ Project Page.
Install RabbitMQ
First of all you will want to get all the various tools set up and we will begin by installing RabbitMQ. The following steps are the installation for my Ubuntu platform, but if you would like instructions for another system, there is good documentation available. Pick your system from the list on the right hand side.
1. As we want to use the newest version of RabbitMQ, we will fetch the packages from the project's own repository. To do this, add the following line to your /etc/apt/sources.list:
deb http://www.rabbitmq.com/debian/ testing main
2. To use the new repository, we will also need to add an entry to our trusted keys list, like this:
$ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
$ sudo apt-key add rabbitmq-signing-key-public.asc
3. Now we can update our packages and then install RabbitMQ:
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server
At this point, the installation should complete successfully and we should have RabbitMQ running on our local system. We can check everything is working as expected by running the status command:
$ sudo rabbitmqctl status
After running this command you should see the status of your node, with various metrics such as the current version of RabbitMQ, used memory, uptime, etc. Here is part of the output from my console:
Status of node rabbit@precise64 ...
[{pid,928},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.1.5"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.1.5"},
{rabbit,"RabbitMQ","3.1.5"},
...
{disk_free,77721620480},
{file_descriptors,
[{total_limit,924},{total_used,4},{sockets_limit,829},{sockets_used,1}]},
{processes,[{limit,1048576},{used,193}]},
{run_queue,0},
{uptime,44}]
...done.
Install the RabbitMQ admin panel
The admin panel for RabbitMQ is not required for today's tutorial, but I highly recommend it. It's a web-based admin panel which simplifies management of RabbitMQ by allowing you to see statistics, see a list of queues, display the messages, add messages by hand, and many other tricks.
To install the admin panel, we add it as a plugin to RabbitMQ by executing the following command:
$ sudo rabbitmq-plugins enable rabbitmq_management
Now, to get access to the admin panel you just need to visit server-name:15672/, where server-name is the domain name of the machine where RabbitMQ is installed, and 15672 is the default port on which the RabbitMQ web server is listening. By default, the login and password for a new installation are 'guest' and 'guest' respectively.
I've included some screenshots of the admin panel, to give you an idea of what you can expect if you choose to install this plugin for your own system.
Install Symfony2 and the RabbitMQBundle
You may well have Symfony installed already, but if not then check out the installation instructions for the project. Once Symfony is set up, you can install the RabbitMqBundle. There are some good instructions on the GitHub project page for the bundle.
Build a producer
Now that everything is set up, and our RabbitMQ instance is ready to use, we will start to write some code. First we will create a producer that will be used to publish messages to a queue.
You can think of producers as postmen who take a letter (message) from you and transport it to the post office (exchange). At the exchange the messages are sorted into queues, and from the queues they are delivered to clients by a special type of postman called a consumer.
The RabbitMqBundle knows about producers so we can simply start by configuring ours in the config.yml file:
# app/config/config.yml
old_sound_rabbit_mq:
    producers:
        split_file:
            connection: default
            exchange_options: { name: 'file_nodes', type: direct }
Next we need to create the class that will actually do the work of splitting up the XML. The responsibility of this class is very simple: it takes the path to the XML file, parses the file, and publishes a separate message for each <item> node it finds in the document. I should mention that this code uses \XMLReader. This was a very important choice from a performance point of view.
From the PHP Documentation: The reader acts as a cursor going forward on the document stream and stopping at each node on the way.
This means that the reader will not load the whole file into memory - which for very big files would cause problems!
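To see the cursor behaviour in isolation, here is a small sketch that runs XMLReader over an XML string and picks out every <item> element. The collectItems function is a hypothetical helper, not part of the application we are building. It uses readOuterXml(), which returns the current element together with its own tags. The results are gathered into an array purely for demonstration; with a genuinely large file you would hand each node on as soon as it is read, so that nothing accumulates in memory.

```php
<?php
// Hypothetical helper for illustration; it is not part of the article's code.
// It walks an XML string node by node and collects every <item> element.
function collectItems($xml)
{
    $reader = new XMLReader();
    $reader->XML($xml);

    $items = array();
    // read() moves the cursor to the next node and returns false at the end.
    while ($reader->read()) {
        if (XMLReader::ELEMENT === $reader->nodeType && 'item' === $reader->name) {
            // readOuterXml() returns the current element including its own tags.
            $items[] = $reader->readOuterXml();
        }
    }
    $reader->close();

    return $items;
}

$xml = '<items><item><title>First Blood</title></item>'
     . '<item><title>Rambo: First Blood Part II</title></item></items>';

foreach (collectItems($xml) as $item) {
    echo $item, PHP_EOL;
}
```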
Now we will take a very large XML file and split it into smaller chunks that we can then handle individually. First we will create a class to do this work, then we will configure it to run from the command line so that it could be run by cron or some other scheduled process.
Here is the class that does this work:
<?php
// src/Acme/DemoBundle/SplitFile.php
namespace Acme\DemoBundle;

class SplitFile
{
    private $path;
    private $producer;

    public function __construct($path, $producer)
    {
        $this->path = $path;
        $this->producer = $producer;
    }

    public function process()
    {
        $xmlReader = new \XMLReader();
        // Fail early if the file cannot be opened
        if (!$xmlReader->open($this->path)) {
            throw new \RuntimeException(sprintf('Unable to open "%s"', $this->path));
        }
        // Walk the document node by node, without loading it all into memory
        while ($xmlReader->read()) {
            if ((\XMLReader::ELEMENT === $xmlReader->nodeType) && ($xmlReader->name === 'item')) {
                // Rebuild a standalone XML document for this <item> node
                $xml = '<?xml version="1.0"?>';
                $xml .= '<item>';
                $xml .= $xmlReader->readInnerXml();
                $xml .= '</item>';
                // Publish one message per node
                $this->producer->publish($xml);
            }
        }
        $xmlReader->close();
    }
}
Next we register the new SplitFile class as a service. If you are not familiar with the concept of services, then I recommend you read the excellent documentation about the Service Container in Symfony2.
To do that we need to add the following lines to services.yml:
# src/Acme/DemoBundle/Resources/config/services.yml
parameters:
    file_path: /path/to/xml/file.xml
services:
    split_file:
        class: Acme\DemoBundle\SplitFile
        arguments:
            - "%file_path%"
            - "@old_sound_rabbit_mq.split_file_producer"
What exactly happened here? Let's look at each section of the file in turn.
The first part defines parameters that will be accessible from any other configuration file (e.g. parameters.yml, services.yml, config_*.yml etc.). We defined one parameter, file_path, which is the path to the XML file that will be processed by the SplitFile class.
Next we added a services section; this is the definition of our service to process an XML file. As you can see, we gave the fully qualified name of the class using the class attribute, and this service passes two arguments to its __construct() method. The first argument is %file_path%, the path to the XML file defined above. The second argument is the producer service that the RabbitMqBundle creates from our configuration; it is what publishes the messages to the exchange.
A simple example of the file format we'll be processing is shown below: we'll be using this as our test file. You can save it in the path defined in the file_path parameter.
<?xml version='1.0'?>
<items>
    <item>
        <title>First Blood</title>
        <email>user1@example.com</email>
    </item>
    <item>
        <title>Rambo: First Blood Part II</title>
        <email>user2@example.com</email>
    </item>
</items>
In our example, the body of one message published to the queue will look like this:
<?xml version="1.0"?>
<item>
    <title>Rambo: First Blood Part II</title>
    <email>user2@example.com</email>
</item>
Build the producer
Together, these elements of code and configuration give us a fully working application which can process a large XML file and publish a message to an exchange for each <item> node that it encounters.
We can also set up Symfony to make this functionality available as a CLI command that you could run through cron, perhaps to process a large product import or inventory file that arrives regularly. Happily Symfony2 comes with a great component called Console that can be used for this.
Below is an example of configuring a simple command for the console:
<?php
// src/Acme/DemoBundle/Command/ReadFileCommand.php
namespace Acme\DemoBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ReadFileCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this->setName('reader:read-file');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->getContainer()->get('split_file')->process();
    }
}
To execute this command just run the following in your project directory:
$ ./app/console reader:read-file
To read more, you can visit the documentation of the Console component.
Build the queue consumer
What exactly does a consumer do? Well... more or less anything you can think of, as it is simply a PHP class registered as a service. It could parse some XML from a message and send an email, publish a message to another exchange, or insert data into a database; there are so many possibilities. Exactly as I said before: the only limit is your own imagination.
For our example application, the consumer class will read messages from the queue, and then process the XML that is stored within each one.
<?php
// src/Acme/DemoBundle/Consumer/ReadNode.php
namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Log\LoggerInterface;

class ReadNode implements ConsumerInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function execute(AMQPMessage $msg)
    {
        // $msg->body is the data sent by RabbitMQ; in our example it contains XML
        $sxe = new \SimpleXMLElement($msg->body);
        // Now it's completely up to you what you do with this XML.
        // You can do anything! Here we just log that we processed the XML node.
        $this->logger->info(sprintf('Node processed: "%s"', $sxe->title));
    }
}
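One thing worth keeping in mind is that the SimpleXMLElement constructor throws an exception when it is given a string that is not well-formed XML, so a single bad message could stop the consumer. The sketch below shows one possible way to guard against that in plain PHP. The extractTitle function is a hypothetical helper and not part of the RabbitMqBundle; what to do with a rejected message (log it, skip it, move it elsewhere) is left open here.

```php
<?php
// Hypothetical helper, not part of the RabbitMqBundle or the consumer above.
// Returns the <title> text of an <item> message body, or null if the body
// is not well-formed XML.
function extractTitle($body)
{
    // Collect libxml errors internally instead of emitting PHP warnings.
    $previous = libxml_use_internal_errors(true);
    $item = simplexml_load_string($body);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    // simplexml_load_string() returns false when parsing fails.
    if (false === $item) {
        return null;
    }

    return (string) $item->title;
}

var_dump(extractTitle('<item><title>First Blood</title></item>'));
var_dump(extractTitle('<item><title>broken'));
```

A consumer could call a helper like this at the top of execute() and log a warning when it gets null back, rather than letting the exception bubble up.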
We can register our class as a service by adding the following code, similar to the process we used for the producer:
# src/Acme/DemoBundle/Resources/config/services.yml
services:
    process_node:
        class: Acme\DemoBundle\Consumer\ReadNode
        arguments:
            - "@logger"
We are almost done with creating our consumer. Now we just need to register the service we created as a RabbitMQ consumer and run the command which will listen on the queue and read messages from it for processing.
To tell the RabbitMqBundle about the service, add the following code to your config.yml:
# app/config/config.yml
old_sound_rabbit_mq:
    consumers:
        read_node:
            connection: default
            exchange_options: { name: 'file_nodes', type: direct }
            queue_options: { name: 'file_nodes' }
            callback: process_node
To start consuming messages from the queue, we just need to run this command:
$ ./app/console rabbitmq:consumer -w read_node
As we mentioned at the beginning of this article, you are not limited to one consumer for a queue. You may have many consumers, either all processing a single message type in parallel, or perhaps each listening for particular types of message. You can extend this setup in many ways; for example, if you wanted to add monitoring to your message queues, you could try a tool like God, a process monitoring framework written in Ruby.
RabbitMQ and Symfony2
RabbitMQ is a powerful tool with a lot of features. In this article we went through some simple configuration and use of RabbitMQ, but this was really just a small first step. To learn more about this topic, I recommend you try the RabbitMQ tutorials available from the RabbitMQ project itself. If you're using RabbitMQ in your applications, or you're thinking about adding it, I'd be interested to hear your experiences - so please leave me a comment!