PHP Classes

How to Use Queue To Speedup PHP Processing Tasks Part 2: Processing Queued Tasks

Recommend this page to a friend!
  Blog PHP Classes blog   RSS 1.0 feed RSS 2.0 feed   Blog How to Use Queue To S...   Post a comment Post a comment   See comments See comments (7)   Trackbacks (0)  

Author:

Viewers: 3,298

Last month viewers: 35

Categories: PHP Tutorials, PHP Performance

As it was explained in the first part of the article, queues are great to defer the processing long tasks, allowing to provide faster user interfaces so users do not have to wait for those tasks to complete.

Read this article to learn how to efficiently process queued tasks in PHP and keep track of the results.




Loaded Article

Contents

Introduction

The Benefits of Queueing Processing Tasks

Processing a Queue

A Real Life Example

Queuing Tasks

The Queue Processor Worker

Improving the Performance of the Queue Processing

Conclusion


Introduction

As it was explained in the first part of this article, you can "postpone" the execution of a long task by sending it to a queue. That will help to to speed up your application for the end user as he does not have to wait for your task to complete.

The task is queued, your users have better experience with your sites or apps. The only thing left to implement is to process the tasks in the queue doing the actual work.

The Benefits of Queueing Processing Tasks

Now you may wonder if are there any other benefits of using a queue, apart from a quick response of such an application? Sure, the benefits are:

  • You can see the stats of processing: how many succeed and fail
  • You can get the exact time the tasks take in average
  • You can track the errors of processing, figure out the reason and then "replay" the failed entries. This can be very handy if your team mate changed the password to a service your task relies on, or you run out of credits and need to fund the account
  • Some complex rules can be applied to processing, for example, if you send queued emails via Gmail mail server, it allows to send only 2000 emails per day, so your queues processors may take care of this easily
  • Processing can take place only when it is the best time for your servers, for instance, when it is night or when there is an Internet connection or mobile phone signal
  • You can process tasks in a batch, for instance, establish a single connection to your STMP server and send a hundred queued emails within a single session

Processing a Queue

If you remember "The Imitation Game" movie, there were special workers,— a group of ladies, who were given each a deck of papers to decode, and as soon as a paper was decoded, it was given back to the manager. That is a perfect example of "a worker" or "queue processor".

So in this article you willl learn how to create a queue processor. The processor of the queue or just "worker" is a small script that:

  1. Runs regularly, usually as from the cron program, for example, every 5 minutes,
  2. Takes N jobs from the queue
  3. Process the jobs of this batch one by one
  4. Reports the results: marks the jobs as processed, or logs the error message for failed ones so you could analyze it later
Here how a simple worker script looks like:
<?php

class Worker {

 protected $_queue;

 /**
  * remember the queue reference
  * @param object $queue
  */
 public function __construct($queue) {
  $this->_queue = $queue;
 }

 public function process() {
   $item = $this->_queue->getItem();
   manipulate_the_item_to_do_the_work($item);
   return true;
 }
};

$worker->process();

?>

This is a simplified example of code, as it processes a single job per run, and the step number 4, report the results, is not implemented yet.

Anyway, this simple piece of code can do the job already, and you can run in using cron every 5 minutes using a crontab configuration like this (if you just saved the code above as queue-worker.php file):

> crontab -e //run this to open crontab settings
*/5 * * * * php /path/to/queue-worker.php

A Real Life Example

Now let's implement a real life example. The queued items are to send a notification email to a user after successful subscription, purchase or something like that.

Create a table in your database with help of this SQL query:

CREATE TABLE IF NOT EXISTS `queue` (
  `id` int(10) NOT NULL PRIMARY KEY AUTO_INCREMENT,
  `email` varchar(100) NOT NULL,
  `html` text NOT NULL,
  `status` enum('queued','processing','done','failed') NOT NULL DEFAULT 'queued',
  `error_text` varchar(255) DEFAULT NULL,
  `created_at` datetime DEFAULT NULL,
  `updated_at` timestamp NOT NULL 
        DEFAULT '0000-00-00 00:00:00'
        ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Notice that:

  • status field equals "queued" value by default— that means that a job is queued and is waiting for the processing. When we take an element from the queue to process, we change this field value to "processing". If the processing was successful, we set this value to "done", otherwise to "failed" and log the error message.
  • From my experience it is best to log the timestamp when the item was queued and the timestamp of the last status change. This is saved in created_at and updated_at fields accordingly. When we just create a new record in this table, we set created_at to equal to current time. And every time the record is updated (say, we change the status field), the updated_at  field gets updated automatically — it's a trick of MySQL for TIMESTAMP fields that have "ON UPDATE CURRENT_TIMESTAMP" setting.

Queuing Tasks

Ok, we have a table to queue our jobs. Now let's write the code to add an item to the queue. It depends on your implementation and framework, but it's really likely that you need to pass the database handler object reference to the constructor:
<?php

class Queue {

 protected $_databaseHandler;

 /**
  * remember the database reference
  * @param object $databaseHandler
  */
 public function __construct($databaseHandler) {
  $this->_databaseHandler = $databaseHandler;
 }


 /**
  * runs an INSERT query to append a new task to the existing queue table
  * @param string $emailAddress
  * @param string $html
  */
 public function addItem($emailAddress, $html) {
    $this->_databaseHandler->query('INSERT INTO queue
       (`id`, `email`, `html`, `created_at`, `updated_at`) 
       VALUES (NULL, "' . quote($emailAddress). '",  "'
          . quote($html). '", NOW(), NOW())';
    return true;
 }


 /**
  * requests a new queued item (not being processed, done or failed. A new one!)
  * @return object $item
  */
 public function getItem() {
  $item = $this->_databaseHandler->query('SELECT * 
      FROM queue
      WHERE status="queued"
      LIMIT 1');
  return $item;
 }
 
};

?>

The Queue Processor Worker

Now let's write the worker. It needs a reference to database handler as well.
<?php

class Worker {

 protected $_databaseHandler;
 protected $_queue;

 /**
  * remember the references
  * @param object $queue
  * @param object $databaseHandler
  */
 public function __construct($queue, $databaseHandler) {
  $this->_queue = $queue;
  $this->_databaseHandler = $databaseHandler;
 }

 public function process() {
   $item = $this->_queue->getItem();
   $this->markItemAsBeingProcessed($item);

   try {
     mail(
       $item['email'],
       'Your are subscribed!',
       $item['html']
     );

     // if mail() was ok, just mark the item as done
     $this->markItemAsDone($item);
   } 
   catch (Exception $e) {
     //if mail() fails, mark item as failed and save the error text
     $errorMessageText = $e->getMessage();
     $this->markItemAsFailed($item, $errorMessageText);
   }
 }

 public function markItemAsBeingProcessed($item) {
   $this->_changeItemStatus($item['id'], 'processing');
 }

 public function markItemAsDone($item) {
   $this->_changeItemStatus($item['id'], 'done');
 }

 public function markItemAsFailed($item, $errorMessageText) {
   $this->_changeItemStatus($item['id'], 'failed', $errorMessageText);
 }

 protected function _changeItemStatus($itemId, $itemStatus, $errorMessageText=null) {
    $this->_databaseHandler->query('UPDATE queue SET 
      `status` = '. $itemStatus . ',
      `error_text` = '. $errorMessageText . '
      WHERE `id` = '. $itemId;
 }
 
};

?>

That's it!

Improving the Performance of the Queue Processing

There are a few ways to improve this system. For instance if you need to process more than 1 item per run, like for instance 10 or 20, items you need to change Queue::getItem() method, pass a $number parameter as the number of required jobs to process, and change "LIMIT 1" to "LIMIT $number" in the SQL query.

Conclusion

As you can see, making a queue and implement its processing is not a super complex task. You just need to implement it once, but then it allows you to kill many birds by one stone, so you can use the solution as many times as necessary.

If you liked this article or you have a question about processing queues of pending tasks, post a comment here.




You need to be a registered user or login to post a comment

Login Immediately with your account on:



Comments:

4. better solution - Dailis Tukans (2016-03-27 00:42)
beanstalk + supervisor as worker processes control system... - 0 replies
Read the whole comment and replies

3. My implementation - Gerry Danen (2016-03-10 19:11)
Reducing long wait times by implementing queues... - 1 reply
Read the whole comment and replies

2. Optimisations - Lionel F. Lebeau (2016-03-10 19:11)
One or more workers... - 1 reply
Read the whole comment and replies

1. Guilherme - Guilherme Viana (2016-03-10 19:10)
Why not gearman ?... - 1 reply
Read the whole comment and replies



  Blog PHP Classes blog   RSS 1.0 feed RSS 2.0 feed   Blog How to Use Queue To S...   Post a comment Post a comment   See comments See comments (7)   Trackbacks (0)